我想要什么的简短说明:
我有一组 observables,我希望所有的结果都来自同一个流。最初我希望第一个 observable 被激活,一旦一个 observable 收到它的第一个值,我希望下一个 observable 被激活。
我想要什么的详细解释:
我正在寻找的解决方案介于 RxJS 创建操作符“合并”和“连接”之间。
我有一系列可观察的。随着时间的推移,每个可观测值都会有几次排放。我想将这些 observable 排成一排,以便最初只激活第一个 observable (这类似于“concat”的工作方式)。
然后,一旦第一个 observable 收到它的第一个值,我就希望激活第二个 observable。一旦第二个 observable 收到它的第一个值,我希望激活第三个 observable,依此类推。(这与“concat”不同。“concat”等待前一个observable完成,但在我的用例中,我想等待前一个observable收到它的第一个值)
将只有一个结果流从所有激活的 observable 中发出值(这类似于“合并”的工作方式)。
我不认为有一个特定的 RxJS 运算符,但我希望可以通过混合多个运算符找到解决方案。
一旦第一个 observable 收到它的第一个值
Observable 不接收值,订阅者接收;这种区别在这里很重要,因为我们订阅每个 observable 的时间点很重要。例如,如果我们不从订阅者的角度考虑这一点,那么
merge(
a$,
a$.pipe(first(), switchMapTo(b$)),
b$.pipe(first(), switchMapTo(c$)),
)
将是一个解决方案 - 但我在这里的假设是这不是你的意思。不过,它确实让我们更接近解决方案。我们需要的是:
merge(
a$,
a$.pipe(first(), switchMapTo(b$)),
a$.pipe(first(), switchMapTo(b$.pipe(first(), switchMapTo(c$)))),
// …
)
当然,我们需要为此找到一个通用的解决方案。一种天真的方法就是“递归”:
const activatedMerge = (sources) => {
const activatedSources = sources.reduce((result, source, idx) => {
result.push(idx === 0 ? source : result[idx-1].pipe(first(), switchMapTo(source)));
return result;
}, []);
return merge(...activatedSources);
};
activatedMerge([a$, b$, c$]).subscribe(console.log);
很可能有比这更好的解决方案,不幸的是我没时间了。无论如何,希望这会有所帮助。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句