在RxJS中,地图无法执行可观察到的内部地图

普通用户

RxJS的新手,但是我试图将一个元素流映射到另一个元素流,该元素流所有内部/后续流完成/加载生成一个数组但是,我内在的可观察性似乎并没有执行。他们只是回到寒冷。

高层,我需要执行http post来上传文件列表(以两个不同的数组到两个不同的端点)。因为它们很大,所以我模拟了5秒的延迟。这些请求需要并行执行,但仅限于一次同时执行X(此处为2)。所有这些都必须在管道内部,并且管道应仅所有帖子完成后才允许流继续进行

https://stackblitz.com/edit/rxjs-pnwa1b

import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
import { interval, merge, forkJoin, of, from, range, Observable } from 'rxjs';

const single = "name";

const first = ["abc", "def"];

const second = of("ghi", "jkl", "mno");

of(single)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(
  map(claim => 
    merge(
      from(first).pipe(map(photo => of(photo).pipe(delay(5000)))),
      from(second).pipe(map(video => of(video).pipe(delay(5000))))
    )
    .pipe(
      mergeAll(2)
    )
    .pipe(tap(val => console.log(`emit:${val}`)))
    .pipe(toArray())
    .pipe(tap(val => console.log(`emit:${val}`)))
  )
)
.pipe(
catchError(error => {
  console.log("error");
  return Observable.throw(error);
})
)
.subscribe(val => console.log(`final:${val}`));

内部订阅不会等到它们完成。使用forkJoin不允许我限制并发上传。我该怎么做?

更新:

@dmcgrandle的回答非常有帮助,使我进行了以下似乎有效的更改:

import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
import { interval, merge, forkJoin, of, from, range, Observable, throwError } from 'rxjs';

const single = "name";

const first = ["abc", "def"];

const second = of("ghi", "jkl", "mno");

of(single)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(
  mergeMap(claim =>
    merge(
      from(first).pipe(map(photo => of(photo).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`))))),
      from(second).pipe(map(video => of(video).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`)))))
    )
  ),
  mergeAll(2),
  toArray()
)
.pipe(
catchError(error => {
  console.log("error");
  return throwError(error);
})
)
.subscribe(val => console.log(`final:${val}`));
dmcgrandle

如果我正确地理解了您,那么我认为这是一个解决方案。您的问题是第一个map,它不会执行内部订阅,而只是将流转换为Observables of Observables,这似乎并不是您想要的。相反,我在mergeMap那里使用

里面from是我用concatMap从强制每个排放first,并second以发生并等待一个在另一个之前开始完成。我还设置了postToEndpoint返回Observables的函数,使其更接近实际代码的外观。

StackBlitz演示

码:

import { mergeMap, concatMap, delay, tap, catchError, toArray } from 'rxjs/operators';
import { merge, of, from, concat, throwError } from 'rxjs';

const single = "name";

const first = ["abc", "def"];

const second = of("ghi", "jkl", "mno");

const postToEndpoint1$ = photo => of(photo).pipe(
  tap(data => console.log('start of postTo1 for photo:', photo)),
  delay(5000),
  tap(data => console.log('end of postTo1 for photo:', photo))
);

const postToEndpoint2$ = video => of(video).pipe(
  tap(data => console.log('start of postTo2 for video:', video)),
  delay(5000),
  tap(data => console.log('end of postTo2 for video:', video))
);

of(single).pipe(
  tap(val => console.log(`initial emit:${val}`)),
  mergeMap(claim => 
    merge(
      from(first).pipe(concatMap(postToEndpoint1$)),
      from(second).pipe(concatMap(postToEndpoint2$))
      )
  ),
  toArray(),
  catchError(error => {
    console.log("error");
    return throwError(error);
  })
).subscribe(val => console.log(`final:`, val));

我希望这有帮助。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章