通过操作员分组,来自不同组的项目交错

叔叔

如下代码:

    Observable
            .just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
            .doOnNext(item -> System.out.println("source emitting " + item))
            .groupBy(item -> {
                System.out.println("groupBy called for " + item);
                return item % 3;
            })
            .subscribe(observable -> {
                System.out.println("got observable " + observable + " for key " + observable.getKey());
                observable.subscribe(item -> {
                    System.out.println("key " + observable.getKey() + ", item " + item);
                });
            });

让我感到困惑。我得到的输出是:

    source emitting 0
    groupBy called for 0
    got observable rx.observables.GroupedObservable@42110406 for key 0
    key 0, item 0
    source emitting 1
    groupBy called for 1
    got observable rx.observables.GroupedObservable@1698c449 for key 1
    key 1, item 1
    source emitting 2
    groupBy called for 2
    got observable rx.observables.GroupedObservable@5ef04b5 for key 2
    key 2, item 2
    source emitting 3
    groupBy called for 3
    key 0, item 3
    source emitting 4
    groupBy called for 4
    key 1, item 4
    source emitting 5
    groupBy called for 5
    key 2, item 5
    source emitting 6
    groupBy called for 6
    key 0, item 6
    source emitting 7
    groupBy called for 7
    key 1, item 7
    source emitting 8
    groupBy called for 8
    key 2, item 8
    source emitting 9
    groupBy called for 9
    key 0, item 9

因此,在顶层订阅方法中,正如预期的那样,我从GroupedObservable中获得了3个可观察值。然后,我一个接一个地订阅了分组的可观察对象-这里是我不了解的事情:

为什么原始项仍按原始顺序发出(即0、1、2、3,...),而不是键0的0、3、6、9 ...,而键1、4、7 1,接着是2、5、8(密钥2)?

我想我知道如何创建组:

1. 0 is emitted, the key function is called and it gets 0
2. it is checked if an observable for 0 exists, it doesn't, so a new one is created and emitted, and then it emits 0
3. the same happens for source items 1 and 2 as they both create new groups, and observables with key 1 and 2 are emitted, and they emit 1 and 2 correspondingly
4. source item 3 is emitted, the key function is called and it gets 0
5. it is checked if an observable for 0 exists, it does -> no new grouped observable is created nor emitted, but 3 is emitted by the already existing observable
6. etc. until the source sequence is drained

看来,尽管我逐一得到了分组的可观察物,但是它们的发射在某种程度上是交错的。这是怎么发生的?

亚当·S

为什么原始项仍按原始顺序发出(即0、1、2、3,...),而不是键0的0、3、6、9 ...,而键1、4、7 1,接着是2、5、8(密钥2)?

您已经回答了自己的问题。您正在按项目的发射顺序对其进行操作。因此,每发出一个,它就会沿操作员链向下传递,您将看到此处显示的输出。

您期望在那里的替代输出需要链条等待,直到源停止为所有发射物品为止说你有Observable.just(0, 1, 2, 3, 4, 4, 4, 4, 4, 4, 0)然后,您将期望(0,3,0),(1,4,4,4,4,4,4,4),(2)作为输出组。如果您有无限个4,该怎么办?您的订户永远不会从第一组收到那个0,3..。

您可以创建所需的行为。所述toList操作者将缓存输出,直到源完成,然后传递List<R>到用户:

.subscribe(observable -> {
    System.out.println("got observable " + observable + " for key " + observable.getKey());
    observable.toList().subscribe(items -> {
        // items is a List<Integer>
        System.out.println("key " + observable.getKey() + ", items " + items);
    });
});

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章