我对以下代码的输出有一些疑问:
Flux.just("a", "b", "c", "d")
.log(null, Level.INFO, true) // line: 18
.flatMap(value ->
Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic()), 2)
.log(null, Level.INFO, true) // line: 21
.take(3)
.log(null, Level.INFO, true) // line: 23
.subscribe(x ->
System.out.println("Thread: " + Thread.currentThread().getName() +
" , " + x));
Thread.sleep(1000 * 1000);
输出:
1. 11:29:11 [main] INFO - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) Flux.log(App.java:18)
2. 11:29:11 [main] INFO - onSubscribe(FluxFlatMap.FlatMapMain) Flux.log(App.java:21)
3. 11:29:11 [main] INFO - onSubscribe(FluxTake.TakeSubscriber) Flux.log(App.java:23)
4. 11:29:11 [main] INFO - request(unbounded) Flux.log(App.java:23)
5. 11:29:11 [main] INFO - request(unbounded) Flux.log(App.java:21)
6. 11:29:11 [main] INFO - | request(2) Flux.log(App.java:18)
7. 11:29:11 [main] INFO - | onNext(a) Flux.log(App.java:18)
8. 11:29:11 [main] INFO - | onNext(b) Flux.log(App.java:18)
9. 11:29:11 [elastic-2] INFO - onNext(A) Flux.log(App.java:21)
10. 11:29:11 [elastic-2] INFO - onNext(A) Flux.log(App.java:23)
11. Thread: elastic-2 , A
12. 11:29:11 [elastic-2] INFO - | request(1) Flux.log(App.java:18)
13. 11:29:11 [main] INFO - | onNext(c) Flux.log(App.java:18)
14. 11:29:11 [elastic-3] INFO - onNext(B) Flux.log(App.java:21)
15. 11:29:11 [elastic-3] INFO - onNext(B) Flux.log(App.java:23)
16. Thread: elastic-3 , B
17. 11:29:11 [elastic-3] INFO - | request(1) Flux.log(App.java:18)
18. 11:29:11 [elastic-3] INFO - | onNext(d) Flux.log(App.java:18)
19. 11:29:11 [elastic-3] INFO - | onComplete() Flux.log(App.java:18)
20. 11:29:11 [elastic-3] INFO - onNext(C) Flux.log(App.java:21)
21. 11:29:11 [elastic-3] INFO - onNext(C) Flux.log(App.java:23)
22. Thread: elastic-3 , C
23. 11:29:11 [elastic-3] INFO - cancel() Flux.log(App.java:21)
24. 11:29:11 [elastic-3] INFO - onComplete() Flux.log(App.java:23)
25. 11:29:11 [elastic-3] INFO - | cancel() Flux.log(App.java:18)
问题:每个问题都与输出中的特定行有关(而不是代码中的一行)。我还为其中一些添加了我的答案,但我不确定我是否正确。
订阅时,订阅操作会询问unbounded
元素数量。那么为什么事件:request(unbounded)
在管道中下降而不是上升?我的回答:unbounded
金额请求上升take
,然后take
再次发送。
flatMap
发送cancel
信号。为什么不take
发送呢?
最后一个问题:输出中有多个终端信号。这不是对反应流规范的评估吗?
在那种情况下,将只产生一个终端信号。
Flux.just("a", "b", "c", "d")
.log(null, Level.INFO, true) // line: 18
.flatMap(value ->
Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic()), 2)
.log(null, Level.INFO, true) // line: 21
.take(3)
.log(null, Level.INFO, true) // line: 23
.subscribe(x ->
System.out.println("Thread: " + Thread.currentThread().getName() +
" , " + x), t -> {}, () -> System.out.println("Completed ""Only Once"));
这里棘手的部分是每个 Reactor 3 操作符都有自己的生命周期,它们都遵循相同的规则——发出onComplete
通知下游操作符不再有数据。
由于您有.log()
运算符和三个不同的点,因此您将观察到onComplete
来自.just
、来自.flatMap
和来自 的三个独立信号.take(3)
。
首先,你会看到onComplete
,.just
因为默认行为.flatMap
是“好的,让我们尝试请求第一个concurrency
元素,然后让我们看看它是如何进行的”,因为.just
可能(在你的情况下)只产生 4 个元素,在 2(这是并发级别)在您的示例中)请求的需求它将发出 2onNext
并且在两个之后request(1)
您将看到onComplete
. 反过来,发出的信号onComplete
让我们.flatMap
知道,当 4 个扁平流发出它们的.onComplete
信号时,它将被允许onComplete
向下游发出自己的信号。反过来,下游是.take(3)
操作符,它也在前三个元素之后发出自己的onComplete
信号,而无需等待上游onComplete
。由于.log
后面有运营商.take
该信号也将被记录。最后,在您的流程中,您有 3 个独立的日志运营商,这将记录 3 个独立onComplete
于 3 个独立运营商的运营商,但尽管如此,最终终端.subscribe
将仅收到onComplete
从第一个运营商到流程的一个。
.take
行为的小更新的中心思想.take
是获取元素直到满足剩余计数。由于上游可能产生比请求更多的数据,我们需要有一种机制来防止发送更多数据。Reactive-Streams 规范为我们提供的机制之一是通过Subscription
. 订阅有两种主要方法 - request
- 显示需求和cancel
- 即使请求的需求没有得到满足,也显示不再需要数据。对于.take
算子,初始需求为Long.MAX_VALUE
,视为无界需求。因此,停止消耗潜在的不定式数据流的唯一方法是取消订阅,或者换句话说取消订阅
希望对你有帮助:)
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句