博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
sparkstreaming ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
阅读量:2179 次
发布时间:2019-05-01

本文共 1658 字,大约阅读时间需要 5 分钟。

最近在测试sparkstreaming的时候发现了一个问题,记录一下

环境 spark 2.x, kafka_0.10.x
示例代码:

val ssc: StreamingContext = new StreamingContext(sparkSession.sparkContext,Seconds(5))    val kafkaBrokers:String = "hadoop01:9092,hadoop02:9092,hadoop03:9092"    val kafkaTopics: String = "test"    val kafkaParam = Map(      "bootstrap.servers" -> kafkaBrokers,//用于初始化链接到集群的地址      "key.deserializer" -> classOf[StringDeserializer],      "value.deserializer" -> classOf[StringDeserializer],      "group.id" -> "group1",      "auto.offset.reset" -> "earliest",      "enable.auto.commit" -> (false: java.lang.Boolean)    )    val inputDStream=KafkaUtils.createDirectStream[String,String](ssc, LocationStrategies.PreferConsistent,      ConsumerStrategies.Subscribe[String,String](Array(kafkaTopics),kafkaParam))    val valueDStream: DStream[String] = inputDStream.map(_.value())    valueDStream.foreachRDD(rdd =>{      val tRDD: RDD[String] = rdd.filter(_.contains("t"))      val hRDD: RDD[String] = rdd.filter(_.contains("h"))      tRDD.union(hRDD).foreach(println)    })

报错信息.java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

kafka问题
原因分析:
这里的两个rdd读取的是同一份数据,当执行action时,都会触发两次数据的读操作,(rdd中的一个分区对应着topic中的一个分区,也就是说kafka中的一个分区的数据这里被读取了2次) 但是,同一个分区的数据只能被一个consumer消费,所以这里报错。
解决方法:一个可行的解决方案是对rdd进行缓存或者checkpoint,然后要能保证,原始的kafka中的数据,只会被消费一次,然后剩下的数据消费都从缓存中获取数据。
示例代码:

val valueDStream: DStream[String] = inputDStream.map(_.value()).persist(StorageLevel.DISK_ONLY)    valueDStream.foreachRDD(rdd =>{      val tRDD: RDD[String] = rdd.filter(_.contains("t"))      val hRDD: RDD[String] = rdd.filter(_.contains("h"))      tRDD.union(hRDD).foreach(println)    })

参考:

转载地址:http://rzjkb.baihongyu.com/

你可能感兴趣的文章
【Pyton】【小甲鱼】类和对象
查看>>
压力测试工具JMeter入门教程
查看>>
作为一名软件测试工程师,需要具备哪些能力
查看>>
【Pyton】【小甲鱼】类和对象:一些相关的BIF(内置函数)
查看>>
【Pyton】【小甲鱼】魔法方法
查看>>
单元测试需要具备的技能和4大阶段的学习
查看>>
【Loadrunner】【浙江移动项目手写代码】代码备份
查看>>
Python几种并发实现方案的性能比较
查看>>
[Jmeter]jmeter之脚本录制与回放,优化(windows下的jmeter)
查看>>
Jmeter之正则
查看>>
【JMeter】1.9上考试jmeter测试调试
查看>>
【虫师】【selenium】参数化
查看>>
【Python练习】文件引用用户名密码登录系统
查看>>
学习网站汇总
查看>>
【Python】用Python打开csv和xml文件
查看>>
【Loadrunner】性能测试报告实战
查看>>
【自动化测试】自动化测试需要了解的的一些事情。
查看>>
【selenium】selenium ide的安装过程
查看>>
【手机自动化测试】monkey测试
查看>>
【英语】软件开发常用英语词汇
查看>>