How to do integration testing of KStream topology using spring-boot EmbeddedKafka?

tintin

I have a simple Spring Boot KStream topology that transforms a string from lowercase to uppercase. I want my integration test to launch an embedded Kafka broker and then test the topology. Is it possible to write integration tests like this using Spring's @EmbeddedKafka?

I have seen several examples using @EmbeddedKafka with simple consumers based on @KafkaListener, but none that use KStream.

I am trying to test the following topology, which transforms an incoming text stream from lowercase to uppercase.

Here's the topology:


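    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Printed;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration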
    public class UppercaseStream {
    
        private static final String LOWERCASE_TOPIC = "t.lower.case";
        private static final String UPPERCASE_TOPIC = "t.upper.case";
    
        @Bean
        @Qualifier("kStreamPromoToUppercase")
        public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {
    
            KStream<String, String> sourceStream = builder
                    .stream(LOWERCASE_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
    
            sourceStream.print(Printed.<String, String>toSysOut().withLabel("Original KStream..."));
    
            KStream<String, String> upperCaseStream = sourceStream.mapValues(text -> text.toUpperCase());
    
            upperCaseStream.print(Printed.<String, String>toSysOut().withLabel("Uppercase KStream..."));
    
            upperCaseStream.to(UPPERCASE_TOPIC);
    
            return upperCaseStream;
        }
    }

The unit test that tests the topology is:


    @TestInstance(TestInstance.Lifecycle.PER_CLASS)
    public class UpperCaseTopologyTest {
        TopologyTestDriver testDriver;
    
        @AfterAll
        void tearDown(){
            testDriver.close();
        }
    
        @Test
        @DisplayName("should transform lowercase to uppercase words")
        void shouldTransformLowercaseWords() {
            //Given
            StreamsBuilder builder = new StreamsBuilder();
            new UppercaseStream().kStreamPromoToUppercase(builder);
    
            Topology topology = builder.build();
            // setup test driver
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    
            // Create a TopologyTestDriver and the test topics (String keys and values)
            testDriver = new TopologyTestDriver(topology, props);
            TestInputTopic<String, String> inputTopic = testDriver.createInputTopic(
                    "t.lower.case", Serdes.String().serializer(), Serdes.String().serializer());
            TestOutputTopic<String, String> outputTopic = testDriver.createOutputTopic(
                    "t.upper.case", Serdes.String().deserializer(), Serdes.String().deserializer());
    
            //When
            inputTopic.pipeInput("test");
    
            //Then
            assertThat(outputTopic.readValue()).isEqualTo("TEST");
        }
    }

I want to write an integration test that first launches an embedded Kafka server and then tests the UppercaseStream topology.

I tried the following:


    @SpringBootTest
    @DirtiesContext
    @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
    class EmbeddedKafkaIntegrationTest {
    
        @Autowired
        public KafkaTemplate<String, String> template;
        @Autowired
        private KafkaConsumer consumer;
        private KafkaStreams kafkaStreams;
    
        @Value("${test.topic}")
        private String topic;
    
        @Autowired
        private KafkaStreamsConfiguration kafkaStreamsConfiguration;
        
        @Test
        public void should_transform_lowercase_to_uppercase() throws Exception {
            //Create a StreamsBuilder
            StreamsBuilder streamsBuilder = new StreamsBuilder();
            streamsBuilder.stream(topic, Consumed.with(new Serdes.StringSerde(), new Serdes.StringSerde()));
    
            //Add a topology
            new UppercaseStream().kStreamPromoToUppercase(streamsBuilder);
            kafkaStreams = new KafkaStreams(streamsBuilder.build(), kafkaStreamsConfiguration.asProperties());
    
            kafkaStreams.start();
            template.send(topic, "test");
            consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
    
            assertThat(consumer.getLatch().getCount(), equalTo(0L));
            assertThat(consumer.getPayload(), containsString("TEST"));
        }
    
        @After
        public void tearDown() {
            if (kafkaStreams!= null) kafkaStreams.close();
    
        }
    }

The test fails on the assertion. I am not sure how to get the kStreamPromoToUppercase bean, or whether I am following the correct approach.

tintin

There were a few things missing from the integration test.

Two NewTopic beans (Kafka admin client objects) were needed, one for the input topic and one for the output topic:

    @Bean
    public NewTopic createInputTopic() {
        return new NewTopic(inputTopic, Optional.of(1), Optional.empty());
    }

    @Bean
    public NewTopic createOutputTopic() {
        return new NewTopic(outputTopic, Optional.of(1), Optional.empty());
    }

The rest of the test remains more or less the same. As suggested by @Garry, I used a Kafka consumer to read the output topic.

    @SpringBootTest
    @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
    class KStreamSampleApplicationTests {
    
    
        private final KafkaProperties kafkaProperties;
        private final String inputTopic;
        private final String outputTopic;
    
        @Autowired
        public KStreamSampleApplicationTests(KafkaProperties kafkaProperties, Environment env) {
            this.kafkaProperties = kafkaProperties;
            this.inputTopic = env.getProperty("spring.kafka.input-lowercase-topic");
            this.outputTopic = env.getProperty("spring.kafka.output-uppercase-topic");
    
        }
        
        @Test
        @DisplayName("should test uppercaseStream topology")        
        void shouldTestUppercaseStreamTopology() {
            //Given
            Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(
                    String.join(",", kafkaProperties.getBootstrapServers())));
    
            //Create a kafka producer
            Producer<String, String> producer = new DefaultKafkaProducerFactory<>(producerProps, new StringSerializer(), new StringSerializer()).createProducer();
    
    
            Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(String.join(",", kafkaProperties.getBootstrapServers()), "testGroup", "true");
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
            //Create a Consumer client
            Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(consumerProps, new StringDeserializer(), new StringDeserializer()).createConsumer();
            consumer.subscribe(Collections.singleton(outputTopic));
    
            //When
            producer.send(new ProducerRecord<>(inputTopic, "test"));
            producer.flush();
    
            //Then
            assertThat(producer).isNotNull();
            //And
            ConsumerRecords<String, String> rec = consumer.poll(Duration.ofSeconds(3));
            Iterable<ConsumerRecord<String, String>> records = rec.records(outputTopic);
            Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
    
            if (!iterator.hasNext()) Assertions.fail();
    
            ConsumerRecord<String, String> next = iterator.next();
            assertThat(next.value()).isEqualTo("TEST");
        }
    }

Here's the gist of the full refactored solution.
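
One caveat about the test above: it relies on a single `consumer.poll(Duration.ofSeconds(3))`, which may come back empty if the consumer group has not finished joining or the streams application has not yet processed the record. A possible mitigation is to retry until a deadline. The sketch below shows that idea using only the JDK; `PollUntil` and `awaitValue` are hypothetical names introduced for illustration and are not part of spring-kafka or kafka-clients.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper for illustration only (not a Kafka or Spring API).
final class PollUntil {

    private PollUntil() {
    }

    /**
     * Calls {@code attempt} repeatedly until it yields a value or the timeout elapses.
     * The attempt is always tried at least once. It is assumed to block briefly itself
     * (for example a short consumer poll), so the loop does not sleep between tries.
     */
    static <T> Optional<T> awaitValue(Supplier<Optional<T>> attempt, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        do {
            Optional<T> result = attempt.get();
            if (result.isPresent()) {
                return result;
            }
        } while (System.nanoTime() - deadline < 0);
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Stand-in for a consumer: the first two attempts find nothing, the third finds "TEST".
        AtomicInteger calls = new AtomicInteger();
        Supplier<Optional<String>> attempt =
                () -> calls.incrementAndGet() < 3 ? Optional.<String>empty() : Optional.of("TEST");

        Optional<String> value = awaitValue(attempt, Duration.ofSeconds(1));
        System.out.println(value.orElse("<none>") + " after " + calls.get() + " attempts");
        // prints "TEST after 3 attempts"
    }
}
```

In the integration test, the supplier could wrap a short `consumer.poll(...)` and return the first record found on `outputTopic`, with the assertion made on the returned `Optional`. spring-kafka-test also ships `KafkaTestUtils.getSingleRecord(consumer, topic)`, which may cover the same need without a custom helper.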
