为什么我们需要使用线程来运行Kafka使用者?我们需要多少个线程?

alt-f4:

我是Java的新手(我在Scala上有一些经验),目前正在尝试了解Kafka。我在本教程中遇到了以下示例(主要是为了参考而添加代码):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class ConsumerDemoWithThread {

    public static void main(String[] args) {
        new ConsumerDemoWithThread().run();
    }

    private ConsumerDemoWithThread() {

    }

    private void run() {
        Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThread.class.getName());

        String bootstrapServers = "127.0.0.1:9092";
        String groupId = "my-sixth-application";
        String topic = "first_topic";

        // latch for dealing with multiple threads
        CountDownLatch latch = new CountDownLatch(1);

        // create the consumer runnable
        logger.info("Creating the consumer thread");
        Runnable myConsumerRunnable = new ConsumerRunnable(
                bootstrapServers,
                groupId,
                topic,
                latch
        );

        // start the thread
        Thread myThread = new Thread(myConsumerRunnable);
        myThread.start();

        // add a shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Caught shutdown hook");
            ((ConsumerRunnable) myConsumerRunnable).shutdown();
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            logger.info("Application has exited");
        }

        ));

        try {
            latch.await();
        } catch (InterruptedException e) {
            logger.error("Application got interrupted", e);
        } finally {
            logger.info("Application is closing");
        }
    }

    public class ConsumerRunnable implements Runnable {

        private final CountDownLatch latch;
        private final KafkaConsumer<String, String> consumer;
        private final Logger logger = LoggerFactory.getLogger(ConsumerRunnable.class.getName());

        public ConsumerRunnable(String bootstrapServers,
                                String groupId,
                                String topic,
                                CountDownLatch latch) {
            this.latch = latch;

            // create consumer configs
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            // create consumer
            consumer = new KafkaConsumer<String, String>(properties);
            // subscribe consumer to our topic(s)
            consumer.subscribe(Collections.singletonList(topic));
        }

        @Override
        public void run() {
            // poll for new data
            try {
                while (true) {
                    ConsumerRecords<String, String> records =
                            consumer.poll(Duration.ofMillis(100)); // new in Kafka 2.0.0

                    for (ConsumerRecord<String, String> record : records) {
                        logger.info("Key: " + record.key() + ", Value: " + record.value());
                        logger.info("Partition: " + record.partition() + ", Offset:" + record.offset());
                    }
                }
            } catch (WakeupException e) {
                logger.info("Received shutdown signal!");
            } finally {
                consumer.close();
                // tell our main code we're done with the consumer
                latch.countDown();
            }
        }

        public void shutdown() {
            // the wakeup() method is a special method to interrupt consumer.poll()
            // it will throw the exception WakeUpException
            consumer.wakeup();
        }
    }
}

我主要是想了解:

  1. 使用线程运行使用者的好处是什么?我(我以为Kafka还是抽象了消费者之间的负载分配
  2. 当我们使用时,Thread myThread = new Thread(myConsumerRunnable);它是在单个线程中运行还是在多个线程中运行?
  3. 为什么我们要通过单独的线程触发关闭挂钩?从检查该方法的理解来看,它似乎更像是Java而不是Kafka
埃里希·基茨穆勒(Erich Kitzmueller):

使用线程运行使用者的好处是什么?我(我以为Kafka无论如何都可以抽象化消费者之间的负载分配)

如您所见,使用者在run方法中开始无限循环作为新线程启动它,可以在使用者已经处于活动状态时在主线程中执行更多操作。

当我们使用线程时,myThread = new Thread(myConsumerRunnable); 是在>单线程还是跨多个线程运行?

创建Thread对象尚未启动新线程。这是myThread.start();新线程开始执行的地方。您的示例程序有一个主线程和一个使用者线程。主线程实际上正在等待通过发出的关闭信号CountDownLatch latch,因此可以避免使用用户线程。

为什么我们要通过单独的线程触发关闭挂钩?(从检查该方法的理解来看,它似乎更像是Java而不是Kafka)

这是java的东西。直到发生关闭,才会真正执行关闭挂钩线程。参见https://docs.oracle.com/javase/7/docs/api/java/lang/Runtime.html#addShutdownHook(java.lang.Thread)

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

为什么我们需要使用flatMap?

为什么我们需要使用基数?

为什么我们需要mktemp?

为什么我们需要使用awaitTermination而不是shutdownNow()?

为什么我们需要一个Runnable来启动线程?

为什么我们需要使用Context.obtainStyledAttributes和define-styleable来获取GUI组件的属性

为什么我们需要使用Collection.sort()方法对List进行排序?

我们到底需要使用类型转换吗?

为什么我们需要需求?

我们为什么需要`ngDoCheck`

为什么我们需要使用导入“ babel-polyfill”;在反应成分?

我们是否需要使用后台线程使用Firebase检索数据?

为什么我们需要使用Catch分别编译主测试文件?

为什么在使用__syncthreads时我们不需要使用volatile变量

我们需要多少个内存屏障来实现彼得森锁?

为什么我们需要使用多个版本的JQuery库

为什么我们需要使用grunt-useminPrepare

我们什么时候需要使用-e来进行sed?

为什么我们需要使用hint.start()?

为什么我们需要在另一个队列上使用dispatch_sync()而不是在iOS GCD中使用当前队列/线程

为什么我们需要安装 http 模块来运行我们的 node js 应用程序?

我们如何让 API 使用者知道在 RESTful 中创建资源需要哪些数据?

在 C 中使用指针时,我们何时以及为什么需要使用 malloc?

为什么我们需要在应用加载时使用 BeginInvokeOnMainThread 来显示 DisplayAlert

为什么我们需要使用标记接口?

为什么我们使用损失来更新我们的模型,但使用指标来选择我们需要的模型?

如果我们可以更改线程池使用的线程数,为什么还需要节点集群?

为什么我们需要使用 .net 扩展名保存 pytorch 模型?

我们需要使用什么格式来存储 db.Time 中的数据?