I am trying to print a counter which increments from a doOnNext() function.
public static void main(String [] args) {
final AtomicInteger counter = new AtomicInteger();
Mono.just(List.of(1))
.doOnNext(i -> System.out.println("doOnNext " + counter.incrementAndGet()))
.map(i -> {
System.out.println("map " + counter.get());
return i;
})
.then(thenFunction(counter))
.block();
}
private static Mono<Integer> thenFunction(final AtomicInteger counter) {
System.out.println("then " + counter.get());
return Mono.just(2);
}
From the documentation, what the then() function does is
Let this Flux complete then play signals from a provided Mono.
So I should be expecting the doOnNext() and map() to complete before the then() works. However, the output is
then 0
doOnNext 1
map 1
Shouldn't the then() wait till the upstream completes before processing?
I am using spring-boot-starter-webflux 2.3.8.RELEASE
In your example, the thenFunction
method is invoked during the assembly phase while constructing the stream. Therefore, System.out.println("then " + counter.get());
is invoked before elements start flowing through the stream.
To defer the logic within the thenFunction
until subscription time, you can either wrap the thenFunction
call in a Mono.defer
, like this:
.then(Mono.defer(() -> thenFunction(counter)))
Or, (preferred) you can make the thenFunction defer its logic internally until the returned Mono is subscribed, like this:
private static Mono<Integer> thenFunction(final AtomicInteger counter) {
return Mono.fromCallable(() -> {
System.out.println("then " + counter.get());
return 2;
});
}
Collected from the Internet
Please contact [email protected] to delete if infringement.
Comments