Reactor way to cancel Subscriptions


I'm trying to figure out Project Reactor and I'm looking for a way to cancel Subscriptions. I know that after subscribing to, for example, a Flux, I get a reference to a Cancellation object that can be used to send the onCancel signal. But that reference only exists after subscribing, and I need to hold it in some kind of collection.

Is there a better way to get the Cancellation object, or simply to cancel Subscriptions? Maybe some kind of place that holds references to all active Subscriptions? That would be awesome...

Simon Baslé

In Reactor, there is no sense in wanting to cancel a Subscription before you've called subscribe() (as it is that very method that creates the Subscription and propagates that signal up the chain to start the emission of data).

There is no centralized place holding all subscriptions. That wouldn't make much sense, because you'd need a way of finding the specific subscriptions you want to cancel (and keep in mind that each operator in your chain can use an intermediate Subscription as well...).
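
Since nothing tracks subscriptions globally, the bookkeeping has to live in your own code. Here is a minimal sketch of that idea. To keep it runnable without extra dependencies it is not written against Reactor: it uses the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces Reactor builds on. SubscriptionRegistry and NeverPublisher are hypothetical names made up for this illustration, and the registry is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Hypothetical helper (not part of Reactor): remembers every Subscription
// handed to the subscribers it creates, so they can all be cancelled later.
class SubscriptionRegistry {
    private final List<Flow.Subscription> active = new ArrayList<>();

    /** Wraps a per-item callback in a Subscriber whose Subscription is tracked. */
    <T> Flow.Subscriber<T> track(Consumer<T> onItem) {
        return new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                active.add(s);               // the handle only exists from this point on
                s.request(Long.MAX_VALUE);   // simplification: no backpressure
            }
            @Override public void onNext(T item) { onItem.accept(item); }
            // Terminated subscriptions no longer need to be tracked.
            @Override public void onError(Throwable t) { active.remove(subscription); }
            @Override public void onComplete() { active.remove(subscription); }
        };
    }

    int size() { return active.size(); }

    /** Cancels every subscription that is still tracked. */
    void cancelAll() {
        for (Flow.Subscription s : new ArrayList<>(active)) {
            s.cancel();
        }
        active.clear();
    }

    // Hypothetical source that never emits; it only records whether it was cancelled.
    static final class NeverPublisher implements Flow.Publisher<String> {
        boolean cancelled;

        @Override public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { /* never emits */ }
                @Override public void cancel() { cancelled = true; }
            });
        }
    }

    public static void main(String[] args) {
        SubscriptionRegistry registry = new SubscriptionRegistry();
        NeverPublisher a = new NeverPublisher();
        NeverPublisher b = new NeverPublisher();
        a.subscribe(registry.<String>track(item -> {}));
        b.subscribe(registry.<String>track(item -> {}));
        System.out.println(registry.size());               // 2
        registry.cancelAll();
        System.out.println(a.cancelled + " " + b.cancelled); // true true
    }
}
```

In Reactor itself, the handle returned by subscribe() (the Cancellation from the question) would take the place of the stored Subscription. Newer Reactor versions also offer Disposables.composite() for grouping such handles, so a hand-written registry like this may not be needed there.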

Note that some operators will also cancel subscriptions on your behalf! That is the case for take(long), for instance, which will cancel upstream once enough items have been emitted:

Flux.just(1, 2, 3, 4).log().take(2).subscribe(System.out::println);

will output:

14:17:48.729 [main] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
14:17:48.732 [main] INFO  reactor.Flux.Array.1 - | request(unbounded)
14:17:48.732 [main] INFO  reactor.Flux.Array.1 - | onNext(1)
14:17:48.732 [main] INFO  reactor.Flux.Array.1 - | onNext(2)
14:17:48.732 [main] INFO  reactor.Flux.Array.1 - | cancel()
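
To make the mechanics behind that log more concrete, here is a rough sketch of a take-like subscriber that cancels its upstream after a fixed number of items. It is not Reactor's implementation. It is written against the JDK's java.util.concurrent.Flow interfaces (which mirror Reactive Streams) so it runs without dependencies, and ArrayPublisher and TakeSubscriber are hypothetical names. The source is simplified: it treats the first request as unbounded and ignores backpressure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class TakeCancelSketch {

    // Hypothetical synchronous source that emits fixed items and records each signal.
    static final class ArrayPublisher implements Flow.Publisher<Integer> {
        private final List<String> log;
        private final int[] items;

        ArrayPublisher(List<String> log, int... items) {
            this.log = log;
            this.items = items;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            // The Subscription is created here, inside subscribe(), not before.
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean cancelled;
                private boolean started;

                @Override
                public void request(long n) {
                    log.add(n == Long.MAX_VALUE ? "request(unbounded)" : "request(" + n + ")");
                    if (started) return;
                    started = true;
                    for (int item : items) {
                        if (cancelled) return;   // stop emitting once downstream cancelled
                        log.add("onNext(" + item + ")");
                        subscriber.onNext(item);
                    }
                    if (!cancelled) {
                        log.add("onComplete()");
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    log.add("cancel()");
                    cancelled = true;
                }
            });
        }
    }

    // Hypothetical subscriber: cancels upstream after `limit` items, like take would.
    static final class TakeSubscriber implements Flow.Subscriber<Integer> {
        private final long limit;
        private long seen;
        private Flow.Subscription upstream;

        TakeSubscriber(long limit) { this.limit = limit; }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            s.request(Long.MAX_VALUE);
        }
        @Override public void onNext(Integer item) {
            if (++seen == limit) upstream.cancel();   // enough items: cancel on the caller's behalf
        }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    /** Subscribes a take(2)-like subscriber to 1..4 and returns the recorded signals. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        new ArrayPublisher(log, 1, 2, 3, 4).subscribe(new TakeSubscriber(2));
        return log;
    }

    public static void main(String[] args) {
        // Prints: request(unbounded), onNext(1), onNext(2), cancel() (one per line)
        run().forEach(System.out::println);
    }
}
```

Items 3 and 4 are never emitted, because the source checks the cancelled flag before each item, and no onComplete is sent after a cancel.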
