我们如何在Kafka中快速编写单个消息(非批处理)?

苏拉杰·辛格:

我是Golang和Kafka的新手,并且我正在使用segmentio kafka-go使用Golang连接到Kafka服务器。到目前为止,我想推送Kafka中用户的每个事件,所以我想推送单个消息(而不是批量),但是由于此库提供的写操作对批量消息或单个消息花费相同的时间,因此很多时间。有什么办法可以快速编写单个消息,以便我可以在更短的时间内推送卡夫卡中的数百万个事件?

我已经针对单个消息和批处理消息进行了测试,这花费了相同的时间(最小为10毫秒)。

奥迪隆:

我认为您的问题只是WriterConfig。

例如,如果您的配置类似于segmentio / kafka-go docs上的示例:

w := kafka.NewWriter(kafka.WriterConfig{
    Brokers:      []string{"localhost:9092"},
    Topic:        "topic-A",
    Balancer:     &kafka.LeastBytes{},
})

您可以尝试设置批次大小和批次超时:

w := kafka.NewWriter(kafka.WriterConfig{
    Brokers:      []string{"localhost:9092"},
    Topic:        "topic-A",
    Balancer:     &kafka.LeastBytes{},
    BatchSize:    1,
    BatchTimeout: 10 * time.Millisecond,
})

发生这种情况是因为kafka-go默认情况下等待1秒钟,直到批处理达到最大大小为止(默认情况下为100条消息),正如我们在代码中看到的那样

希望对您有帮助。


更新:请注意,一一发送消息会减慢该过程。例如:批量发送100条消息在我的计算机上花费了0.0107s。一次发送相同的100条消息花费了0.0244s。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

我们如何在IntelliJ IDEA中编写我们自己的代码模板文件

如何在融合的Kafka Python中读取批处理消息?

我们如何在iTextSharp中编写Unicode印地语文本?

我们如何在React JS中为无状态功能组件编写点击处理程序

我们如何在域模型中处理删除的不同含义

在此示例中,我们如何编写高阶函数?

我们如何在按下颤振的单个按钮中执行两个功能

如果我们在单个Jinja模板中有多个宏。如何在python中渲染特定宏

我们如何在用React Native编写的移动应用程序中获取GPS位置

我们如何在Azure DevOps构建管道中读取GIT提交消息?

我们如何在迁移学习中为我们的培训设置标签?

我们如何处理vuejs中的响应?

我们如何在密码框中显示表单验证消息?

我们如何在vb.net中处理大量

我们如何在消息中抛出异常?

我们如何在Spring中编写Mongo DB查询

批处理脚本:我们可以跳过(%*)中的参数吗?

我们如何在MongoDB聚合查询中求和单个数组元素?

在春季批处理远程分区中,我们可以将主配置和从配置配置为单个配置,然后将其作为单个程序运行

我们如何在单个 HTML 表单中执行不同的操作以在 django 中动态更改 url?

我们如何在php中处理动态表单

我们如何在 Kotlin Exposed 中编写查询?

我们如何快速使用 NSSelectorFromString ?

我们如何在单个谷歌驱动器应用程序中运行多个功能?

在Spring批处理xml编写中,我们可以为特定的记录数编写复杂的xml吗

我们如何在 AWS Cloudformation 中编写 If (Condtion) do (X and Y) else (do A and B)

我们如何在单个 XQuery 代码中获取所有 MarkLogic 数据库文档计数?

在flutter中我们如何在更改图标颜色或背景颜色的同时编写if()else if()else()?

如何在 mule 中动态设置 kafka 偏移值,以便我们可以从该特定偏移量开始处理