当前一个流接收第一个值时 RxJS 合并

打鼾丹

我想要什么的简短说明:

我有一组 observables,我希望所有的结果都来自同一个流。最初我希望第一个 observable 被激活,一旦一个 observable 收到它的第一个值,我希望下一个 observable 被激活。

我想要什么的详细解释:

我正在寻找的解决方案介于 RxJS 创建操作符“合并”和“连接”之间。

我有一系列可观察的。随着时间的推移,每个可观测值都会有几次排放。我想将这些 observable 排成一排,以便最初只激活第一个 observable (这类似于“concat”的工作方式)

然后,一旦第一个 observable 收到它的第一个值,我就希望激活第二个 observable。一旦第二个 observable 收到它的第一个值,我希望激活第三个 observable,依此类推。(这与“concat”不同。“concat”等待前一个observable完成,但在我的用例中,我想等待前一个observable收到它的第一个值)

将只有一个结果流从所有激活的 observable 中发出值(这类似于“合并”的工作方式)

我不认为有一个特定的 RxJS 运算符,但我希望可以通过混合多个运算符找到解决方案。

英戈·伯克

一旦第一个 observable 收到它的第一个值

Observable 不接收值,订阅者接收;这种区别在这里很重要,因为我们订阅每个 observable 的时间点很重要。例如,如果我们不从订阅者的角度考虑这一点,那么

merge(
  a$,
  a$.pipe(first(), switchMapTo(b$)),
  b$.pipe(first(), switchMapTo(c$)),
)

将是一个解决方案 - 但我在这里的假设是这不是你的意思。不过,它确实让我们更接近解决方案。我们需要的是:

merge(
  a$,
  a$.pipe(first(), switchMapTo(b$)),
  a$.pipe(first(), switchMapTo(b$.pipe(first(), switchMapTo(c$)))),
  // …
)

当然,我们需要为此找到一个通用的解决方案。一种天真的方法就是“递归”:

const activatedMerge = (sources) => {
  const activatedSources = sources.reduce((result, source, idx) => {
    result.push(idx === 0 ? source : result[idx-1].pipe(first(), switchMapTo(source)));
    return result;
  }, []);

  return merge(...activatedSources);
};

activatedMerge([a$, b$, c$]).subscribe(console.log);

很可能有比这更好的解决方案,不幸的是我没时间了。无论如何,希望这会有所帮助。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

合并到母版时,在压缩第一个分支后合并分支的分支

rxjs / Observable:获取第一个流后运行一次函数(可连续观察)

RxJS:如何将两个流合并为一个,以一个流的速率触发事件,但使用另一个流的最新值?

在RxJS中合并两个对象

回调为第一个参数时的RXJS 5 bindCallback

合并两个rxjs流,仅在第一个为true时发出

RxJs组成一个Observable列表,然后合并(Firebase)

Rxjs:将两个流与一个依赖于另一个的流合并

RXJS:合并流并利用最新流

将RXJS可观察的结果合并为一个

SQL合并列的第一个和最后一个值以创建到第三列的范围

如何使用Rxjs在一个列表中合并两个对象列表

rxjs:合并两个流,但是在第二个发送事件后取消第一个

如何调用多个URL,仅处理使用RxJS返回的第一个值?

如果第一个值为null,则在合并两个值时如何获得最后一个值?

从流中获取第一个定义的值并取消订阅RxJS的最佳方法是什么?

RXJS-将两个流结果合并到一个排序的数组中

使用合并从2个表中返回第一个非空值

RXJS:将来自foreach的多个调用合并为一个可观察的对象

在列表中合并多个Kotlin流,而无需等待第一个值

Angular RxJs可观察流合并数据

根据第一个元素合并列表

选择第一个合并值,而不是全部

在第一个变量之后用新行将变量的值合并为一个

根据第一个文件第一列的所有值合并2个文件

如何将数组的 Array Observavble 与 RXJS 合并到一个数组的 Observable 中?

如何在 RXJS retryWhen 中发出/合并另一个 observable

如何使用 RxJS 跳过 observable 的第一个值?

添加新元素时RXJS启动缓冲区并且它是第一个