最近的项目还是用的老的kafka版本(0.8),用spark 接数据的时候,如果spark 程序意外重启,重启时间内的kafka数据会丢失。我们需要实现最少消费一次,数据重复没有关系。但不能允许丢失数据。
在 Spark Streaming 中消费 Kafka 数据的时候,有两种方式分别是 1)基于 Receiver-based 的 createStream 方法和 2)Direct Approach (No Receivers) 方式的 createDirectStream 方法, 在生产上我们首先用第一种方式,发现性能有很多损耗,而且也不稳定,所以我们后来使用的是第2种方式。我们把kafka 的offset 保存在zookeeper中,实际测试发现zookeeper保存offset效率还不错,下面是具体代码修改记录, 供参考:
val topics = Set[String](kafkaInputTopic) val kafkaParams = Map[String, String]( "metadata.broker.list" -> destBrokerHost, "serializer.class" -> "kafka.serializer.StringEncoder", "group.id" -> kafkaGroup) var offsetRanges = Array.empty[OffsetRange] var kafkaStream: InputDStream[(String, Array[Byte])] = null val zkClient = new ZkClient(zookeeper) val topicDirs = new ZKGroupTopicDirs("spark_streaming", kafkaInputTopic) //创建一个 ZKGroupTopicDirs 对象 val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}") //查询该路径下是否字节点 var fromOffsets: Map[TopicAndPartition, Long] = Map() if (children > 0) {//如果 zookeeper 中有保存 offset,我们会利用这个 offset 作为 kafkaStream 的起始位置
for (i <- 0 until children) { val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}") val tp = TopicAndPartition(kafkaInputTopic, i) fromOffsets += (tp -> partitionOffset.toLong) //将不同 partition 对应的 offset 增加到 fromOffsets 中 } val messageHandler = (mmd: MessageAndMetadata[String, Array[Byte]]) => (mmd.topic, mmd.message()) kafkaStream =KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder,(String,Array[Byte])](ssc, kafkaParams, fromOffsets,messageHandler) }else{ kafkaStream=KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics) }
//这个会将 kafka 的消息进行 transform,最终 kafak 的数据都会变成 (topic_name, message) 这样的 tuple
val lines: DStream[Array[Byte]] = kafkaStream .transform { rdd => offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd }.map(line => line._2) 。。。。
.foreachRDD(rdd => { if (!rdd.isEmpty()) { var zkClient:ZkClient=null try { zkClient = new ZkClient(zookeeper) for (o <- offsetRanges) { println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}" ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString) //将该 partition 的 offset 保存到 zookeeper println(s"${o.topic} ${o.partition} success set to zookeeper.") } } catch { case e: Exception => { e.printStackTrace() } } finally { if (zkClient != null) { zkClient.close() } } }}
pom.xml 增加zookeeper依赖org.apache.zookeeper zookeeper 3.4.9 com.101tec zkclient 0.10