Multiple @EnableBinding with Kafka Spring Cloud Stream

Johnny Willer

I'm trying to set an Spring Boot Application listening to Kafka.

I'm using Kafka Streams Binder.

With one simple @EnableBinding

@EnableBinding(StreamExample.StreamProcessor.class)
public class StreamExample {

    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) {

        logger.info("Stream listening");

        return input
                .peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
    }

    interface StreamProcessor {

        String INPUT = "input_1";
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}

and in application.yml

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: localhost:29092
      bindings:
         input_1:
           destination: mytopic1
           group: readgroup
         output_1:
           destination: mytopic2
         input_2:
           destination: mytopic3
           group: readgroup
         output_2:
           destination: mytopic4
  application:
    name: stream_s1000_app

Everything works fine.

But if I try to add a second class with other binding, the following error occurs:

The following subscribed topics are not assigned to any members: [mytopic1]

Example of the second binding:

@EnableBinding(StreamExampleBindingTwo.StreamProcessor.class)
public class StreamExampleBindingTwo {

    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) {

        logger.info("Stream listening binding two");

        return input
                .peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
    }

    interface StreamProcessor {

        String INPUT = "input_2";
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}

What I'm missing? Can't I use multiple input topics and multiple output in the same application? There is something related to application.name?

sobychacko

I just tried an app and that worked. When you have multiple processors in the same application, you need to make sure that each processor gets its own application id. See below how I have 2 distinct application id's for both inputs in the application.yml.

I saw both processors are getting logged on the console. Also, saw the messages on the output topics.

@SpringBootApplication
@EnableBinding({So54522918Application.StreamProcessor1.class, So54522918Application.StreamProcessor2.class})
public class So54522918Application {

    public static void main(String[] args) {
        SpringApplication.run(So54522918Application.class, args);
    }

    @StreamListener(StreamProcessor1.INPUT)
    @SendTo(StreamProcessor1.OUTPUT)
    public KStream<String, String> process1(KStream<String, String> input) {

        System.out.println("Stream listening");

        return input
                .peek(((key, value) -> System.out.println("key = " + key +", value = " + value)));
    }

    @StreamListener(StreamProcessor2.INPUT)
    @SendTo(StreamProcessor2.OUTPUT)
    public KStream<String, String> process2(KStream<String, String> input) {

        System.out.println("Stream listening binding two");

        return input
                .peek(((key, value) -> System.out.println("key = " + key +", value = " + value)));
    }

    interface StreamProcessor1 {

        String INPUT = "input_1";
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }

    interface StreamProcessor2 {

        String INPUT = "input_2";
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }

}

Relevant part of application.yml

spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
  binder.configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  bindings.input_1.consumer.application-id: process-1
  bindings.input_2.consumer.application-id: process-2
spring.cloud.stream.bindings.input_1:
  destination: mytopic1
spring.cloud.stream.bindings.output_1:
  destination: mytopic2
spring.cloud.stream.bindings.input_2:
  destination: mytopic3
spring.cloud.stream.bindings.output_2:
  destination: mytopic4

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

Auto commit in kafka with spring cloud stream

Spring Cloud Stream Kafka - Method must be Declarative

Spring cloud stream and consume multiple kafka topics

Spring Cloud Stream Kafka consumer patterns

jackson cannot deserialize (spring cloud stream kafka)

Correctly manage DLQ in Spring Cloud Stream Kafka

Spring Cloud Stream embedded header format (Kafka)

Using Kstream with spring cloud stream and kafka 1.0.1

Error Handling in Spring Cloud Stream - Kafka Binder

Multiple StreamListeners with Spring Cloud Stream connected to Kafka

Pause Spring Cloud Stream Kafka binder programmatically

Understanding Spring Cloud Stream Kafka and Spring Retry

Multiple StreamListeners to same Topic with Spring Cloud Stream connected to Kafka

EnableBinding, Output, Input deprecated Since version of 3.1 of Spring Cloud Stream

Spring Cloud stream compatibility matrix with kafka

spring-cloud-stream kafka avro

Spring cloud stream - Kafka binder performance

Asynchronously write to spring cloud kafka stream

Kafka client id property with spring cloud stream

Spring Cloud Stream Kafka Error channel

spring cloud stream kafka binding config

Migrate Kafka Streams code to Spring Cloud Stream?

Kafka - spring cloud stream

The type EnableBinding is deprecated, The type StreamListener is deprecated - Spring Cloud Stream

Why is my spring cloud stream config creating multiple kafka consumers

Is multiple brokers possible in one Kafka Streams Topology(Spring Cloud Stream)?

Spring cloud stream (Kafka) autoCreateTopics not working

Kafka Sticky Partitioner using spring cloud stream

Spring cloud stream: Multiple functionRouters