RxJS的新手,但是我试图将一个元素流映射到另一个元素流,该元素流在所有内部/后续流完成/加载后生成一个数组。但是,我内在的可观察性似乎并没有执行。他们只是回到寒冷。
高层,我需要执行http post来上传文件列表(以两个不同的数组到两个不同的端点)。因为它们很大,所以我模拟了5秒的延迟。这些请求需要并行执行,但仅限于一次同时执行X(此处为2)。所有这些都必须在管道内部,并且管道应仅在所有帖子完成后才允许流继续进行。
https://stackblitz.com/edit/rxjs-pnwa1b
import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
import { interval, merge, forkJoin, of, from, range, Observable } from 'rxjs';
const single = "name";
const first = ["abc", "def"];
const second = of("ghi", "jkl", "mno");
of(single)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(
map(claim =>
merge(
from(first).pipe(map(photo => of(photo).pipe(delay(5000)))),
from(second).pipe(map(video => of(video).pipe(delay(5000))))
)
.pipe(
mergeAll(2)
)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(toArray())
.pipe(tap(val => console.log(`emit:${val}`)))
)
)
.pipe(
catchError(error => {
console.log("error");
return Observable.throw(error);
})
)
.subscribe(val => console.log(`final:${val}`));
内部订阅不会等到它们完成。使用forkJoin不允许我限制并发上传。我该怎么做?
更新:
@dmcgrandle的回答非常有帮助,使我进行了以下似乎有效的更改:
import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
import { interval, merge, forkJoin, of, from, range, Observable, throwError } from 'rxjs';
const single = "name";
const first = ["abc", "def"];
const second = of("ghi", "jkl", "mno");
of(single)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(
mergeMap(claim =>
merge(
from(first).pipe(map(photo => of(photo).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`))))),
from(second).pipe(map(video => of(video).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`)))))
)
),
mergeAll(2),
toArray()
)
.pipe(
catchError(error => {
console.log("error");
return throwError(error);
})
)
.subscribe(val => console.log(`final:${val}`));
如果我正确地理解了您,那么我认为这是一个解决方案。您的问题是第一个map
,它不会执行内部订阅,而只是将流转换为Observables of Observables,这似乎并不是您想要的。相反,我在mergeMap
那里使用。
里面from
是我用concatMap从强制每个排放first
,并second
以发生并等待一个在另一个之前开始完成。我还设置了postToEndpoint
返回Observables的函数,使其更接近实际代码的外观。
码:
import { mergeMap, concatMap, delay, tap, catchError, toArray } from 'rxjs/operators';
import { merge, of, from, concat, throwError } from 'rxjs';
const single = "name";
const first = ["abc", "def"];
const second = of("ghi", "jkl", "mno");
const postToEndpoint1$ = photo => of(photo).pipe(
tap(data => console.log('start of postTo1 for photo:', photo)),
delay(5000),
tap(data => console.log('end of postTo1 for photo:', photo))
);
const postToEndpoint2$ = video => of(video).pipe(
tap(data => console.log('start of postTo2 for video:', video)),
delay(5000),
tap(data => console.log('end of postTo2 for video:', video))
);
of(single).pipe(
tap(val => console.log(`initial emit:${val}`)),
mergeMap(claim =>
merge(
from(first).pipe(concatMap(postToEndpoint1$)),
from(second).pipe(concatMap(postToEndpoint2$))
)
),
toArray(),
catchError(error => {
console.log("error");
return throwError(error);
})
).subscribe(val => console.log(`final:`, val));
我希望这有帮助。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句