Reactor .then() does not wait till all items emitted before starting processing

Ketone Maniac

I am trying to print a counter which increments from a doOnNext() function.

    public static void main(String [] args) {
        final AtomicInteger counter = new AtomicInteger();
        Mono.just(List.of(1))
                .doOnNext(i -> System.out.println("doOnNext " + counter.incrementAndGet()))
                .map(i -> {
                    System.out.println("map " + counter.get());
                    return i;
                })
                .then(thenFunction(counter))
                .block();
    }

    private static Mono<Integer> thenFunction(final AtomicInteger counter) {
        System.out.println("then " + counter.get());
        return Mono.just(2);
    }

From the documentation, what the then() function does is

Let this Flux complete then play signals from a provided Mono.

So I should be expecting the doOnNext() and map() to complete before the then() works. However, the output is

then 0
doOnNext 1
map 1

Shouldn't the then() wait till the upstream completes before processing?

I am using spring-boot-starter-webflux 2.3.8.RELEASE

Phil Clay

In your example, the thenFunction method is invoked during the assembly phase while constructing the stream. Therefore, System.out.println("then " + counter.get()); is invoked before elements start flowing through the stream.

To defer the logic within the thenFunction until subscription time, you can either wrap the thenFunction call in a Mono.defer, like this:

                .then(Mono.defer(() -> thenFunction(counter)))

Or, (preferred) you can make the thenFunction defer its logic internally until the returned Mono is subscribed, like this:

    private static Mono<Integer> thenFunction(final AtomicInteger counter) {
        return Mono.fromCallable(() -> {
            System.out.println("then " + counter.get());
            return 2;
        });
    }

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

How to make all threads (pthread) wait till rest of all other threads are running before starting its execution?

Flutter Getx , Wait till all main binding load before Navigation

Java Reactor Flux/Mono, when does doOnNext get triggered before or after element is emitted?

MySQL - Docker Image : How to wait till the tables are populated before starting the next container

Wait till all Observables are completed

Looping through items and waiting till all items has been checked before success Swift

Firebase Storage - Wait till all upload tasks are completed before executing function

Before Shutdown wait till saving is finished

Wait till first method is executed in all threads till execute second

Does CasperJs then() wait on emitted events in the previous function?

Wait till all tasks are run in Celery python

How to wait till all threads finish their work?

Load all images on page and wait till done

python multithreading wait till all threads finished

python multithreading wait till all threads finished

Wait till all the promises are resolved / Firestore

action does not wait till function call ends

Applying an operation to all previously emitted items

Wait till all files are downloaded and wait till proccessing with those files are finished

ExpressJs wait till MongoDB fetch data and loop through before the output

Node: wait till function is finished before executing process.exit

wait till bootstrapTable is fully loaded before doing something

Wait till one function finishes before executing the second JS

Does CASE expression evaluate all cases before processing?

Let threads wait for all tasks to complete before starting on the next set of tasks

Airflow wait for all tasks in batch to finish before starting new set of asks

When cronjobs are set to replace, does Kubernetes wait for the previous job to finish shutting down before starting the new one?

Android Kotlin flow operator - wait until all flows have emitted

Wait for click before starting the game in phaser

TOP Ranking

  1. 1

    Failed to listen on localhost:8000 (reason: Cannot assign requested address)

  2. 2

    pump.io port in URL

  3. 3

    How to import an asset in swift using Bundle.main.path() in a react-native native module

  4. 4

    Loopback Error: connect ECONNREFUSED 127.0.0.1:3306 (MAMP)

  5. 5

    Compiler error CS0246 (type or namespace not found) on using Ninject in ASP.NET vNext

  6. 6

    BigQuery - concatenate ignoring NULL

  7. 7

    Spring Boot JPA PostgreSQL Web App - Internal Authentication Error

  8. 8

    ggplotly no applicable method for 'plotly_build' applied to an object of class "NULL" if statements

  9. 9

    ngClass error (Can't bind ngClass since it isn't a known property of div) in Angular 11.0.3

  10. 10

    How to remove the extra space from right in a webview?

  11. 11

    Change dd-mm-yyyy date format of dataframe date column to yyyy-mm-dd

  12. 12

    Jquery different data trapped from direct mousedown event and simulation via $(this).trigger('mousedown');

  13. 13

    maven-jaxb2-plugin cannot generate classes due to two declarations cause a collision in ObjectFactory class

  14. 14

    java.lang.NullPointerException: Cannot read the array length because "<local3>" is null

  15. 15

    How to use merge windows unallocated space into Ubuntu using GParted?

  16. 16

    flutter: dropdown item programmatically unselect problem

  17. 17

    Pandas - check if dataframe has negative value in any column

  18. 18

    Nuget add packages gives access denied errors

  19. 19

    Can't pre-populate phone number and message body in SMS link on iPhones when SMS app is not running in the background

  20. 20

    Generate random UUIDv4 with Elm

  21. 21

    Client secret not provided in request error with Keycloak

HotTag

Archive