RxJS:可观察到的流通过管道传输到groupBy(),再通过concatMap(); 后续密钥丢失的数据

基兰

我正在尝试使用RxJSgroupBy运算符,然后concatMap根据一些键将记录收集到单独的组中。

我注意到,当concatMap跟随groupBy运算符时,似乎会丢失第一个键之后出现的所有键的数据。

例如:

考虑以下代码块:

// DOES NOT WORK
const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();

const result = clicks.pipe(
  groupBy(x => x.substr(2,1)),
  concatMap(ev$ => ev$.pipe(map(x => ({key: ev$.key, value: x})))),
);
const subscription = result.subscribe(x => console.log(x));

records.forEach(x => clicks.next(x));

// Expected Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }
// 
// Actual Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// ...Nothing more -- no results for key 2 and 3

但是,当我单独使用concatMap运算符时,它的行为符合预期。

// WORKS
const records = ['a', 'b', 'c', 'd', 'e', 'f', 'g'];
const clicks = new Subject();

const result = clicks.pipe(
  concatMap(ev => ev.subject$.pipe(take(4), map(x => ev.key + x))),
);
const subscription = result.subscribe(x => console.log(x));

records.forEach(x => clicks.next({key: x, subject$: interval(1000)}));

// Expected & Actual Output:
// a0
// a1
// a2
// a3
// b0
// b1
// b2
// b3
// c0
// c1
// c2
// c3
// d0
// d1
// d2
// d3
// e0
// e1
// e2
// e3
// f0
// f1
// f2
// f3
// g0
// g1
// g2
// g3

通过对RxJS的文档阅读groupByconcatMap没有为我提供任何线索,什么可以怎么回事。而在RxJS部分concatMapreactivex.io使我相信,这应该工作。

有人可以帮助我了解这里第一种情况的情况吗?如何使第一个方案起作用?

基兰

我似乎终于弄清楚了这里的问题。

在上述问题的场景1中,代码首先将源流输送到groupBy运算符中,然后再将其传输到运算concatMap符中。而且,这些运算符的组合似乎导致了此问题。

的内部运作groupBymergeMap

通读操作员的代码groupBy,我意识到在groupBy内部Subject为在源流中找到的每个键创建一个新实例。然后,该Subject实例立即发出属于该键的所有值

所有Subject实例都包装到GroupedObservales中,并由groupBy操作员向下游发出GroupedObservable实例concatMap操作员的输入

concatMap操作者在内部调用mergeMap与值为1操作者concurrency,这意味着只有一个源可观察到的订阅了兼任。

mergeMap运营商订购了只有一个可观察的,或尽可能多的观测作为被允许的conccurency参数,并持有“缓冲”,直到第一个已完成了所有其他的观测。

这如何造成问题?

首先,由于我已经阅读了这些运算符的代码,所以我不太确定这是否是一个“问题”。

尽管如此,我在问题中描述的行为仍然会发生,因为当groupBy运算符Subject立即使用相应实例发出各个值时,该mergeMap运算符将不会订阅该特定值Subject因此,使用该源流发射的所有值都Subject将丢失。

我试图用一个粗糙的大理石图来说明这个问题: groupBy to concatMap问题

这不是这些运算符的工作方式的“问题”,而是我理解这些运算符的方式以及文档(尤其是文档,concatMap对于刚接触RxJS的人员而言可能有些困惑)的方式。

通过使groupBy操作员使用aReplaySubject而不是Subject发出分组的值,可以轻松解决此问题groupBy接受一个subjectSelector参数,使我们能够切换Subject与实例ReplaySubject的实例。

以下代码有效:

// THIS VERSION WORKS
const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();

const result = clicks.pipe(
  groupBy(x => x.substr(2,1), null, null, () => new ReplaySubject()),
  concatMap(ev$ => ev$.pipe(map(x => ({key: ev$.key, value: x})))),
);
const subscription = result.subscribe(x => console.log(x));

records.forEach(x => clicks.next(x));

// We also need to explicity complete() the source
// stream to ensure that the observable stream for
// the first GroupedObservable completes allowing
// the concatMap operator to move to the second
// GroupedObservable.
clicks.complete();

// Expected and Actual output
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }

方案2为什么起作用?

在我的问题中,方案2可以正常工作,因为它interval只是创建了一个Observable,但没有开始发出值。因此,当mergeMap最终订阅该Observable时,该Observable的所有值均可用

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章