Send supertype as type information in Kafka JSON Serialization

daniu :

I'm using Spring Boot to send data from one application to another using Kafka.

My design uses an interface to declare the data being sent:

package domain;

public interface Data {  // must be public to be implemented from the persistence package
    public String getData();
    public void setData(String data);
}

Producer

In the source app, I implement this interface as a db Entity.

package persistence;

@Data
class DataEntity implements Data {
    private String data;  // lombok generates getter/setters
}

When an entity is added, I want to send it as an update to Kafka using the KafkaTemplate:

@Component
class DataPublisher implements ApplicationListener<DataEvent> {
    @Autowired private KafkaTemplate<String,Data> template;

    // I left out DataEvent which is a straightforward ApplicationEvent override
    @EventListener(classes = DataEvent.class)
    public void onApplicationEvent(DataEvent event) {
        template.send("data", (Data) event.getSource());
    }
}
// triggered by this call in a service
    eventPublisher.publishEvent(new DataEvent(updatedData));

The serialization is done via the properties

spring:
    kafka:
        consumer:
            value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
            properties.spring.json.value.default.type: domain.Data

Looking at kafkacat output, the data is sent fine.

Consumer

On the receiving side I have

@KafkaListener(topics = "data")
public void dataUpdated(@Payload Data data) {
    dataService.updateData(data);
}

which results in

Caused by: java.lang.IllegalArgumentException: The class 'persistence.DataEntity' is not in the trusted packages [...]

which I understand perfectly fine - the serializer sends a persistence.DataEntity object, but the client expects a domain.Data object. But that's how the design is supposed to be; I want the client to only know about the domain package, not its persistence implementation. (As a side question, where can I see this type header? It's not in the encoded json AFAICT, what am I missing?)

So the question is: how do I force the Spring JsonSerializer to send domain.Data as the serialized data type?

I did find a TYPE_MAPPING property in the serializer class, but its only documentation is that it "add[s] type mappings to the type mapper: 'foo:com.Foo,bar:com.Bar'" which doesn't explain anything and I can't find an example usage.

EDIT:

I did add

spring.kafka.producer.properties.spring.json.type.mapping=domain.Data:persistence.DataEntity

to the properties of the producer, but that didn't help.

Gary Russell :

See the documentation.

You have to provide mapping on both sides.
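
As a rough sketch of what "both sides" could look like with Boot properties: the mapping format is token:class, where the token is an arbitrary label that travels in the record's type header instead of the class name. The token "data" and the consumer-side class client.DataDto are assumptions made up for illustration; the consumer needs some concrete class that Jackson can instantiate, since domain.Data is an interface.

```properties
# Producer application: send the token "data" instead of the class name.
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=data:persistence.DataEntity

# Consumer application: resolve the token "data" to a local class.
# client.DataDto is hypothetical; it stands for any concrete class implementing domain.Data.
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.type.mapping=data:client.DataDto
```

Note that the attempt in the question puts a class name where the token goes and configures only the producer, which is likely why it had no visible effect.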

However, instead of using the JsonDeserializer, you should use a BytesDeserializer and a BytesJsonMessageConverter (simply add one as a @Bean and Boot will wire it into the container factory).

That way, the framework will automatically convert to the parameter type.
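
A minimal sketch of the converter approach, assuming a Spring Boot consumer application with spring-kafka on the classpath. The class name KafkaConverterConfig is made up for illustration. Because the conversion targets the listener's parameter type, that parameter probably needs to be a concrete class that Jackson can construct rather than the domain.Data interface itself.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.converter.BytesJsonMessageConverter;

// Consumer-side configuration (hypothetical class name).
// Also set in the consumer's properties:
//   spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.BytesDeserializer
@Configuration
class KafkaConverterConfig {

    // Boot detects this bean and wires it into the listener container factory,
    // so the raw bytes are converted to the @KafkaListener parameter type.
    @Bean
    BytesJsonMessageConverter jsonConverter() {
        return new BytesJsonMessageConverter();
    }
}
```

With this in place, the type is inferred from the listener method signature, so the consumer does not need the producer's class name or any type mapping.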

Again, see the documentation.
