Spring integration: failure of conversion of gateway error to message

Victor Sorokin

I want my service gateway to return any error in its immediate flow (up to the nearest async channel) to service caller as a valid message (not exception), in sync fashion.

Entire flow is on a single thread. My minimal example refuses to work as desired and I can't figure out proper way to use framework to achieve my goal. Please, see code below.

@RunWith(SpringRunner.class)
public class ErrorHandlingTests {

    @Autowired
    ErrorsHandlingService errorsHandlingService;

    @EnableIntegration
    @Configuration
    static class Config {

        @Bean
        IntegrationFlow errorHandler() {
            return IntegrationFlows.from("errorChannel").
                    handle(errorMessage -> {
                        Message<?> failedMessage = ((MessagingException) errorMessage.getPayload()).getFailedMessage();
                        MessageChannel replyChannel = (MessageChannel) failedMessage.getHeaders().getReplyChannel();
                        replyChannel.send(new GenericMessage<>("Failure for " + failedMessage.getPayload()));
                    })
                    .get();
        }

        @Bean
        IntegrationFlow errorsHandlingFlow1() {
            return IntegrationFlows.from(ErrorsHandlingService.class, gws -> gws.errorChannel("errorChannel"))
                    .transform(new AbstractPayloadTransformer<String, String>() {
                        @Override
                        protected String transformPayload(String s) {
                            if (s.contains("oops"))
                                throw new IllegalArgumentException("Bad value");
                            return "R: " + s;
                        }
                    })
                    .get();
        }
    }


    @Test
    public void testErrorHandlingInFlow1() {
        assertEquals("R: a", errorsHandlingService.process("a"));
        assertEquals("Failure for oops", errorsHandlingService.process("oops"));
    }
}

This test hangs on 2nd assert and log prints:

W 210124 161538.597 [] [main] GenericMessagingTemplate$TemporaryReplyChannel - Reply message received but the receiving thread has exited due to an exception while sending the request message: GenericMessage [payload=Failure for oops, headers={id=57f79307-0778-88b4-9261-9040633cfc03, timestamp=1611494138597}]

All of this happens on the latest 5.3.4 release.

Artem Bilan

You don't need to use that replyChannel.send() manually. For such an error handling and compensation reply producing you just need to do like this:

<MessagingException>handle((ex, h) -> "Failure for " + ex.getFailedMessage().getPayload())

The logic in the gateway is like this:

  1. send-n-receive - sync or async - via replyChannel header.
  2. If failure with the previous, catch that exception and process it in the handleSendAndReceiveError()
  3. It takes an errorChannel configured for this gateway and performs send-n-receive of ErrorMessage
  4. The reply is expected as regular one from that sub-flow.

Well, the actual problem is that you use a replyChannel from the failedMessage, which is not valid any more since the main flow has failed already. At this point we deal with new error message flow and we really should rely on its headers already. Or just let the framework to do reply correlation for us!

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

Spring Integration Gateway with no arguments

Spring Integration Gateway VS Adapters

Spring Integration Message History

Error handling with WebFlux Outbound Gateway in Spring Integration

docker-compose with Spring Boot and MySQL integration keeps occurs error Communications link failure

In spring integration how to configure a message gateway with java annotations only, and make sure the gateway sees the reply

how to personalise message for data conversion failure in Spring MVC?

Spring Integration: Mail error and HTTP gateway response

Spring Integration message polling

Spring Integration http-inbound-gateway / RequestMappingHandlerMapping

Spring Integration JDBC lock failure

Name resolution failure with Spring Gateway and Eureka

Spring Integration Async Gateway Response Handling

Spring integration - decorate message on fail

Spring Integration gateway threads released or not

Spring Integration - Message Driven Channel Adapter - transformer error handle, original message

reply timeout in spring integration tcp gateway

Is there any way to send empty payload message using spring integration gateway?

Callback in Outbound Gateway Spring Integration

spring integration callback on message received

Async Messaging Gateway in Spring Integration with RxNetty

<int-http:outbound-gateway > how to use message-converters Attributes in Spring Integration

Spring integration gateway routing

How to get payload values from byte[] message in Spring Integration int-http:outbound-gateway?

Spring integration persistent message store

Integration of Spring Cloud Gateway With Spring Cloud Sleuth

Validator Exception Spring Integration: returning an error message in validator

Spring Integration and the Message Bridge Pattern

Spring Integration error: Failure to write to 'foo/002.txt.writing' while uploading the file