关于 Reactor 信号的基本问题

阿尔菲的态度

我对以下代码的输出有一些疑问:

Flux.just("a", "b", "c", "d")
        .log(null, Level.INFO, true) // line: 18
        .flatMap(value ->
                Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic()), 2)
        .log(null, Level.INFO, true) // line: 21
        .take(3)
        .log(null, Level.INFO, true) // line: 23
        .subscribe(x -> 
             System.out.println("Thread: " + Thread.currentThread().getName() +
                               " , " + x));

Thread.sleep(1000 * 1000);

输出:

1. 11:29:11 [main] INFO  - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)    Flux.log(App.java:18)
2. 11:29:11 [main] INFO  - onSubscribe(FluxFlatMap.FlatMapMain)     Flux.log(App.java:21)
3. 11:29:11 [main] INFO  - onSubscribe(FluxTake.TakeSubscriber)     Flux.log(App.java:23)
4. 11:29:11 [main] INFO  - request(unbounded)   Flux.log(App.java:23)
5. 11:29:11 [main] INFO  - request(unbounded)   Flux.log(App.java:21)
6. 11:29:11 [main] INFO  - | request(2)     Flux.log(App.java:18)
7. 11:29:11 [main] INFO  - | onNext(a)  Flux.log(App.java:18)
8. 11:29:11 [main] INFO  - | onNext(b)  Flux.log(App.java:18)
9. 11:29:11 [elastic-2] INFO  - onNext(A)   Flux.log(App.java:21)
10. 11:29:11 [elastic-2] INFO  - onNext(A)  Flux.log(App.java:23)
11. Thread: elastic-2 , A
12. 11:29:11 [elastic-2] INFO  - | request(1)   Flux.log(App.java:18)
13. 11:29:11 [main] INFO  - | onNext(c)     Flux.log(App.java:18)
14. 11:29:11 [elastic-3] INFO  - onNext(B)  Flux.log(App.java:21)
15. 11:29:11 [elastic-3] INFO  - onNext(B)  Flux.log(App.java:23)
16. Thread: elastic-3 , B
17. 11:29:11 [elastic-3] INFO  - | request(1)   Flux.log(App.java:18)
18. 11:29:11 [elastic-3] INFO  - | onNext(d)    Flux.log(App.java:18)
19. 11:29:11 [elastic-3] INFO  - | onComplete()     Flux.log(App.java:18)
20. 11:29:11 [elastic-3] INFO  - onNext(C)  Flux.log(App.java:21)
21. 11:29:11 [elastic-3] INFO  - onNext(C)  Flux.log(App.java:23)
22. Thread: elastic-3 , C
23. 11:29:11 [elastic-3] INFO  - cancel()   Flux.log(App.java:21)
24. 11:29:11 [elastic-3] INFO  - onComplete()   Flux.log(App.java:23)
25. 11:29:11 [elastic-3] INFO  - | cancel()     Flux.log(App.java:18)

问题:每个问题都与输出中的特定行有关(而不是代码中的一行)。我还为其中一些添加了我的答案,但我不确定我是否正确。

  1. 订阅时,订阅操作会询问unbounded元素数量。那么为什么事件:request(unbounded)在管道中下降而不是上升?我的回答:unbounded金额请求上升take,然后take再次发送。

  2. flatMap发送cancel信号。为什么不take发送呢?

最后一个问题:输出中有多个终端信号。这不是对反应流规范的评估吗?

多库卡

在那种情况下,将只产生一个终端信号。

Flux.just("a", "b", "c", "d")
            .log(null, Level.INFO, true) // line: 18
            .flatMap(value ->
                    Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic()), 2)
            .log(null, Level.INFO, true) // line: 21
            .take(3)
            .log(null, Level.INFO, true) // line: 23
            .subscribe(x ->
                    System.out.println("Thread: " + Thread.currentThread().getName() +
 " , " + x), t -> {}, () -> System.out.println("Completed ""Only Once"));

这里棘手的部分是每个 Reactor 3 操作符都有自己的生命周期,它们都遵循相同的规则——发出onComplete通知下游操作符不再有数据。

由于您有.log()运算符和三个不同的点,因此您将观察到onComplete来自.just、来自.flatMap和来自 的三个独立信号.take(3)

首先,你会看到onComplete.just因为默认行为.flatMap是“好的,让我们尝试请求第一个concurrency元素,然后让我们看看它是如何进行的”,因为.just可能(在你的情况下)只产生 4 个元素,在 2(这是并发级别)在您的示例中)请求的需求它将发出 2onNext并且在两个之后request(1)您将看到onComplete. 反过来,发出的信号onComplete让我们.flatMap知道,当 4 个扁平流发出它们的.onComplete信号时,它将被允许onComplete向下游发出自己的信号反过来,下游是.take(3)操作符,它也在前三个元素之后发出自己的onComplete信号,而无需等待上游onComplete由于.log后面运营商.take该信号也将被记录。最后,在您的流程中,您有 3 个独立的日志运营商,这将记录 3 个独立onComplete于 3 个独立运营商的运营商,但尽管如此,最终终端.subscribe将仅收到onComplete从第一个运营商到流程的一个。

关于.take行为的小更新

的中心思想.take是获取元素直到满足剩余计数。由于上游可能产生比请求更多的数据,我们需要有一种机制来防止发送更多数据。Reactive-Streams 规范为我们提供的机制之一是通过Subscription. 订阅有两种主要方法 - request- 显示需求和cancel- 即使请求的需求没有得到满足,也显示不再需要数据。对于.take算子,初始需求Long.MAX_VALUE,视为无界需求。因此,停止消耗潜在的不定式数据流的唯一方法是取消订阅,或者换句话说取消订阅

希望对你有帮助:)

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

关于 Docker 的基本问题

关于 Verilog 的基本问题

关于R水管工的基本问题

关于 git checkout -b 的基本问题

关于持续集成的基本问题

关于“堆栈”基本问题的问题(C++)

matplotlib 中关于 fig 和 axes 的基本问题

OOP 关于组合、继承和多态的基本问题

关于Crossfilter上的航空公司示例的基本问题

关于 Zendesk 支持的一些基本问题

关于 $_GET 与 AJAX 和 PHP 的基本问题

关于正则表达式提取的基本问题

关于链表和节点插入的基本问题

JS中关于OOP的一些基本问题

SQL Server,关于“无效的列名”的基本问题

关于此 C 代码如何工作的基本问题

关于 Make 的一些基本问题:用数字编制报告

关于整数变量和字符串连接的最基本问题,在C ++中使用“ +”

关于在用户和计算机之间复制文件的真正基本问题

关于多个顶点类型和多个着色器的基本问题

关于处理器和操作系统的一些基本问题?

python中关于or运算符的一个基本问题

关于向对象添加另一个对象/项目并且新属性是数组的 Tyepscript 基本问题

关于rxjs可观察对象的基本问题-如何将数据手动传递给可观察对象?

关于调色板,图形透明度和x轴密度的一些Matplotlib基本问题

关于PPA的非常基本的问题

嗨,我希望有人可以帮助我解决这个基本问题。这是关于将向量分组为对和与 R 成对的 rnorm 向量

SQL基本问题

Java基本问题