如何使用Spark的Kafka Direct Stream设置消费者组提交的偏移量?

abi_pat

我正在尝试对Kafka使用Spark的直接方法(无接收器),我有以下Kafka配置图:

configMap.put("zookeeper.connect","192.168.51.98:2181");
configMap.put("group.id", UUID.randomUUID().toString());
configMap.put("auto.offset.reset","smallest");
configMap.put("auto.commit.enable","true");
configMap.put("topics","IPDR31");
configMap.put("kafka.consumer.id","kafkasparkuser");
configMap.put("bootstrap.servers","192.168.50.124:9092");

现在我的目标是,如果我的Spark管道崩溃并再次启动,则该流应从使用者组提交的最新偏移量开始。因此,为此,我想为消费者指定起始偏移量。我有有关每个分区中提交的偏移量的信息。我如何将这些信息提供给流功能。目前我正在使用

JavaPairInputDStream<byte[], byte[]> kafkaData =
   KafkaUtils.createDirectStream(js, byte[].class, byte[].class,
     DefaultDecoder.class, DefaultDecoder.class,configMap,topic); 
拉梅什森

查看Spark API文档中的createDirectStream的第二种形式-它允许您传递Map<TopicAndPartition, Long>,其中Long是偏移量。

请注意,使用DirectInputStream时,Spark不会自动更新Zookeeper中的偏移量-您必须将它们自己写入ZK或其他数据库。除非严格要求一次语义,否则使用createStream方法获取DStream会更容易,在这种情况下,Spark将更新ZK中的偏移量,并在失败的情况下从最后存储的偏移量恢复。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

如何在Golang Kafka 10中获取分区的消费者组偏移量

spring kafka偏移量增量,即使自动提交偏移量设置为false

如何使用直接流在Kafka Spark流中指定消费者组

Kafka-如何使用高级使用者在每条消息后提交偏移量?

Kafka-创建具有特定偏移量的消费者组?

偏移量存储为Kafka时如何检查消费者偏移量?

如何在Kafka Direct Stream中使用Spark结构化流?

Flink-查询Kafka主题以了解消费者组的偏移量?

消费者组的kafka 0.11重置偏移量--to-datetime

如何在稳定的卡夫卡消费者群中转移话题的偏移量?

在Kafka-python中的使用者组中重置kafka LAG(更改偏移量)

Zookeeper / Kafka如何为消费者保留偏移量?

是否可以在kafka连接器中为kafka消费者组的主题重置偏移量?

卡夫卡中的Spark 1.6 Streaming消费者阅读偏移量停留在createDirectStream上

Kafka Stream:消费者提交频率

如何为消费者设置卡夫卡偏移量?

如何在IntelliJ IDEA中使用Kafka Direct Stream运行Spark Streaming应用程序?

Kafka:消费者api:无法使用kafka-consumer-api从偏移量手动读取和确认

如何设置Wifi Direct

无法使用 zookeeper CLI 获取 kafka 消费者组的偏移信息

手动设置 kafka 组 id 的偏移量

为风暴消费者检查 Kafka 主题的偏移量

Kafka:如何修复偏移量损坏的问题,以及如何手动重置消费者的偏移量以再次读取?

在 Kafka 中,消费者在哪个 __consumer_offsets 分区上提交偏移量?

Kafka消费者组,创建消费者组时设置offset为0

消费者何时提交偏移量?

如何让消费者组 A 拿起消费者组 B 的偏移量但不影响 A 的已提交偏移量?

如何删除一个特定主题的组的消费者偏移量

confluent-kafka-python 库:每个消费者组每个主题的读取偏移量