In a Spring Boot application, we're using a request-reply pattern built on the RendezvousChannel from Spring Integration. When we receive a request, we create a uniquely named channel and register it in the Spring application context as follows:
RendezvousChannel rendezvousChannel = MessageChannels.rendezvous(uniqueId).get();
ConfigurableApplicationContext configurableApplicationContext = (ConfigurableApplicationContext) springContext;
SingletonBeanRegistry beanRegistry = configurableApplicationContext.getBeanFactory();
beanRegistry.registerSingleton(uniqueId, rendezvousChannel);
We then add this channel name to the request and do some work that takes a few seconds. When the response to this request arrives, it is routed into the RendezvousChannel:
@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(globalChannel)
            .route("payload['replyChannel']")
            .get();
}
This works well, and we receive the response to the request as desired. Under load, however, when many temporary RendezvousChannels are created, the routing sometimes fails with:
org.springframework.messaging.MessagingException: failed to resolve channel name 'uniqueId';
nested exception is org.springframework.messaging.core.DestinationResolutionException: failed to look up MessageChannel with name 'uniqueId' in the BeanFactory.;
nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'uniqueId' available, failedMessage=GenericMessage...
at org.springframework.integration.router.AbstractMappingMessageRouter.resolveChannelForName(AbstractMappingMessageRouter.java:227)
at org.springframework.integration.router.AbstractMappingMessageRouter.addChannelFromString(AbstractMappingMessageRouter.java:258)
at org.springframework.integration.router.AbstractMappingMessageRouter.addToCollection(AbstractMappingMessageRouter.java:282)
at org.springframework.integration.router.AbstractMappingMessageRouter.determineTargetChannels(AbstractMappingMessageRouter.java:186)
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:171)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
I'm currently not sure why this happens. Any ideas?
For request-reply scenarios we recommend using a @MessagingGateway: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints-chapter.html#gateway
It populates a TemporaryReplyChannel instance into the headers under the replyChannel key. Before sending to an outbound external service, you need to use something like this in your IntegrationFlow:
.enrichHeaders(h -> h.headerChannelsToString())
This way the TemporaryReplyChannel is stored in a HeaderChannelRegistry, and the header value is replaced with a string id: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-transformation-chapter.html#header-channel-registry
After that you can add a transform() to pack the replyChannel header into the payload according to your requirements. For this purpose, by the way, we provide EmbeddedJsonHeadersMessageMapper, which can be called from that transform() as a plain POJO.
When you receive a reply, you should ensure that at least the required replyChannel property comes back together with the actual reply payload. In this case you can again use EmbeddedJsonHeadersMessageMapper.toMessage() to remap the embedded headers back into the MessageHeaders, or you need to do the remapping yourself. What is important is that you populate a replyChannel header in this case. In the end you can simply rely on the standard mechanism to send the output to the replyChannel from the header. The HeaderChannelRegistry mentioned above resolves the string id to the actual TemporaryReplyChannel instance, while the gateway at the beginning is still waiting for the value from that TemporaryReplyChannel.
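To make the idea concrete, here is a minimal plain-JDK sketch of the pattern described above: a reply handle is stored under a generated string id, the id travels with the request, and the reply is resolved back to the waiting caller. This is not Spring Integration's HeaderChannelRegistry or gateway implementation; the class ReplyRegistrySketch and its methods are hypothetical names for illustration, and a SynchronousQueue stands in for the temporary reply channel.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not Spring Integration's HeaderChannelRegistry.
final class ReplyRegistrySketch {

    // Maps a generated string id to the queue the requesting thread waits on.
    private final Map<String, SynchronousQueue<String>> pending = new ConcurrentHashMap<>();

    // Stores the reply queue and returns the string id to ship with the request.
    String register(SynchronousQueue<String> replyQueue) {
        String id = UUID.randomUUID().toString();
        pending.put(id, replyQueue);
        return id;
    }

    // Resolves the id and hands the reply to the waiting caller.
    // Returns false if the id is unknown or nobody takes the reply in time.
    boolean reply(String id, String payload, long timeoutMillis) {
        SynchronousQueue<String> queue = pending.remove(id);
        if (queue == null) {
            return false;
        }
        try {
            return queue.offer(payload, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Plays the role of the gateway: sends a request and blocks for the reply.
    // The "external service" is simulated by a thread that upper-cases the request.
    String sendAndReceive(String request, long timeoutMillis) {
        SynchronousQueue<String> replyQueue = new SynchronousQueue<>();
        String id = register(replyQueue);
        new Thread(() -> reply(id, request.toUpperCase(), timeoutMillis)).start();
        try {
            // Returns null if no reply arrives within the timeout.
            return replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pending.remove(id); // never leave stale entries behind
        }
    }
}
```

Calling new ReplyRegistrySketch().sendAndReceive("ping", 2000) blocks until the simulated service replies. In the real framework the gateway and the registry do this bookkeeping for you, so no beans need to be registered per request.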