While testing Spark Streaming recently I ran into a problem worth writing down.
Environment: Spark 2.x, kafka_0.10.x. Sample code:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val ssc: StreamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(5))
val kafkaBrokers: String = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
val kafkaTopics: String = "test"
val kafkaParam = Map(
  "bootstrap.servers" -> kafkaBrokers, // addresses used to bootstrap the connection to the cluster
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "group1",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val inputDStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Array(kafkaTopics), kafkaParam)
)

val valueDStream: DStream[String] = inputDStream.map(_.value())
valueDStream.foreachRDD { rdd =>
  val tRDD: RDD[String] = rdd.filter(_.contains("t"))
  val hRDD: RDD[String] = rdd.filter(_.contains("h"))
  tRDD.union(hRDD).foreach(println)
}
```
Running this job fails with:

```
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
```

The cause is that `tRDD` and `hRDD` are both derived from the same un-cached Kafka-backed RDD, so the `union` recomputes each filter branch from the Kafka source. The direct stream caches one `KafkaConsumer` per topic-partition on an executor; when the two filter tasks for the same partition run concurrently on that executor, they share that cached consumer, which is not safe for multi-threaded access.
The fix is to persist the mapped DStream so that the two filters read from cached blocks instead of each re-pulling the data through the Kafka consumer:

```scala
import org.apache.spark.storage.StorageLevel

val valueDStream: DStream[String] = inputDStream.map(_.value()).persist(StorageLevel.DISK_ONLY)
valueDStream.foreachRDD { rdd =>
  val tRDD: RDD[String] = rdd.filter(_.contains("t"))
  val hRDD: RDD[String] = rdd.filter(_.contains("h"))
  tRDD.union(hRDD).foreach(println)
}
```
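An alternative sketch, assuming the two branches are only ever unioned back together as above: avoid the second pass over the Kafka-backed RDD entirely by combining the predicates into a single filter, so each partition's consumer is read by exactly one task.

```scala
// Single pass over the Kafka-backed RDD: no union of two lineages that
// both recompute from the same KafkaConsumer, so no concurrent access.
valueDStream.foreachRDD { rdd =>
  rdd.filter(s => s.contains("t") || s.contains("h")).foreach(println)
}
```

Note one behavioral difference: a record containing both "t" and "h" is printed once here, but twice by the `union` version.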
Reposted from: http://rzjkb.baihongyu.com/