如何在RxJS5中施加定时背压?

雨果·塞雷诺·费雷拉

假设我有以下代码:

let a = Rx.Observable.of(1, 2, 3)
let b = Observable.zip(a, a, (a, b) => a + b)
b.forEach(t => console.log(t))

这将立即输出结果。现在,我如何在每条消息之间放置一个定时延迟作为背压(请注意,我不需要缓冲区;相反,我想要ab成为Cold Observables),例如:

b.takeEvery(1000).forEach(t => console.log(t))

并有完全相同的答案:

<wait 1s>
2
<wait 1s>
4
<wait 1s>
6

替代方法:如果RxJS不支持背压(某些可观察物的拉力机制),那么如何在资源不耗尽的情况下创建无限生成器?

备选方案2:同时支持拉入和推入机制的其他JS框架吗?

Artur Grzesiak

在RxJS 5.x的反压的情况下,不支持,但对于例如pausable操作员在4.x的版本。它仅适用于热观测。有关4.x此处(尤其是底部的战利品和RxJS相关说明)的背压的更多信息

此Erik Meijer的推文可能会引起争议,但相关:https : //twitter.com/headinthebox/status/774635475071934464

对于您自己的背压机制实现,您需要具有2路通信通道,该通道可以很容易地由2个主题创建-每个端点一个。基本上next用于发送消息和.subscribe列出到另一端。

创建生成器也是可行的-再次使用主题在基于推和拉的世界之间架起桥梁。下面是用于产生斐波那契数的示例性实施方式。

const fib = () => {
  const n = new Rx.Subject()
  const f = n
    .scan(c => ({ a: c.b, b: c.b + c.a }), { a: 0, b: 1 })
    .map(c => c.a)
    
  return {
    $: f,
    next: () => n.next()
  }
}

const f = fib()

f.$.subscribe(n => document.querySelector('#r').innerHTML = n)
Rx.Observable.fromEvent(document.querySelector('#f'), 'click')
  .do(f.next)
  .subscribe()
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>

<button id='f'>NEXT FIBONACCI</button>

<div id='r'>_?_<div>

您可能会感兴趣的另一个js库是https://github.com/ubolonton/js-csp-未使用它,因此不确定该如何处理背压。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章