多播粘性可观察到具有on subscription / on处置行为

凯文·克鲁姆维德

我正在尝试创建Observable具有以下特征的:

  1. 允许多个并发和/或连续的订户
  2. 向每个新订户发出最后发出的项目
  3. 当第一个订阅者订阅时,以及最后一个订阅被撤消时,它会执行某些操作

BehaviorSubject带有doOnSubscribe/的AdoOnDispose满足#1和#2,但为每个订户(而不是第一个和最后一个)运行订阅/处置。添加share满足#1和#3,但仅将最后发出的项目发射给第一个并发订户。

我想出了一个似乎可行的解决方案,但感觉就像一个丑陋的hack:

AtomicInteger subs = new AtomicInteger();
Observable<String> test = BehaviorSubject.createDefault("foo")
        .doOnSubscribe(x -> {
            if(subs.getAndIncrement() == 0) {
                // do something
            }
        })
        .doOnDispose(() -> {
            if(subs.decrementAndGet() == 0) {
                // do something
            }
        });

是否存在可以达到相同效果的现有运营商或运营商组合?

约翰·沃斯

replay参数1使用运算符,即

yourObservable.replay(1)

编辑:您是对的,重播将返回一个connectedObservable,并且refcount运算符将使其像在Observable上那样运行,即

yourObservable.replay(1).refcount()

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

带有异步函数和toPromise的可观察到的RxJS异常行为是什么?

UB上下文中的“可观察到的行为”“未定义行为”

rxjs / Subscription没有导出的成员“ Subscription”

角度:使用管道和选项卡时,可观察到的更改行为

Aurelia使可绑定行为在对象属性上可观察到

返回的行为主题(可观察到)是受take(1)还是取消订阅影响?

可观察到的:如果在map()之前加注,为什么结果不会出现在subscription()成功函数中?

具有可观察/行为对象的Firebase Promise错误

继承中观察到的奇怪行为

SwitchMap具有可观察到的延迟

编译器降低程序的时间复杂度是否合法?是否认为这是可观察到的行为?

处置先前可观察的selectmany rx

block(),subscription()和subscription(-)有什么区别

可以观察到带有剔除的数字动画行为吗?

为什么我观察到回声的不同行为?

在python中遍历列表时观察到奇怪的行为

为什么订阅相同序列的不同可观察对象时,ReplySubject具有不同的行为?

可观察到可观察的顺序

当基础数据源具有新值时,了解可观察到的RxJava

具有bootstrap事件的Angular2不会触发可观察到的更改

rxjs / Subscription没有导出的成员“ ISubscription”

可观察到的异步

刷新可观察到

关闭多播可观察

我是否应该始终处置可观察变量的订阅?

处置时停止rxJava可观察的链执行

在设定的时间后如何处置可观察的物体?

可观察的Rx Scala行为

管道RxJS可观察到的现有主题