我正在运行一个Spring Boot应用程序,并且已经扣好了 compile('org.springframework.kafka:spring-kafka:2.1.5.RELEASE')
我试图与此版本的Cloudera安装相反:
Cloudera Distribution of Apache Kafka Version 3.0.0-1.3.0.0.p0.40 Version 0.11.0+kafka3.0.0+50
我的KafkaProducerConfig类非常简单:
@Configuration
public class KafkaProducerConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.template.default-topic}")
private String defaultTopicName;
@Value("${spring.kafka.producer.compression-type}")
private String producerCompressionType;
@Value("${spring.kafka.producer.client-id}")
private String producerClientId;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.producerCompressionType);
props.put(ProducerConfig.CLIENT_ID_CONFIG, this.producerClientId);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
return props;
}
@Bean
public ProducerFactory<String, Pdid> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Pdid> kafkaTemplate() {
KafkaTemplate<String, Pdid> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setDefaultTopic(this.defaultTopicName);
return kafkaTemplate;
}
@PostConstruct
public void postConstruct() {
LOGGER.info("Kafka producer configuration: " + this.producerConfigs().toString());
LOGGER.info("Kafka topic name: " + this.defaultTopicName);
}
}
启动应用程序时,我收到:
2018-05-01 17:15:41.355 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1
2018-05-01 17:15:41.356 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e
然后,我发送有效载荷。它针对该主题显示在Kafka工具中。但是,在尝试提取数据时,在Kafka端的日志中,我收到:
[KafkaApi-131] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=profiles-pdid,partitions=[{partition=0,fetch_offset=7,max_bytes=1048576}]}]}java.lang.IllegalArgumentException: Magic v0 does not support record headers
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
at scala.Option.flatMap(Option.scala:171)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2014)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
at java.lang.Thread.run(Thread.java:748)
我已经从Producer应用程序方面尝试了以下操作:
我希望我缺少明显的东西。我是否需要打开/关闭其他标题以帮助解决任何人都知道的Magic v0问题?
我们还有其他应用程序可以在同一环境中成功写入其他主题,但是它们是手工制作必要的Spring Bean的较旧的应用程序。此外,这些应用程序还使用了较旧的客户端(0.8.2.2),并且使用StringSerializer作为Producer值而不是JSON。我需要我的数据使用JSON,并且当我们使用的系统应该支持0.11.x时,我真的不想降级到0.8.2.2。
但是它们是手工制作必要的Spring bean的较旧的应用程序。
在org.apache.kafka.common.record.FileRecords中。downConvert(FileRecords.java:245)
我不熟悉kafka经纪人的内部知识,但听起来像主题是由旧的经纪人创建的,它们的格式不支持标头,而不是经纪人版本本身(提示:downConvert)。
您是否曾与干净的经纪人尝试过?
只要您不尝试使用经纪人不支持的功能,1.0.x客户端就可以与较早的经纪人进行对话(返回到0.10.2.x IIRC)。您的经纪人为0.11(确实支持标头)的事实进一步表明,这是问题的主题记录格式。
只要您使用通用功能子集,我就可以成功测试上/下代理/客户端/主题版本,而不会出现问题。
JsonSerializer.ADD_TYPE_INFO_HEADERS为false。
那应该防止框架设置任何标题;您需要显示您的生产者代码(以及所有配置)。
您还可以将A添加ProducerInterceptor
到生产者配置中,并检查方法中的ProducerRecord
headers
属性onSend()
,以弄清输出消息中的设置标头。
如果您正在使用spring-messaging消息(template.setn(Message<?> m)
,则默认情况下将映射标头)。使用原始template.send()
方法不会设置任何标头(除非您发送的ProducerRecord
标头已填充)。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句