RxJava-控制一个可被另一个观察到

丹尼斯·伊茨科维奇(Denis Itskovich)

更新

我正在寻找一种方法来控制一个可以被另一个观察到的流量。例如,让我们有2个单数递增(重要)的整数可观察值:

source  : 1----2-2---2--3--3--4----4--5---6----8---9---10--------11------
control : -1----3----------------5-----------6-------9-----------12------

我需要生成一个新的Observable,其元素与源完全匹配,但是它们的时序由控件Observable进行以下控制:源值应始终小于或等于控件值。这意味着只有所有大于最近发布的控件的源值都应等待,直到它们被控件“释放”为止

source         : 1----2-2---2--3--3--4----4--5---6----8---9---10--------11------
control        : -1----3----------------5-----------6-------9-----------12------
expected result: -1----2-2--2--3--3-----4-4--5------6-------8-9---------10-11---

请看下面的代码示例:

private static <T, C> Observable<T> combine(Observable<T> source, Observable<C> control, BiFunction<T, C, Boolean> predicate) {
    // ???
}

@Test
public void testControl() throws InterruptedException {
    Subject<Integer> control = PublishSubject.create();
    Observable<Integer> source = Observable.fromArray(1, 2, 2, 2, 3, 3, 4, 4, 5, 6, 8, 10, 11);
    Observable<Integer> combined = combine(source, control, (s, c) -> s <= c);
    control.subscribe(val -> System.out.println("Control: " + val));
    combined.observeOn(Schedulers.io()).subscribe(val -> System.out.println("Value: " + val));

    control.onNext(3); // should release 1,2,2,2,3,3
    Thread.sleep(1000);
    control.onNext(6); // should release 4,4,5,6
    Thread.sleep(1000);
    control.onNext(11); // should release 8,10,11
    Thread.sleep(1000);
}
丹尼斯·伊茨科维奇(Denis Itskovich)

由于找不到合适的解决方案,因此我最终自己实现了它。如果有人可以提出更优雅的解决方案,我会很高兴(在这种情况下,我将不接受此答案,并且会接受更好的答案)。以下是我的解决方案:

private static <T, C> Observable<T> combine(Observable<T> source, Observable<C> control, BiFunction<T, C, Boolean> predicate) {
    return Observable.create(emitter -> {
        Queue<T> buffer = new ArrayDeque<>();
        AtomicReference<C> lastControl = new AtomicReference<>();
        CompletableSubject sourceCompletable = CompletableSubject.create();
        CompletableSubject controlCompletable = CompletableSubject.create();
        Disposable disposable = new CompositeDisposable(
                control.subscribe(
                        val -> {
                            lastControl.set(val);
                            synchronized (buffer) {
                                while (!buffer.isEmpty() && predicate.apply(buffer.peek(), val)) {
                                    emitter.onNext(buffer.poll());
                                }
                            }
                        },
                        emitter::onError,
                        controlCompletable::onComplete),
                source.subscribe(
                        val -> {
                            C lastControlVal = lastControl.get();
                            synchronized (buffer) {
                                if (lastControlVal != null && predicate.apply(val, lastControlVal)) {
                                    emitter.onNext(val);
                                } else {
                                    buffer.add(val);
                                }
                            }
                        },
                        emitter::onError,
                        sourceCompletable::onComplete),
                controlCompletable.andThen(sourceCompletable).subscribe(emitter::onComplete));
        emitter.setDisposable(disposable);
    });
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

RxJava:等待另一个可观察到的结果

rxjava按条件收集对象并等待另一个可观察到的对象

RxJava可观察到的最后一个状态

将可观察到的Zip列表转换为另一个可观察到的RxJava2

RxJava:取消订阅可从另一个异步生产中观察到的异步

Groovy HTTPBuilder + RxJava返回一个空对象/可观察到

暂停和恢复基础上RxJava 2.X一个布尔门可观察到的?

RxJava合并订阅者仅从第一个可观察到的结果中获取结果

RxJava + Kotlin-在创建另一个观察值时使用

RxJava:阻止某个可观察对象发出,直到发出另一个可观察对象的数据

如何使RxJava 2中的可观察对象发出另一个可观察对象发出的组合项?

RxJava-当另一个完成时启动一个Observable

如何使用RxJava使用另一个流的element属性作为条件来过滤一个可观察的流?

RxJava-每秒发出一个可观察的信号

前一个完成后如何使用一个可观察的rxjava?

RxJava 将一个列表转换为另一个列表时遇到问题

RxJava:我如何开始一个任务,当它完成时开始另一个任务?

检查一个特定事件,然后检查另一个特定事件,并使用RxJava发出成功

RxJava 2-在另一个Completable之后调用Completable

如何在 RxJava 中将 observable 包装在另一个 observable 中?

RxJava Thread.sleep在另一个线程中

RxJava:如何将对象列表转换为另一个对象列表

使用RxJava将列表转换为另一个列表

使用 rxJava 和改造多次调用另一个请求中的请求

如何使用RxJava从另一个流实现过滤器

RxJava flatMapIterable只需一个

使用 RxJava 创建一个 Observable

如何将数据从观察到的类传递到另一个类?

使用RxJava从2个可观察对象中获得一个结果