Spring Kafka-遇到“ Magic v0不支持记录头”错误

MD10172016

我正在运行一个Spring Boot应用程序,并且已经扣好了 compile('org.springframework.kafka:spring-kafka:2.1.5.RELEASE')

我试图与此版本的Cloudera安装相反:

Cloudera Distribution of Apache Kafka Version 3.0.0-1.3.0.0.p0.40 Version 0.11.0+kafka3.0.0+50

我的KafkaProducerConfig类非常简单:

@Configuration
public class KafkaProducerConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Value("${spring.kafka.template.default-topic}")
private String defaultTopicName;

@Value("${spring.kafka.producer.compression-type}")
private String producerCompressionType;

@Value("${spring.kafka.producer.client-id}")
private String producerClientId;

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.producerCompressionType);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, this.producerClientId);
    props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

    return props;
}

@Bean
public ProducerFactory<String, Pdid> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, Pdid> kafkaTemplate() {
    KafkaTemplate<String, Pdid> kafkaTemplate = new KafkaTemplate<>(producerFactory());

    kafkaTemplate.setDefaultTopic(this.defaultTopicName);

    return kafkaTemplate;
}

@PostConstruct
public void postConstruct() {
    LOGGER.info("Kafka producer configuration: " + this.producerConfigs().toString());
    LOGGER.info("Kafka topic name: " + this.defaultTopicName);
}

}

启动应用程序时,我收到:

2018-05-01 17:15:41.355 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1 2018-05-01 17:15:41.356 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e

然后,我发送有效载荷。它针对该主题显示在Kafka工具中。但是,在尝试提取数据时,在Kafka端的日志中,我收到:

[KafkaApi-131] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=profiles-pdid,partitions=[{partition=0,fetch_offset=7,max_bytes=1048576}]}]}java.lang.IllegalArgumentException: Magic v0 does not support record headers
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
at scala.Option.flatMap(Option.scala:171)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2014)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
at java.lang.Thread.run(Thread.java:748)

我已经从Producer应用程序方面尝试了以下操作:

  1. 降级到Spring Kafka 2.0.4。我希望降至0.11.0的Kafka版本将有助于解决该问题,但没有效果。
  2. 验证节点都是相同的版本。据我的管理员说,他们是。
  3. 经管理员确认,我们没有混合安装。再次,我被告知我们不这样做。
  4. 基于类似的堆栈溢出问题,我回到了2.1.5,并尝试将JsonSerializer.ADD_TYPE_INFO_HEADERS设置为false。我认为也许它将删除日志所指的标头。同样,没有执行,并且记录了相同的错误。

我希望我缺少明显的东西。我是否需要打开/关闭其他标题以帮助解决任何人都知道的Magic v0问题?

我们还有其他应用程序可以在同一环境中成功写入其他主题,但是它们是手工制作必要的Spring Bean的较旧的应用程序。此外,这些应用程序还使用了较旧的客户端(0.8.2.2),并且使用StringSerializer作为Producer值而不是JSON。我需要我的数据使用JSON,并且当我们使用的系统应该支持0.11.x时,我真的不想降级到0.8.2.2。

加里·罗素

但是它们是手工制作必要的Spring bean的较旧的应用程序。

在org.apache.kafka.common.record.FileRecords中。downConvert(FileRecords.java:245)

我不熟悉kafka经纪人的内部知识,但听起来像主题是由旧的经纪人创建的,它们的格式不支持标头,而不是经纪人版本本身(提示:downConvert)。

您是否曾与干净的经纪人尝试过?

只要您不尝试使用经纪人不支持的功能,1.0.x客户端就可以与较早的经纪人进行对话(返回到0.10.2.x IIRC)。您的经纪人为0.11(确实支持标头)的事实进一步表明,这是问题的主题记录格式。

只要您使用通用功能子集,我就可以成功测试上/下代理/客户端/主题版本,而不会出现问题。

JsonSerializer.ADD_TYPE_INFO_HEADERS为false。

那应该防止框架设置任何标题;您需要显示您的生产者代码(以及所有配置)。

您还可以将A添加ProducerInterceptor到生产者配置中,并检查方法中ProducerRecord headers属性onSend(),以弄清输出消息中的设置标头。

如果您正在使用spring-messaging消息(template.setn(Message<?> m),则默认情况下将映射标头)。使用原始template.send()方法不会设置任何标头(除非您发送的ProducerRecord标头已填充)。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

为什么我在Spring Boot应用程序中遇到未找到错误的错误

使用DeadLetterPublishingRecoverer处理Spring-kafka错误

使用JPA的Spring Boot-列中遇到错误的列类型

Spring Boot JPA / Hibernate错误的列类型遇到(JSON字段)

Spring Kafka @SendTo引发异常:需要KafkaTemplate来支持回复

Spring Kafka Producer不发送到Kafka 1.0.0(Magic v1不支持记录头)

使用Spring Kafka添加自定义标头

无法在Spring中配置CSRF以免遇到CORS错误

Spring Cloud Stream Kafka是否支持嵌入式标头?

Kafka Spring集成:标头不适用于kafka消费者

使用mvn spring-boot遇到错误:运行“子容器无法启动”

Spring Kafka错误处理-v1.1.x

在Spring Boot中使用Jersey进行文件上传时遇到400错误请求

静态解析符号值时遇到错误。不支持函数调用。考虑更换函数或lambda吗?

Spring Cloud Stream嵌入式标头格式(Kafka)

从RabbitMQ切换到Kafka遇到的问题

Animated.spring:RCTJSONStringify()遇到以下错误:JSON写入中的无效数字值(NaN)

Spring Kafka会延迟使用记录

Spring Integration和Kafka:如何根据消息头过滤消息

spring-integration-kafka注释支持和示例

在Spring应用程序中遇到错误:cvc-elt.1:找不到元素'beans'的声明

Datarest Spring遇到麻烦

尝试运行简单的Spark Streaming Kafka示例时遇到错误

在 Spring Tool Suite 中导入 gradle 项目时遇到错误“无法启动守护进程”

遇到不支持的 Oracle 数据类型 101

Spring Cloud Stream Kafka 错误通道

添加Spring Kafka Maven依赖版本2.2.1.RELEASE遇到ClassNotFoundException

如何解决我在通过邮递员测试 spring boot rest api 时遇到的错误?

如何删除 Kafka(Spring-boot)中的标头?