Kafka-Spark Streaming - only reading data from 1 partition

Erin

I have a standalone Spark cluster that is reading data from a Kafka queue. The Kafka topic has 5 partitions, but Spark is only processing data from one of them. I am using the following:

These are my Maven dependencies:

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>kafka-custom</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.1</version>
        </dependency>
    </dependencies>

My Kafka producer is a simple producer that just puts 100 messages on the queue:

    public void generateMessages() {

        // Define the properties for the Kafka connection
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaBrokerServer); // Kafka server
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // Create a KafkaProducer using the Kafka connection properties
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(kafkaTopic, "value-" + i);
            producer.send(record);
        }
        producer.close();
    }
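Because these records are sent without keys, the 0.10 default partitioner should round-robin them across all 5 partitions. As a sanity check (not part of the original post; the class, method name, and probe values below are illustrative), you can block on each `send()` and inspect the returned `RecordMetadata` to see where each record actually lands:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class PartitionSpreadCheck {

        // Illustrative check: send a few keyless records and print the partition
        // each one lands on; "props" is assumed to be configured like the producer above.
        public static void verifyPartitionSpread(Properties props, String topic) throws Exception {
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 10; i++) {
                    RecordMetadata meta = producer.send(
                            new ProducerRecord<>(topic, "probe-" + i)).get(); // wait for the ack
                    System.out.printf("probe-%d -> partition %d, offset %d%n",
                            i, meta.partition(), meta.offset());
                }
            }
        }
    }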

This is the main code in my streaming job:

    public void processKafka() throws InterruptedException {
        LOG.info("************ SparkStreamingKafka.processKafka start");

        // Create the Spark application
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.executor.cores", "5");

        // To express any Spark Streaming computation, a StreamingContext object needs to be created.
        // This object serves as the main entry point for all Spark Streaming functionality.
        // This creates the Spark Streaming context with a 'sparkBatchInterval'-second batch size.
        jssc = new JavaStreamingContext(sparkConf, Durations.seconds(sparkBatchInterval));

        // Kafka consumer parameters
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", this.getBrokerList());
        kafkaParams.put("client.id", "SpliceSpark");
        kafkaParams.put("group.id", "mynewgroup");
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", false);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);

        // The 5 partitions of the topic (used by the Assign variant below)
        List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        for (int i = 0; i < 5; i++) {
            topicPartitions.add(new TopicPartition("mytopic", i));
        }

        // List of Kafka topics to process
        Collection<String> topics = Arrays.asList(this.getTopicList().split(","));

        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        // Another variant I attempted: assign the 5 partitions explicitly
        /*
        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Assign(topicPartitions, kafkaParams));
        */

        messages.foreachRDD(new PrintRDDDetails());

        // Start running the job to receive and transform the data
        jssc.start();

        // Allows the current thread to wait for termination of the context by stop() or by an exception
        jssc.awaitTermination();
    }

The call method of PrintRDDDetails contains the following:

    public void call(JavaRDD<ConsumerRecord<String, String>> rdd)
            throws Exception {
        LOG.error("--- New RDD with " + rdd.partitions().size()
                + " partitions and " + rdd.count() + " records");
    }
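For context, the post only shows the call method; the enclosing class is presumably a `VoidFunction` over the batch RDD, since that is what `foreachRDD` accepts. A minimal sketch of that reconstruction, with the logger swapped for stderr:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.VoidFunction;

    // Hypothetical reconstruction of the enclosing class, assuming it implements
    // the VoidFunction<JavaRDD<...>> signature that foreachRDD expects.
    public class PrintRDDDetails implements VoidFunction<JavaRDD<ConsumerRecord<String, String>>> {
        @Override
        public void call(JavaRDD<ConsumerRecord<String, String>> rdd) throws Exception {
            System.err.println("--- New RDD with " + rdd.partitions().size()
                    + " partitions and " + rdd.count() + " records");
        }
    }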

What seems to happen is that it only gets data from one partition. I have confirmed in Kafka that there are 5 partitions. When the call method executes, it prints the correct number of partitions, but only prints the records that are in one partition, and the further processing that I stripped out of this simplified code shows that it only ever processes one partition.
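One way to see exactly which partitions each batch covers (a diagnostic sketch, not from the original post) is to cast the batch RDD to `HasOffsetRanges` inside `foreachRDD`. This must be done on the RDD that comes directly from the Kafka stream, before any shuffle:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.streaming.kafka010.HasOffsetRanges;
    import org.apache.spark.streaming.kafka010.OffsetRange;

    // Diagnostic sketch: log the offset range consumed from each topic-partition
    // in this batch. Partitions with no new data show fromOffset == untilOffset.
    public class OffsetRangeLogger {
        public static void logOffsetRanges(JavaRDD<ConsumerRecord<String, String>> rdd) {
            OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            for (OffsetRange r : ranges) {
                System.err.println(r.topic() + " partition " + r.partition()
                        + " -> offsets [" + r.fromOffset() + ", " + r.untilOffset() + ")");
            }
        }
    }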

Answer

This appears to be an issue with Spark 2.1.0, because it uses v0.10.1 of the kafka-clients library (per the following pull request):

https://github.com/apache/spark/pull/16278

I resolved this by using a newer version of the kafka-clients library:

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"                      % sparkVersion,
      "org.apache.spark" %% "spark-streaming"                 % sparkVersion,
      "org.apache.spark" %% "spark-sql"                       % sparkVersion,
      "org.apache.spark" %% "spark-streaming-kinesis-asl"     % sparkVersion,
      "org.apache.spark" %  "spark-streaming-kafka-0-10_2.11" % sparkVersion
    ).map(_.exclude("org.apache.kafka", "kafka-clients"))

    libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.0"
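The question's build is Maven rather than sbt; a minimal sketch of the same workaround in POM form, assuming the stock org.apache.spark artifact and pinning kafka-clients to 0.10.2.0, would be to exclude the transitive client and declare the newer one explicitly:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.0.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.0</version>
    </dependency>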
