我正在尝试使用RxJSgroupBy
运算符,然后concatMap
根据一些键将记录收集到单独的组中。
我注意到,当concatMap
跟随groupBy
运算符时,似乎会丢失第一个键之后出现的所有键的数据。
例如:
考虑以下代码块:
// DOES NOT WORK
const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();
const result = clicks.pipe(
groupBy(x => x.substr(2,1)),
concatMap(ev$ => ev$.pipe(map(x => ({key: ev$.key, value: x})))),
);
const subscription = result.subscribe(x => console.log(x));
records.forEach(x => clicks.next(x));
// Expected Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }
//
// Actual Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// ...Nothing more -- no results for key 2 and 3
但是,当我单独使用concatMap
运算符时,它的行为符合预期。
// WORKS
const records = ['a', 'b', 'c', 'd', 'e', 'f', 'g'];
const clicks = new Subject();
const result = clicks.pipe(
concatMap(ev => ev.subject$.pipe(take(4), map(x => ev.key + x))),
);
const subscription = result.subscribe(x => console.log(x));
records.forEach(x => clicks.next({key: x, subject$: interval(1000)}));
// Expected & Actual Output:
// a0
// a1
// a2
// a3
// b0
// b1
// b2
// b3
// c0
// c1
// c2
// c3
// d0
// d1
// d2
// d3
// e0
// e1
// e2
// e3
// f0
// f1
// f2
// f3
// g0
// g1
// g2
// g3
通过对RxJS的文档阅读groupBy
并concatMap
没有为我提供任何线索,什么可以怎么回事。而在RxJS部分concatMap
在reactivex.io使我相信,这应该工作。
有人可以帮助我了解这里第一种情况的情况吗?如何使第一个方案起作用?
我似乎终于弄清楚了这里的问题。
在上述问题的场景1中,代码首先将源流输送到groupBy
运算符中,然后再将其传输到运算concatMap
符中。而且,这些运算符的组合似乎导致了此问题。
groupBy
和mergeMap
通读操作员的代码groupBy
,我意识到在groupBy
内部Subject
为在源流中找到的每个键创建一个新实例。然后,该Subject
实例立即发出属于该键的所有值。
所有Subject
实例都包装到GroupedObservale
s中,并由groupBy
操作员向下游发出。GroupedObservable
实例流是concatMap
操作员的输入。
在concatMap
操作者在内部调用mergeMap
与值为1操作者concurrency
,这意味着只有一个源可观察到的订阅了兼任。
该mergeMap
运营商订购了只有一个可观察的,或尽可能多的观测作为被允许的conccurency
参数,并持有“缓冲”,直到第一个已完成了所有其他的观测。
首先,由于我已经阅读了这些运算符的代码,所以我不太确定这是否是一个“问题”。
尽管如此,我在问题中描述的行为仍然会发生,因为当groupBy
运算符Subject
立即使用相应实例发出各个值时,该mergeMap
运算符将不会订阅该特定值Subject
。因此,使用该源流发射的所有值都Subject
将丢失。
这不是这些运算符的工作方式的“问题”,而是我理解这些运算符的方式以及文档(尤其是文档,concatMap
对于刚接触RxJS的人员而言可能有些困惑)的方式。
通过使groupBy
操作员使用aReplaySubject
而不是Subject
发出分组的值,可以轻松解决此问题。在groupBy
接受一个subjectSelector
参数,使我们能够切换Subject
与实例ReplaySubject
的实例。
以下代码有效:
// THIS VERSION WORKS
const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();
const result = clicks.pipe(
groupBy(x => x.substr(2,1), null, null, () => new ReplaySubject()),
concatMap(ev$ => ev$.pipe(map(x => ({key: ev$.key, value: x})))),
);
const subscription = result.subscribe(x => console.log(x));
records.forEach(x => clicks.next(x));
// We also need to explicity complete() the source
// stream to ensure that the observable stream for
// the first GroupedObservable completes allowing
// the concatMap operator to move to the second
// GroupedObservable.
clicks.complete();
// Expected and Actual output
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }
在我的问题中,方案2可以正常工作,因为它interval
只是创建了一个Observable,但没有开始发出值。因此,当mergeMap
最终订阅该Observable时,该Observable的所有值均可用。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句