我正在尝试对Kafka使用Spark的直接方法(无接收器),我有以下Kafka配置图:
configMap.put("zookeeper.connect","192.168.51.98:2181");
configMap.put("group.id", UUID.randomUUID().toString());
configMap.put("auto.offset.reset","smallest");
configMap.put("auto.commit.enable","true");
configMap.put("topics","IPDR31");
configMap.put("kafka.consumer.id","kafkasparkuser");
configMap.put("bootstrap.servers","192.168.50.124:9092");
现在我的目标是,如果我的Spark管道崩溃并再次启动,则该流应从使用者组提交的最新偏移量开始。因此,为此,我想为消费者指定起始偏移量。我有有关每个分区中提交的偏移量的信息。我如何将这些信息提供给流功能。目前我正在使用
JavaPairInputDStream<byte[], byte[]> kafkaData =
KafkaUtils.createDirectStream(js, byte[].class, byte[].class,
DefaultDecoder.class, DefaultDecoder.class,configMap,topic);
查看Spark API文档中的createDirectStream的第二种形式-它允许您传递Map<TopicAndPartition, Long>
,其中Long是偏移量。
请注意,使用DirectInputStream时,Spark不会自动更新Zookeeper中的偏移量-您必须将它们自己写入ZK或其他数据库。除非严格要求一次语义,否则使用createStream方法获取DStream会更容易,在这种情况下,Spark将更新ZK中的偏移量,并在失败的情况下从最后存储的偏移量恢复。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句