According to this topic:
Kafka Spring Integration: Headers not coming for kafka consumer - this is no headers support for Kafka
But documentation says:
spring.cloud.stream.kafka.binder.headers
The list of custom headers that will be transported by the binder.Default: empty.
I can't get it working with spring-cloud-stream-binder-kafka: 1.2.0.RELEASE
SENDING LOG:
MESSAGE (e23885fd-ffd9-42dc-ebe3-5a78467fee1f) SENT :
GenericMessage [payload=...,
headers={
content-type=application/json,
correlationId=51dd90b1-76e6-4b8d-b667-da25f214f383,
id=e23885fd-ffd9-42dc-ebe3-5a78467fee1f,
contentType=application/json,
timestamp=1497535771673
}]
RECEIVING LOG:
MESSAGE (448175f5-2b21-9a44-26b9-85f093b33f6b) RECEIVED BY HANDLER 1:
GenericMessage [payload=...,
headers={
kafka_offset=36,
id=448175f5-2b21-9a44-26b9-85f093b33f6b,
kafka_receivedPartitionId=0,
contentType=application/json;charset=UTF-8,
kafka_receivedTopic=new_patient, timestamp=1497535771715
}]
MESSAGE (448175f5-2b21-9a44-26b9-85f093b33f6b) RECEIVED BY HANDLER 2 :
GenericMessage [payload=...,
headers={
kafka_offset=36,
id=448175f5-2b21-9a44-26b9-85f093b33f6b,
kafka_receivedPartitionId=0,
contentType=application/json;charset=UTF-8,
kafka_receivedTopic=new_patient, timestamp=1497535771715
}]
I expect to see the same message id and get correlationId on receiving side.
application.properties:
spring.cloud.stream.kafka.binder.headers=correlationId
spring.cloud.stream.bindings.newTest.destination=new_test
spring.cloud.stream.bindings.newTestCreated.destination=new_test
spring.cloud.stream.default.consumer.headerMode=embeddedHeaders
spring.cloud.stream.default.producer.headerMode=embeddedHeaders
SENDING MESSAGE:
@Publisher(channel = "testChannel")
public Object newTest(Object param) {
...
return myObject;
}
Yes, it does: http://docs.spring.io/spring-cloud-stream/docs/Chelsea.SR2/reference/htmlsingle/index.html#_consumer_properties
headerMode
When set to raw, disables header parsing on input. Effective only for messaging middleware that does not support message headers natively and requires header embedding. Useful when inbound data is coming from outside Spring Cloud Stream applications.
Default: embeddedHeaders
But that is already Spring Cloud Stream story, not Spring Kafka per se.
Collected from the Internet
Please contact [email protected] to delete if infringement.
Comments