我有一个带有两个注册处理程序的 Kafka 侦听器,每个处理程序都侦听同一主题但具有不同模式类型的消息。侦听器使用带有@SendTo
注释的转发结果到另一个主题,并且 EOS 由 启用@Transactional
。
@KafkaListener(
groupId = "groupId",
clientIdPrefix = "kafka-async-api-commands-listener",
topics = "cmd-topic",
containerFactory = "kafkaAsyncApiCommandsListenerContainerFactory",
errorHandler = "kafkaAsyncApiErrorHandler"
)
public class KafkaAsyncApiCommandsListener {
@KafkaHandler
@Transactional
@SendTo("resp-topic")
Message<FooResponse> Foo(FooCommand command) {
FooResponse response = new FooResponse(command);
return MessageBuilder
.withPayload(response)
.build();
}
@KafkaHandler
@Transactional
@SendTo("resp-topic")
Message<BarResponse> calculateBar(BarCommand command) {
BarResponse response = new BarResponse(command);
return MessageBuilder
.withPayload(response)
.build();
}
}
根据文档:
为了支持@SendTo,侦听器容器工厂必须提供一个KafkaTemplate(在其replyTemplate 属性中),用于发送回复。
KafkaTemplate 是一种参数化类型,需要提供将要生成的消息的键和值。我想出一个既支持又支持消息类型的模板时遇到FooResponse
问题BarResponse
。这似乎是必须的,因为容器工厂只接受一个模板。
由于希望这两个模板共享整个配置基础(属性、错误处理程序),我可以实例化一个模板类型的模板,该模板KafkaTemplate<UUID, Object>
能够生成带有序列化主体的消息FooResponse
并将BarResponse
其注入容器工厂:
@Configuration
public class KafkaAsyncApiProducerTemplatesConfig {
private final String bootstrapServers;
public KafkaAsyncApiProducerTemplatesConfig(@Value("${kafka.bootstrap-servers}") String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
@Bean("asyncApiResponseTemplate")
public KafkaTemplate<UUID, Object> asyncApiResponseKafkaTemplate() {
return new KafkaTemplate<>(kafkaAsyncApiProducerFactory());
}
private ProducerFactory<UUID, Object> kafkaAsyncApiProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UUIDSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
}
@Component
public class KafkaAsyncApiCommandsListenerContainerFactory extends ConcurrentKafkaListenerContainerFactory<UUID, CalculateCustomerBalanceCommand> {
private final String bootstrapServers;
private final KafkaTemplate<UUID, Object> kafkaTemplate;
public KafkaAsyncApiCommandsListenerContainerFactory(
@Value("${kafka.bootstrap-servers}") String bootstrapServers,
@Qualifier("asyncApiResponseTemplate") KafkaTemplate<UUID, Object> kafkaTemplate
) {
super();
this.bootstrapServers = bootstrapServers;
this.kafkaTemplate = kafkaTemplate;
this.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
this.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000, 3)));
this.setReplyTemplate(kafkaTemplate);
}
private Map<String, Object> consumerConfig() {
return new HashMap<String, Object>() {{
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, UUIDDeserializer.class);
put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
put(JsonDeserializer.TRUSTED_PACKAGES, "*");
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}};
}
}
我对我的方法不满意,无论是因为使用作为值参数来绕过静态类型的一种骇人听闻的方式,还是因为将来可能出现的潜在配置不灵活性。假设我想为和消息提供显着不同的配置模板。差异将需要不同的生产者配置。使用具有包罗万象的值类型的单个模板是不可能实现的。KafkaTemplate
Object
FooResponse
BarResponse
Object
有没有办法为侦听器容器提供多个回复模板以根据消息值类型动态选择?Spring Boot 自动配置是否以某种方式尝试解决这种用例?我不能在我的项目中使用它,但不介意它的代码中的任何提示。也许这个要求只能通过实例化两个单独的侦听器容器(附有不同的回复模板)侦听同一主题来满足?如果后一种方法是正确的,我如何确保跨多个侦听器容器的正确消息传递(理想情况下具有一次性语义)?
有没有办法为侦听器容器提供多个回复模板以根据消息值类型动态选择?
不。
Spring Boot 自动配置是否以某种方式尝试解决这种用例?
不。
但是,当返回类型为Message<?>
时,泛型参数无关紧要。看
/**
* Send a message with routing information in message headers. The message payload
* may be converted before sending.
* @param message the message to send.
* @return a Future for the {@link SendResult}.
* @see org.springframework.kafka.support.KafkaHeaders#TOPIC
* @see org.springframework.kafka.support.KafkaHeaders#PARTITION
* @see org.springframework.kafka.support.KafkaHeaders#KEY
*/
ListenableFuture<SendResult<K, V>> send(Message<?> message);
泛型类型仅在使用带有键和/或值的发送方法时才有意义。
您可以创建一个子类KafkaTemplate
并覆盖 send 方法以调用具有适当泛型类型的委托模板,但这将毫无意义,除非您返回FooResponse
或BarResponse
直接返回,而不是将它们组装成Message<?>
.
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句