Spring Boot Kafka消息传递。如何简化处理程序的dto映射?

塞尔吉(Sergii):

我已经使用Kafka配置了我的spring boot项目。我可以接收和发布任何基于字符串的消息。

字符串消息不是处理的最佳方法。具有将消息从字符串默认转换为对象的功能将更加有用。

实现此功能,我需要将几乎所有的Kafka配置从yml移到java(使用属性)。...制作人示例

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AccountSerializer.class);
    return props;
}

@Bean
public ProducerFactory<String, Account> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, Account> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

该代码有效,但是我接受了简化。在最好的情况下,我想进行优雅的配置yml,可能需要进行一些Java更改。但是以直接的方式进行操作,我将获得每3个bean额外的配置每个kafkaTemplatelistenerFactory

是否有可能简化将来的配置(我需要更多的Serializer`Deserializer`)?怎么样?

聚苯乙烯

我想yml以类似的方式配置此示例

spring:
  kafka:
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

app:
  topic:
    foo: foo.t

但我不清楚如何在此处配置使用者\生产者不同(DeSerializer,将它们映射到指定主题...

塞尔吉(Sergii):

看来我没有任何机会为同一个侦听器配置不同的SERIALIZER| DESERIALIZERs。

但是id并不代表我的问题没有解决方案。

我对所有对象都使用了继承,并提供了抽象AbstractEventAbstractEvent通常没有用,但在我的解决方案中使用它,例如输入SERIALIZER| DESERIALIZER为了获得有关上下文中对象的信息,我使用了自定义标头。org.apache.kafka.common.serialization.Deserializer没有标头参数,但我已经DESERIALIZER基于实现了ExtendedDeserializer这种方式使我可以访问标题

via public T deserialize(String topic, Headers headers, byte[] data)

我的反序列化器的示例

@Slf4j
public class AbstractEventDeserializer<T extends AbstractEvent> implements ExtendedDeserializer<T> {

    private Map<String, Class<T>> mappers = new HashMap<>();

    // default behavior
    @Override
    public T deserialize(String arg0, byte[] devBytes) {
        ObjectMapper mapper = new ObjectMapper();
        T bar = null;
        try {
            bar = (T) mapper.readValue(devBytes, Bar.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return bar;
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub
    }

    @Override
    public T deserialize(String topic, Headers headers, byte[] data) {
        log.info("handling...");
        headers.forEach(header -> log.info("   {}: {}", header.key(), getHeaderValueAsString(header)));
        Optional<String> classTypeFromHeader = getClassTypeFromHeader(headers);
        if (classTypeFromHeader.isPresent()) {
            return parseFromJson(data, mappers.get(classTypeFromHeader.get()));
        }
        return deserialize(topic, data);
    }

    private Optional<String> getClassTypeFromHeader(Headers headers) {
        return StreamSupport.stream(headers.headers("X-CLASS-TYPE").spliterator(), false)
                .map(Header::value)
                .map(String::new)
                .findFirst();
    }

    private String getHeaderValueAsString(Header header) {
        return Optional.ofNullable(header.value())
                .map(String::new)
                .orElse(null);
    }

    @Override
    public void configure(Map<String, ?> arg0, boolean arg1) {
        log.info("configuring deserialiser");
        if (arg0.containsKey("mappers")) {
            this.mappers = (Map<String, Class<T>>) arg0.get("mappers");
        }
        arg0.keySet().forEach(key -> log.info("   {}:{}", key, arg0.get(key)));
    }

}

如果您想尝试解决方案,请查看实验示例

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章