来自可观察数组的MergeMap

约翰·蒂格诺特

TLDR:工作示例在该问题的最后一个代码块中。查看@ bryan60答案,使用concat而不是的工作示例mergeMap


我正在尝试顺序运行多个远程请求,但仅执行第一个可观察到的请求。

请求的数量各不相同,因此我无法在将可观察对象彼此嵌套的情况下进行躲避的解决方案。

我正在使用以下代码:

const observables = [
  observable1,
  observable2,
  ...
];

from(observables).pipe(
  mergeMap(ob=> {
    return ob.pipe(map(res => res));
  }, undefined, 1)
).subscribe(res => {
  console.log('Huzzah!');
})

过去(rxjs 5.5)Ive使用以下方法:

let o = Observable.from(observables).mergeMap((ob) => {
  return ob;
}, null, 1);

o.subscribe(res => {
  console.log('Huzzah!');
})

我不确定自己在做什么错,有人可以阐明吗?

另一个要求是仅打印“ Huzzah!” 一旦完成所有请求,而不是每个可观察者。

编辑:

undefined从我的原始代码中删除将使其起作用,但是还有另一个问题导致仅第一个可观察对象被执行。

我正在使用Angular的HttpClient远程请求。我的可观察代码如下所示:

const observables = [];

// Only the first observable would be executed
observables.push(this.http.get(urla));
observables.push(this.http.get(urlb));
observables.push(this.http.get(urlc));

.pipe(take(1))每个可观察值添加结果将执行每个可观察值:

const observables = [];

// All observables will now be executed
observables.push(this.http.get(urla).pipe(take(1));
observables.push(this.http.get(urlb).pipe(take(1));
observables.push(this.http.get(urlc).pipe(take(1));

我最终使用的代码按顺序执行所有可观察对象,并且仅触发Huzzah!一次:

const observables = [];

observables.push(this.http.get(urla).pipe(take(1));
observables.push(this.http.get(urlb).pipe(take(1));
observables.push(this.http.get(urlc).pipe(take(1));

from(observables).pipe(
  mergeMap(ob=> {
    return ob.pipe(map(res => res));
  }, 1),
  reduce((all: any, res: any) => all.concat(res), [])
).subscribe(res => {
  console.log('Huzzah!');
})

感谢@ bryan60帮助我解决了这个问题。

布莱恩60

如果这些是完成的http请求,则我认为您的错误是由mergeMap对删除结果选择器签名的更改引起的很难确定是否确切知道您所使用的版本,然后将其删除,然后再次添加,然后在v7中将它们永久删除。

如果您想按顺序运行它们...这就是您所需要的...

// concat runs input observables sequentially
concat(...observables).subscribe(res => console.log(res))

如果您要等到它们全部发射完,请执行以下操作:

concat(...observables).pipe(
  // this will gather all responses and emit them all when they're done
  reduce((all, res) => all.concat([res]), [])
  // if you don't care about the responses, just use last()
).subscribe(allRes => console.log(allRes))

在我的个人实用程序rxjs lib中,我总是包括一个concatJoin将concat和reduce结合在一起运算符。

唯一的技巧是concat需要可观察对象完成才能继续进行下一个操作,但是对于mergeMap将并发订阅设置为1.的情况也是如此,因此应该没事。之类的http请求都很好,因为他们经过一个发射..的WebSockets或主题或事件的发射器自然完成的行为有点不同,必须手动完成,要么像运营商firsttake或在源头。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章