Does Spring Cloud Stream Kafka supports embedded headers?

Eric2201

According to this topic:
Kafka Spring Integration: Headers not coming for kafka consumer - this is no headers support for Kafka

But documentation says:

spring.cloud.stream.kafka.binder.headers
The list of custom headers that will be transported by the binder.

Default: empty.

I can't get it working with spring-cloud-stream-binder-kafka: 1.2.0.RELEASE

SENDING LOG:

MESSAGE (e23885fd-ffd9-42dc-ebe3-5a78467fee1f) SENT : 
GenericMessage [payload=..., 
headers={
   content-type=application/json, 
   correlationId=51dd90b1-76e6-4b8d-b667-da25f214f383, 
   id=e23885fd-ffd9-42dc-ebe3-5a78467fee1f, 
   contentType=application/json, 
   timestamp=1497535771673
}]

RECEIVING LOG:

MESSAGE (448175f5-2b21-9a44-26b9-85f093b33f6b) RECEIVED BY HANDLER 1: 
GenericMessage [payload=..., 
headers={
    kafka_offset=36, 
    id=448175f5-2b21-9a44-26b9-85f093b33f6b, 
    kafka_receivedPartitionId=0, 
    contentType=application/json;charset=UTF-8, 
    kafka_receivedTopic=new_patient, timestamp=1497535771715
}]

MESSAGE (448175f5-2b21-9a44-26b9-85f093b33f6b) RECEIVED BY HANDLER 2 :
GenericMessage [payload=..., 
headers={
    kafka_offset=36, 
    id=448175f5-2b21-9a44-26b9-85f093b33f6b, 
    kafka_receivedPartitionId=0, 
    contentType=application/json;charset=UTF-8, 
    kafka_receivedTopic=new_patient, timestamp=1497535771715
}]

I expect to see the same message id and get correlationId on receiving side.

application.properties:

spring.cloud.stream.kafka.binder.headers=correlationId
spring.cloud.stream.bindings.newTest.destination=new_test
spring.cloud.stream.bindings.newTestCreated.destination=new_test
spring.cloud.stream.default.consumer.headerMode=embeddedHeaders
spring.cloud.stream.default.producer.headerMode=embeddedHeaders

SENDING MESSAGE:

@Publisher(channel = "testChannel")
public Object newTest(Object param) {
    ...
    return myObject;
}
Artem Bilan

Yes, it does: http://docs.spring.io/spring-cloud-stream/docs/Chelsea.SR2/reference/htmlsingle/index.html#_consumer_properties

headerMode

When set to raw, disables header parsing on input. Effective only for messaging middleware that does not support message headers natively and requires header embedding. Useful when inbound data is coming from outside Spring Cloud Stream applications.

Default: embeddedHeaders

But that is already Spring Cloud Stream story, not Spring Kafka per se.

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

Spring Cloud Stream embedded header format (Kafka)

How to create unit test with kafka embedded in the spring cloud stream

Using embedded Kafka in spring cloud stream test with custom channel bindings

Spring Cloud Stream Kafka add headers to only one binding

Conditional (content-based) routing on Kafka headers in Spring Cloud Stream

Kafka - spring cloud stream

How can you verify immediately that a message was acknowledged when integration testing using Embedded Kafka in Spring Cloud Stream?

How does Spring Kafka/Spring Cloud Stream guarantee the transactionality / atomicity involving a Database and Kafka?

Spring Cloud Stream Kafka - Eventual consistency - Does Kafka auto retry unacknowledged messages (when using autocommitoffset=false)

Why does Spring Cloud Stream with Kafka binder hash keys differently than a standard Kafka Producer?

Understanding Spring Cloud Stream Kafka and Spring Retry

spring-cloud-starter-stream-kafka does not creates topic provided to spring.boot.cloud.stream.bindings.output.destination

Spring cloud stream (Kafka) autoCreateTopics not working

Spring Cloud Stream Kafka Error channel

Spring Cloud Stream Kafka consumer patterns

Multiple @EnableBinding with Kafka Spring Cloud Stream

Spring Cloud Stream Kafka - Method must be Declarative

spring cloud stream kafka binding config

Auto commit in kafka with spring cloud stream

Error Handling in Spring Cloud Stream - Kafka Binder

Kafka Sticky Partitioner using spring cloud stream

Multiple StreamListeners with Spring Cloud Stream connected to Kafka

Correctly manage DLQ in Spring Cloud Stream Kafka

Spring cloud stream - Kafka binder performance

Spring cloud stream and consume multiple kafka topics

Spring Cloud stream compatibility matrix with kafka

Kafka client id property with spring cloud stream

spring-cloud-stream kafka avro

jackson cannot deserialize (spring cloud stream kafka)