Spring Integration Concurrency of ServiceActivators

pkm

This is a continuation of my earlier question, "Spring Integration File reading". In summary, I have a fileIn channel (a queue), then a ServiceActivator that processes the file, and an outbound adapter that saves the file.

I wanted to introduce concurrency so that the messages are processed in multiple threads. I am using the Java DSL (but not Java 8). I was able to do it the following way:

@Bean
public MessageChannel fileInChannel() {
    return MessageChannels.queue("fileIn").get();
}

@Bean
public IntegrationFlow fileProcessingFlow() {
    return IntegrationFlows.from(fileInChannel())
            .handle(myFileProcessor, "processFile",
                    new Consumer<GenericEndpointSpec<ServiceActivatingHandler>>() {
                        @Override
                        public void accept(GenericEndpointSpec<ServiceActivatingHandler> t) {
                            t.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1).taskExecutor(Executors.newCachedThreadPool()));
                        }
                    })
            .handle(Files.outboundAdapter(new File(outDir)).autoCreateDirectory(true).get())
            .get();
}

This worked! I also tried the following:

@Bean
public IntegrationFlow fileProcessingFlow() {
    return IntegrationFlows.from(fileInChannel())
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .handle(myFileProcessor)
            .handle(Files.outboundAdapter(new File(outDir)).autoCreateDirectory(true).get())
            .get();
}

This also worked! I don't know whether this is just a matter of style or whether one approach is better than the other. If so, which one is better?

Secondly, in the above case, is the file writing (i.e. the last step) sequential, or will it run in different threads? If I need concurrency there too, should I introduce another task executor channel between handle(fileProcessor) and handle(outBoundAdapter)? Eventually the outbound adapter will be a remote-file S3 adapter, hence the question.

Gary Russell

It's just a matter of style, although I tend to prefer the second. The file adapter is thread-safe.

In general, the writes will occur in parallel.
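
To illustrate why the writes end up in parallel, here is a minimal sketch in plain java.util.concurrent rather than Spring Integration. It assumes the write step is invoked directly by the processing step, so it runs on whichever pool thread picked up the message. The class HandOffSketch and its process and write methods are hypothetical stand-ins for the service activator and the outbound adapter.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain java.util.concurrent sketch; class and method names are hypothetical.
class HandOffSketch {

    // Stand-in for the processing step; tags the payload with the current thread name.
    static String process(String file) {
        return file + "|" + Thread.currentThread().getName();
    }

    // Stand-in for the write step; appends the name of the thread it was called on.
    static String write(String processed) {
        return processed + "|" + Thread.currentThread().getName();
    }

    // Hands each file to the pool; process and write then run back to back on one worker.
    static List<String> run(List<String> files, int threads) {
        final List<String> results = new CopyOnWriteArrayList<String>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (final String file : files) {
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    results.add(write(process(file)));
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return results;
    }
}
```

Each result has the form file|processThread|writeThread, and the two thread names match for every entry. Different files can land on different workers, so their writes can overlap without a second executor between the two steps.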

The only exception is when you are writing with FileExistsMode.APPEND. In that case a lock is held during the write and, if the file name hashes to the same lock (there are 256 locks) as another file that is currently being written, the second write runs only after the first completes.
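
The locking behaviour described here is commonly called lock striping. The following is a rough sketch of the idea only, not Spring Integration's actual code: the class StripedFileLocks, its methods, and the way the file name hash is reduced to one of the 256 locks are all assumptions made for illustration.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration of lock striping; not Spring Integration's implementation.
class StripedFileLocks {

    private static final int STRIPES = 256; // number of locks mentioned in the answer

    private final Lock[] locks = new Lock[STRIPES];

    StripedFileLocks() {
        for (int i = 0; i < STRIPES; i++) {
            locks[i] = new ReentrantLock();
        }
    }

    // Assumed mapping: reduce the file name's hash to an index in [0, 255].
    static int stripeFor(String fileName) {
        return Math.floorMod(fileName.hashCode(), STRIPES);
    }

    Lock lockFor(String fileName) {
        return locks[stripeFor(fileName)];
    }

    // Runs the write while holding the stripe lock for that file name.
    void append(String fileName, Runnable write) {
        Lock lock = lockFor(fileName);
        lock.lock();
        try {
            write.run();
        } finally {
            lock.unlock();
        }
    }
}
```

With a scheme like this, appends to the same file are always serialized, and appends to two different files are serialized only when their names happen to map to the same stripe. Writes to all other files proceed in parallel.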
