I'm using Spring Boot to send data from one application to the other using Kafka.
My design uses an interface to declare the data being sent:
package domain;

interface Data {
    public String getData();
    public void setData(String data);
}
Producer
In the source app, I implement this interface as a db Entity.
package persistence;

@Data
class DataEntity implements Data {
    private String data; // lombok generates getter/setters
}
When an entity is added, I want to send it as an update to Kafka using the KafkaTemplate:
@Component
class DataPublisher implements ApplicationListener<DataEvent> {
    @Autowired private KafkaTemplate<String,Data> template;

    // I left out DataEvent which is a straightforward ApplicationEvent override
    @EventListener(classes = DataEvent.class)
    public void onApplicationEvent(DataEvent event) {
        template.send("data", (Data) event.getSource());
    }
}
// triggered by this call in a service
eventPublisher.publishEvent(new DataEvent(updatedData));
The serialization is done via the properties
spring:
  kafka:
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties.spring.json.value.default.type: domain.Data
Looking at kafkacat output, the data is sent fine.
Consumer
On the receiving side I have
@KafkaListener(topics = "data")
public void dataUpdated(@Payload Data data) {
    dataService.updateData(data);
}
which results in
Caused by: java.lang.IllegalArgumentException: The class 'persistence.DataEntity' is not in the trusted packages [...]
which I understand perfectly fine: the serializer sends a persistence.DataEntity object, but the client expects a domain.Data object. But that's how the design is supposed to be; I want the client to only know about the domain package, not its persistence implementation. (As a side question, where can I see this type header? It's not in the encoded JSON AFAICT, what am I missing?)
So the question is: how do I force the Spring JsonSerializer to send domain.Data as the serialized data type?
I did find a TYPE_MAPPING property in the serializer class, but its only documentation is that it "add[s] type mappings to the type mapper: 'foo:com.Foo,bar:com.Bar'" which doesn't explain anything and I can't find an example usage.
EDIT:
I did add
spring.kafka.producer.properties.spring.json.type.mapping=domain.Data:persistence.DataEntity
to the properties of the producer, but that didn't help.
See the documentation.
You have to provide mapping on both sides.
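To make "mapping on both sides" concrete, here is a minimal sketch of the two property sets. Each mapping entry has the form token:className; the token is an arbitrary name carried in the record's type header (by default a Kafka header named __TypeId__, which is why it does not show up in the JSON body), and each side maps it to its own local class. The token "data" and the consumer class client.DataDto are hypothetical names used for illustration; since domain.Data is an interface, the consumer is assumed to have some concrete implementation for Jackson to instantiate.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: builds the raw Kafka property maps that would carry the type mapping.
// "data" (token) and client.DataDto (consumer class) are hypothetical names.
class TypeMappingProps {

    // Producer side: DataEntity instances are tagged with the token "data"
    // instead of their fully qualified class name.
    static Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("value.serializer",
                "org.springframework.kafka.support.serializer.JsonSerializer");
        props.put("spring.json.type.mapping", "data:persistence.DataEntity");
        return props;
    }

    // Consumer side: the same token is resolved to a class the consumer knows.
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        props.put("spring.json.type.mapping", "data:client.DataDto");
        // Trusting the target package explicitly; this may be redundant in newer versions.
        props.put("spring.json.trusted.packages", "client");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps());
        System.out.println(consumerProps());
    }
}
```

With Spring Boot, the same keys can go under spring.kafka.producer.properties and spring.kafka.consumer.properties instead of being built in code.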
However, instead of using the JsonDeserializer, you should use a BytesDeserializer and a BytesJsonMessageConverter (simply add one as a @Bean and Boot will wire it into the container factory). That way, the framework will automatically convert to the parameter type.
Again, see the documentation.
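A minimal sketch of the converter approach on the consumer side, assuming a Spring Boot application with spring-kafka on the classpath. The class name KafkaConverterConfig and the bean method name are hypothetical. It also assumes the consumer's value-deserializer property is set to org.apache.kafka.common.serialization.BytesDeserializer, and that the listener's parameter type is something Jackson can instantiate (an interface such as domain.Data would likely need a concrete implementation on the consumer).

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.converter.BytesJsonMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;

// Hypothetical configuration class for the consuming application.
@Configuration
public class KafkaConverterConfig {

    // Converts the raw bytes of each record into the type of the
    // @KafkaListener method parameter, using Jackson.
    @Bean
    public RecordMessageConverter jsonMessageConverter() {
        return new BytesJsonMessageConverter();
    }
}
```

With this in place the target type comes from the listener method signature rather than from the producer's class name, so the consumer does not need to know about the persistence package.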