博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark 0.8版本kafka 保存offset ,实现0数据丢失
阅读量:6924 次
发布时间:2019-06-27

本文共 2904 字,大约阅读时间需要 9 分钟。

最近的项目还是用的老的kafka版本(0.8),用spark 接数据的时候,如果spark 程序意外重启,重启时间内的kafka数据会丢失。我们需要实现最少消费一次,数据重复没有关系。但不能允许丢失数据。

在 Spark Streaming 中消费 Kafka 数据的时候,有两种方式分别是 1)基于 Receiver-based 的 createStream 方法和 2)Direct Approach (No Receivers) 方式的 createDirectStream 方法, 在生产上我们首先用第一种方式,发现性能有很多损耗,而且也不稳定,所以我们后来使用的是第2种方式。我们把kafka 的offset 保存在zookeeper中,实际测试发现zookeeper保存offset效率还不错,下面是具体代码修改记录, 供参考:

 

val topics = Set[String](kafkaInputTopic) val kafkaParams = Map[String, String](   "metadata.broker.list" -> destBrokerHost,   "serializer.class" -> "kafka.serializer.StringEncoder",   "group.id" -> kafkaGroup) var offsetRanges = Array.empty[OffsetRange] var kafkaStream: InputDStream[(String,  Array[Byte])] = null val zkClient = new ZkClient(zookeeper) val topicDirs = new ZKGroupTopicDirs("spark_streaming", kafkaInputTopic)  //创建一个 ZKGroupTopicDirs 对象 val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}")     //查询该路径下是否字节点 var fromOffsets: Map[TopicAndPartition, Long] = Map()  if (children > 0) {//如果 zookeeper 中有保存 offset,我们会利用这个 offset 作为 kafkaStream 的起始位置
for (i <- 0 until children) {
val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}") val tp = TopicAndPartition(kafkaInputTopic, i) fromOffsets += (tp -> partitionOffset.toLong) //将不同 partition 对应的 offset 增加到 fromOffsets 中 } val messageHandler = (mmd: MessageAndMetadata[String, Array[Byte]]) => (mmd.topic, mmd.message()) kafkaStream =KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder,(String,Array[Byte])](ssc, kafkaParams, fromOffsets,messageHandler) }else{
kafkaStream=KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics) }
//这个会将 kafka 的消息进行 transform,最终 kafak 的数据都会变成 (topic_name, message) 这样的 tuple
val lines: DStream[Array[Byte]] = kafkaStream   .transform { rdd =>     offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges     rdd   }.map(line => line._2) 。。。。
.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
var zkClient:ZkClient=null try {
zkClient = new ZkClient(zookeeper) for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}" ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString) //将该 partition 的 offset 保存到 zookeeper println(s"${o.topic} ${o.partition} success set to zookeeper.") } } catch {
case e: Exception => {
e.printStackTrace() } } finally {
if (zkClient != null) {
zkClient.close() } } }}
pom.xml 增加zookeeper依赖 
org.apache.zookeeper
zookeeper
3.4.9
com.101tec
zkclient
0.10

转载于:https://www.cnblogs.com/zuoql/p/10491673.html

你可能感兴趣的文章
inotify用法简介及结合rsync实现主机间的文件实时同步
查看>>
chrome和搜狗浏览器的js问题
查看>>
摄影视觉运用于网页设计
查看>>
[UI] 精美UI界面欣赏[9]
查看>>
Tasker to detect and vibrate once the ougoing call is being answered
查看>>
#define中 #与##的神奇用法linux学习 (转)
查看>>
博客园博客撰写工具【开源】(可以直接黏贴图片)
查看>>
onkeyup 事件会在键盘按键被松开时发生
查看>>
移动测试会Ebay沙龙PPT
查看>>
简约之美Jodd-http--深入源码理解http协议
查看>>
Fat-tree 胖树交换网络
查看>>
楼塔当天领袖acm心理(作为励志使用)
查看>>
Java知多少(98)Graphics类的绘图方法
查看>>
SQL Server 有关EXCEPT和INTERSECT使用
查看>>
unix域套接字UDP网络编程
查看>>
.NET破解之谷歌地图下载助手-睿智版
查看>>
在Hekaton里,正确选择哈希存储桶数
查看>>
[火狐REST] 火狐REST 模拟 HTTP get, post请求
查看>>
浅谈HTML5单页面架构(二)——backbone + requirejs + zepto + underscore
查看>>
从头学起android&lt;AutoCompleteTextView文章提示文本框.十九.&gt;
查看>>