Spring CloudStreamアプリケーションのテストを書いています。これには、topicAからのKStreamの読み取りがあります。テストでは、KafkaTemplateを使用してメッセージを公開し、KStreamログが表示されるのを待ちます。
テストは次の例外をスローします。
java.lang.ClassCastException: class com.sun.proxy.$Proxy143 cannot be cast to class org.springframework.messaging.MessageChannel (com.sun.proxy.$Proxy143 and org.springframework.messaging.MessageChannel are in unnamed module of loader 'app')
at org.springframework.cloud.stream.test.binder.TestSupportBinder.bindConsumer(TestSupportBinder.java:66) ~[spring-cloud-stream-test-support-3.0.1.RELEASE.jar:3.0.1.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:169) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:115) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:112) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]
この例外は、アプリケーションの通常の実行では表示されません。
KSTREAM:
@Configuration
class MyKStream() {
private val logger = LoggerFactory.getLogger(javaClass)
@Bean
fun processSomething(): Consumer<KStream<XX, XX>> {
return Consumer { something ->
something.foreach { key, value ->
logger.info("--------> Processing xxx key {} - value {}", key, value)
}
}
テスト:
@TestInstance(PER_CLASS)
@EmbeddedKafka
@SpringBootTest(properties = [
"spring.profiles.active=local",
"schema-registry.user=",
"schema-registry.password=",
"spring.cloud.stream.bindings.processSomething-in-0.destination=topicA",
"spring.cloud.stream.bindings.processSomething-in-0.producer.useNativeEncoding=true",
"spring.cloud.stream.bindings.processSomethingElse-in-0.destination=topicB",
"spring.cloud.stream.bindings.processSomethingElse-in-0.producer.useNativeEncoding=true",
"spring.cloud.stream.kafka.streams.binder.configuration.application.server=localhost:8080",
"spring.cloud.stream.function.definition=processSomething;processSomethingElse"])
class MyKStreamTests {
private val logger = LoggerFactory.getLogger(javaClass)
@Autowired
private lateinit var embeddedKafka: EmbeddedKafkaBroker
@Autowired
private lateinit var schemaRegistryMock: SchemaRegistryMock
@AfterAll
fun afterAll() {
embeddedKafka.kafkaServers.forEach { it.shutdown() }
embeddedKafka.kafkaServers.forEach { it.awaitShutdown() }
}
@Test
fun `should send and process something`() {
val producer = createProducer()
logger.debug("**********----> presend")
val msg = MessageBuilder.withPayload(xxx)
.setHeader(KafkaHeaders.MESSAGE_KEY, xxx)
.setHeader(KafkaHeaders.TIMESTAMP, 1L)
.build()
producer.send(msg).get()
logger.debug("**********----> sent")
Thread.sleep(100000)
}
}
@Configuration
class KafkaTestConfiguration(private val embeddedKafkaBroker: EmbeddedKafkaBroker) {
private val schemaRegistryMock = SchemaRegistryMock()
@PostConstruct
fun init() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafkaBroker.brokersAsString)
System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafkaBroker.brokersAsString)
schemaRegistryMock.start()
System.setProperty("spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url", schemaRegistryMock.url)
}
@Bean
fun schemaRegistryMock(): SchemaRegistryMock {
return schemaRegistryMock
}
@PreDestroy
fun preDestroy() {
schemaRegistryMock.stop()
}
}
おそらくspring-cloud-stream-test-support
依存関係として使用しており、この依存関係はバインダーAPIのコア機能の一部をバイパスしてこのエラーを引き起こします。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加