如何使Subject的rxjs节流阀正常工作?

ToBeFrank

我拥有永不停止的数据流。需要对数据流的各个项目运行一种算法。该算法无法跟上数据流的速度,因此需要丢弃在处理算法时到达的数据项。我试图使用rxjsthrottle运算符来完成此操作,但是当我从Subject用作持续时间Selector的源发出时,油门不会重置但是,当我Subjectinterval操作员替换时,油门确实可以工作我猜我缺少一些简单的东西,但是我没有看到它。

import { Subject, interval, range } from 'rxjs';
import { throttle } from 'rxjs/operators';

function algorithm() {
  // simulate algorithm taking 1000ms
  return new Promise(resolve => setTimeout(resolve, 1000));
}
const dataStream = interval(200);
const algorithmDone = new Subject();
const pipeline = dataStream.pipe(throttle(sv => algorithmDone));
pipeline.subscribe(x => {
  console.log('here ' + x);
  algorithm();
  algorithmDone.next(0);
});

可运行的代码在这里:https : //stackblitz.com/edit/rxjs-tbs8qz

上面有一个数据流,每200ms产生一次数据。该算法需要1000毫秒,因此大约每五分之一dataStream应打印一次。通过上述实现,我只会得到第一个数字。如果我替换algorithmDonethrottle()interval(1000),我得到预期的输出。为什么这对我不起作用Subject

兰迪·卡斯本

algorithm()函数返回一个promise,但您正在使用它,就好像它是同步的一样。

更改:

pipeline.subscribe(x => {
  console.log('here ' + x);
  algorithm();
  algorithmDone.next(0);
});

要:

pipeline.subscribe(x => {
  console.log('here ' + x);
  algorithm().then(() => algorithmDone.next(0));
});

要么:

pipeline.subscribe(async x => {
  console.log('here ' + x);
  await algorithm();
  algorithmDone.next(0);
});

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章