Spring Kafka, Spring Cloud Stream, and Avro compatibility: Unknown magic byte

robjwilkins:

I have a problem deserializing messages from Kafka topics. The messages have been serialized using spring-cloud-stream and Apache Avro, and I am reading them using Spring Kafka. If I use spring-cloud-stream to both produce and consume the messages, I can deserialize them fine. The problem arises when I consume them with Spring Kafka and then try to deserialize.

I am using a Schema Registry (both the spring-boot Schema Registry for development and a Confluent Schema Registry in production), but the deserialization problems seem to occur before the Schema Registry is even called.

It's hard to post all the relevant code in this question, so I have posted it in a repo on GitHub: https://github.com/robjwilkins/avro-example

The object I am sending over the topic is just a simple POJO (Lombok @Data generates the getters and setters):

@Data
public class Request {
  private String message;
}

The code which produces messages on Kafka looks like this:

@EnableBinding(MessageChannels.class)
@Slf4j
@RequiredArgsConstructor
@RestController
public class ProducerController {

  private final MessageChannels messageChannels;

  @GetMapping("/produce")
  public void produceMessage() {
    Request request = new Request();
    request.setMessage("hello world");
    Message<Request> requestMessage = MessageBuilder.withPayload(request).build();
    log.debug("sending message");
    messageChannels.testRequest().send(requestMessage);
  }
}
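
The MessageChannels interface is not shown above; it looks something like this (simplified, with the output channel named to match the binding in application.yaml below):

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MessageChannels {

  // output channel bound to the test-request destination via application.yaml
  @Output("test-request")
  MessageChannel testRequest();
}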

and application.yaml:

spring:
  application.name: avro-producer
  kafka:
    bootstrap-servers: localhost:9092
    consumer.group-id: avro-producer
  cloud:
    stream:
      schema-registry-client.endpoint: http://localhost:8071
      schema.avro.dynamic-schema-generation-enabled: true
      kafka:
        binder:
          brokers: ${spring.kafka.bootstrap-servers}
      bindings:
        test-request:
          destination: test-request
          contentType: application/*+avro

Then I have a consumer:

@Slf4j
@Component
public class TopicListener {

    @KafkaListener(topics = {"test-request"})
    public void listenForMessage(ConsumerRecord<String, Request> consumerRecord) {
        log.info("listenForMessage. got a message: {}", consumerRecord);
        consumerRecord.headers().forEach(header -> log.info("header. key: {}, value: {}", header.key(), asString(header.value())));
    }

    private String asString(byte[] byteArray) {
        return new String(byteArray, Charset.defaultCharset());
    }
}

And the project which consumes has application.yaml config:

spring:
  application.name: avro-consumer
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: avro-consumer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
#      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        schema.registry.url: http://localhost:8071

When the consumer gets a message it results in an exception:

2019-01-30 20:01:39.900 ERROR 30876 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test-request-0 at offset 43. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

I have stepped through the deserialization code to the point where this exception is thrown:

public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaAvroSerDe {
  ...
  private ByteBuffer getByteBuffer(byte[] payload) {
    ByteBuffer buffer = ByteBuffer.wrap(payload);
    if (buffer.get() != 0) {
      // the first byte must be the Confluent magic byte (0)
      throw new SerializationException("Unknown magic byte!");
    } else {
      return buffer;
    }
  }
  ...
}

It is happening because the deserializer checks the first byte of the payload and expects it to be 0, but it is not. Hence I question whether the spring-cloud-stream MessageConverter which serialized the object is compatible with the io.confluent deserializer which I am using to deserialize it. And if they are not compatible, what do I do?
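
For context, the Confluent deserializer expects every payload to start with a "magic byte" (0) followed by a four-byte schema id, before the Avro data itself. A rough sketch of the check it is effectively performing (illustrative only, not code from the repo):

import java.nio.ByteBuffer;

public class WireFormatCheck {

  // Confluent wire format: [magic byte 0][4-byte schema id][Avro binary].
  // A payload written as plain Avro binary (with the schema reference carried
  // in a contentType header instead) fails this check with "Unknown magic byte!".
  public static boolean isConfluentFramed(byte[] payload) {
    return payload != null && payload.length > 5 && payload[0] == 0x0;
  }

  public static int schemaId(byte[] payload) {
    // the schema id is a big-endian int in bytes 1-4
    return ByteBuffer.wrap(payload, 1, 4).getInt();
  }
}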

Thanks for any help.

robjwilkins:

The crux of this problem is that the producer is using spring-cloud-stream to post messages to Kafka, but the consumer uses spring-kafka. The reasons for this are:

  • The existing system is already well established and uses spring-cloud-stream
  • A new consumer is required to listen to multiple topics using the same method, binding only on a CSV list of topic names
  • There is a requirement to consume a collection of messages at once, rather than individually, so their contents can be written in bulk to a database.

Spring-cloud-stream doesn't currently allow the consumer to bind a listener to multiple topics, and there is no way to consume a collection of messages at once (unless I'm mistaken).
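
With spring-kafka, by contrast, binding one listener method to a CSV list of topic names can be done with a SpEL expression. A minimal sketch, where app.topics is a hypothetical property such as app.topics=test-request,other-topic:

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MultiTopicListener {

  // app.topics is a hypothetical property, e.g. app.topics=test-request,other-topic
  @KafkaListener(topics = "#{'${app.topics}'.split(',')}")
  public void listen(ConsumerRecord<?, ?> consumerRecord) {
    log.info("got a message from topic {}", consumerRecord.topic());
  }
}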

I have found a solution which doesn't require any changes to the producer code, which uses spring-cloud-stream to publish messages to Kafka. Spring-cloud-stream uses a MessageConverter to manage serialisation and deserialisation. In AbstractAvroMessageConverter there are methods convertFromInternal and convertToInternal which handle the transformation to and from a byte array. My solution was to extend this code (creating a class which extends AvroSchemaRegistryClientMessageConverter), so that I could reuse much of the spring-cloud-stream functionality, but through an interface which can be accessed from my spring-kafka KafkaListener. I then amended my TopicListener to use this class to do the conversion.

The converter:

// imports assume the spring-cloud-stream-schema 2.x package layout
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.cache.support.NoOpCacheManager;
import org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter;
import org.springframework.cloud.stream.schema.client.SchemaRegistryClient;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;
import org.springframework.util.InvalidMimeTypeException;
import org.springframework.util.MimeType;

@Component
@Slf4j
public class AvroKafkaMessageConverter extends AvroSchemaRegistryClientMessageConverter {

  public AvroKafkaMessageConverter(SchemaRegistryClient schemaRegistryClient) {
    super(schemaRegistryClient, new NoOpCacheManager());
  }

  public <T> T convertFromInternal(ConsumerRecord<?, ?> consumerRecord, Class<T> targetClass,
      Object conversionHint) {
    T result;
    try {
      byte[] payload = (byte[]) consumerRecord.value();

      // copy the Kafka record headers into a plain map so the contentType header can be read
      Map<String, String> headers = new HashMap<>();
      consumerRecord.headers().forEach(header -> headers.put(header.key(), asString(header.value())));

      MimeType mimeType = messageMimeType(conversionHint, headers);
      if (mimeType == null) {
        return null;
      }

      // the writer schema is resolved from the schema registry via the contentType,
      // the reader schema from the target class
      Schema writerSchema = resolveWriterSchemaForDeserialization(mimeType);
      Schema readerSchema = resolveReaderSchemaForDeserialization(targetClass);

      @SuppressWarnings("unchecked")
      DatumReader<Object> reader = getDatumReader((Class<Object>) targetClass, readerSchema, writerSchema);
      Decoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
      result = (T) reader.read(null, decoder);
    }
    catch (IOException e) {
      throw new RuntimeException("Failed to read payload", e);
    }
    return result;
  }

  private MimeType messageMimeType(Object conversionHint, Map<String, String> headers) {
    MimeType mimeType;
    try {
      String contentType = headers.get(MessageHeaders.CONTENT_TYPE);
      log.debug("contentType: {}", contentType);
      mimeType = MimeType.valueOf(contentType);
    } catch (InvalidMimeTypeException e) {
      log.error("Exception getting object MimeType from contentType header", e);
      if (conversionHint instanceof MimeType) {
        // fall back to the MimeType passed in by the caller
        mimeType = (MimeType) conversionHint;
      }
      else {
        return null;
      }
    }
    return mimeType;
  }

  private String asString(byte[] byteArray) {
    String theString = new String(byteArray, Charset.defaultCharset());
    // header values arrive JSON-encoded, so strip the surrounding quotes
    return theString.replace("\"", "");
  }
}
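
The converter needs a SchemaRegistryClient bean in the consumer application. A minimal sketch of the wiring (the property name and the choice of ConfluentSchemaRegistryClient are assumptions; a DefaultSchemaRegistryClient works for the spring-boot registry):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.schema.client.ConfluentSchemaRegistryClient;
import org.springframework.cloud.stream.schema.client.SchemaRegistryClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SchemaRegistryClientConfig {

  // schema.registry.url is an assumed property name; point it at whichever
  // schema registry the consumer should use
  @Bean
  public SchemaRegistryClient schemaRegistryClient(
      @Value("${schema.registry.url:http://localhost:8071}") String endpoint) {
    ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
    client.setEndpoint(endpoint);
    return client;
  }
}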

The amended TopicListener:

@Slf4j
@Component
@RequiredArgsConstructor
public class TopicListener {

  private final AvroKafkaMessageConverter messageConverter;

  @KafkaListener(topics = {"test-request"})
  public void listenForMessage(ConsumerRecord<?, ?> consumerRecord) {
    log.info("listenForMessage. got a message: {}", consumerRecord);
    // the MimeType is only a fallback hint, used when the record has no valid contentType header
    Request request = messageConverter.convertFromInternal(
        consumerRecord, Request.class, MimeType.valueOf("application/vnd.*+avro"));
    log.info("request message: {}", request.getMessage());
  }
}

This solution only consumes one message at a time but can be easily modified to consume batches of messages.
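For example, with a batch-enabled container factory the listener can receive a whole List of records and convert each one. A sketch, assuming a hypothetical batchListenerContainerFactory bean (a ConcurrentKafkaListenerContainerFactory with setBatchListener(true)):

import java.util.ArrayList;
import java.util.List;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.MimeType;

@Slf4j
@Component
@RequiredArgsConstructor
public class BatchTopicListener {

  private final AvroKafkaMessageConverter messageConverter;

  // batchListenerContainerFactory is assumed to have setBatchListener(true),
  // so the records arrive as a List rather than one at a time
  @KafkaListener(topics = {"test-request"}, containerFactory = "batchListenerContainerFactory")
  public void listenForMessages(List<ConsumerRecord<?, ?>> consumerRecords) {
    List<Request> requests = new ArrayList<>();
    for (ConsumerRecord<?, ?> consumerRecord : consumerRecords) {
      requests.add(messageConverter.convertFromInternal(
          consumerRecord, Request.class, MimeType.valueOf("application/vnd.*+avro")));
    }
    log.info("received {} messages in one batch", requests.size());
    // the whole batch can now be written to the database in bulk
  }
}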

The full solution is here: https://github.com/robjwilkins/avro-example/tree/develop
