延迟发射物品,直到从另一个可观察到的物品发射出来

帕维尔·杜德卡(Pavel Dudka)

现在玩RxJava,偶然发现以下问题:

我有2个不同的流:

  • 与项目流
  • 流(只有一项),它为第一个流发出转换信息。

所以本质上我有项目流,并且我希望所有这些项目都与第二个流中的单个项目合并:

---- A1 ---- A2 ---- A3 ---- A4 ---- A5 ---- | --------------->

------------- b1-- | ------------------------------------------------- ->

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

------------ a1b1-a2b1-a3b1-a4b1-a5b1 -------->

它看起来确实类似于combileLatestoperator,但是combinateLatest将忽略第一个流中的所有项目,除了最接近第二个流中的项目。这意味着我不会收到a1b1-发出的第一个结果项目将是a2b1

我又看了看delay运营商,但它不允许我指定close像它与做流buffer算子

有没有花哨的运算符可以解决上述问题?

弗拉基米尔·米罗诺夫(Vladimir Mironov)

AFAIK,没有内置的运算符可实现您描述的行为。您始终可以实现自定义运算符,也可以在现有运算符之上构建它。我认为第二个选项更容易实现,这是代码:

public static <L, R, T> Observable<T> zipper(final Observable<? extends L> left, final Observable<? extends R> right, final Func2<? super L, ? super R, ? extends T> function) {
    return Observable.defer(new Func0<Observable<T>>() {
        @Override
        public Observable<T> call() {
            final SerialSubscription subscription = new SerialSubscription();
            final ConnectableObservable<? extends R> cached = right.replay();

            return left.flatMap(new Func1<L, Observable<T>>() {
                @Override
                public Observable<T> call(final L valueLeft) {
                    return cached.map(new Func1<R, T>() {
                        @Override
                        public T call(final R valueRight) {
                            return function.call(valueLeft, valueRight);
                        }
                    });
                }
            }).doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    subscription.set(cached.connect());
                }
            }).doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    subscription.unsubscribe();
                }
            });
        }
    });
}

如果您对代码有任何疑问,我可以详细解释。

更新

关于询问我的解决方案与以下解决方案有何不同的问题:

left.flatMap(valueLeft -> right.map(valueRight -> together(valueLeft, valueRight)));
  1. 并行执行-在我的实现中leftright可观察变量可观察变量都是并行执行的。right可观察的不必等待left一个发出其第一项。
  2. 缓存-我的解决方案仅订阅一次right可观察对象并缓存其结果。那就是为什么b1所有aXXX项目总是一样的原因akarnokd提供的解决方案right每次left发射一个项目都会订阅可观察的项目。这意味着:

    • 无法保证b1不会改变其价值。例如,对于以下可观察到的对象,您将为b每个对象得到不同的结果a

      final Observable<Double> right = Observable.defer(new Func0<Observable<Double>>() {
          @Override
          public Observable<Double> call() {
              return Observable.just(Math.random());
          }
      });
      
    • 如果right可观察对象是一项耗时的操作(例如,网络呼叫),则每次left可观察对象发出新项目时,您都必须等待其完成

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

根据另一个可观察值的值延迟发射可观察值

RxJs:使用发射另一个可观察值来过滤可观察内容

接收可观察物发射的物品

可观察到的发射值不会延迟

Flux.using 不发射物品

仅在最近两秒内未发射另一个可观察物时才从可观察物发射

如何使一个可观察序列在发射之前先等待另一个完成?

RxJava:等待另一个可观察到的结果

将可观察到的嵌套对象映射到另一个对象

当另一个发出值时,可观察到RxJS错误

根据另一个对Rx可观察到的Rx进行升采样

* ng如果异步管道与另一个可观察到的角度合并

rxjava按条件收集对象并等待另一个可观察到的对象

将可观察到的Zip列表转换为另一个可观察到的RxJava2

识别RxJava中可观察到的发射

DOM悬停时有规律的可观察到的发射

秋田可观察到完成不发射

将一个可观察到的称为另一个是正确的吗?

将MergeMap与从另一个可观察到的数据数组一起使用-RxJs Angular

RXJS首先发出结果,然后等待另一个可观察到的结果

RxSwift:将可观察的延迟到另一个可观察的完成之前?

如何聆听另一个物体的事件发射?

为什么没有背压支持的Observable收到请求通知后仍继续发射物品?

将RxSwift可观察结果绑定到另一个可观察结果

RxJava可观察到的最后一个状态

如何创建一个在2个事件之间跳过发射的可观察对象

漂浮在另一个上的物品

RxJava-控制一个可被另一个观察到

包装一个可观察的。在每个发射值之前和之后做点什么