RxJava合并订阅者仅从第一个可观察到的结果中获取结果

边缘的

我试图了解rxjava合并是如何工作的。所以这是简单的代码,应该合并2个可观察对象的结果并发送给订阅者

    Observable.merge(getObservable(), getTimedObservable())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override public void call(final String s) {
                        Log.i("test", s);
                    }
                });

    private Observable<String> getTimedObservable() {
        return Observable.interval(150, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, String>() {
                    @Override public String call(final Long aLong) {
                        Log.i("test", "tick thread: " + Thread.currentThread().getId());
                        return String.valueOf(aLong);
                    }
                });
    }

    public Observable<String> getObservable() {
        return  Observable.create(new Observable.OnSubscribe<String>() {
            @Override public void call(final Subscriber<? super String> subscriber) {
                try {
                    Log.i("test", "simple observable thread: " + Thread.currentThread().getId());
                    for (int i = 1; i <= 10; i++) {
                        subscriber.onNext(String.valueOf(i * 100));
                        Thread.sleep(300);
                    }
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        });
    }

我预计订户中的合并结果将是

100 0 1 200 2 300 4 5 400

或类似的东西,但是,实际结果是:

 test: simple observable thread: 257 
test: 100
test: 200
test: 300
test: 400
test: 500
test: 600
test: 700
test: 800
test: 900
test: 1000
test: tick thread: 254
test: 0
test: tick thread: 254
test: 1
test: tick thread: 254
test: 2
test: tick thread: 254
test: 3
test: tick thread: 254
test: 4
test: tick thread: 254
test: 5
test: tick thread: 254
test: 6
test: tick thread: 254
test: 7
test: tick thread: 254
test: 8
test: tick thread: 254
test: 9
test: tick thread: 254
test: 10
test: tick thread: 254
test: 11
test: tick thread: 254
test: 12
test: tick thread: 254
test: 13

在第一个可观察的块中看起来像Thread.sleep在第二个可观察的块中发出,但是我不知道如何。有人可以解释吗?

汉斯·沃斯特

合并将同时订阅两个可观察对象。将首先订阅的可观察对象将在调用线程上产生值。因为调用线程被observable1阻塞,所以observable2无法产生值。SubscribeOn将仅说明订阅将在何处发生。假设可观察的开始在main-1上产生值。每个值下游都将在同一线程上。没有并发发生。

如果要实现并发,则必须为每个可观察对象说出必须进行订阅的位置。所以可以说我们有两个可观察对象的Observables.merge。Observable1和Observable2具有某些线程池的subscriptionOn。每个可观察对象将在subscribeOn的给定线程上生成值。您已实现并发。

请在以下位置查看编辑后的输出:

@Test
public void name() throws Exception {
    Subscription subscribe = Observable.merge(getObservable(), getTimedObservable())
            //.observeOn(AndroidSchedulers.mainThread())
            .subscribe(s -> {

                System.out.println("subscription " + s);
                //Log.i("test", s);
            });


    Thread.sleep(5_000);
}

private Observable<String> getTimedObservable() {
    return Observable.interval(150, TimeUnit.MILLISECONDS)
            .map(aLong -> {
                System.out.println("getTimedObservable: " + Thread.currentThread().getId());

                //Log.i("test", "tick thread: " + Thread.currentThread().getId());
                return String.valueOf(aLong);
            }).subscribeOn(Schedulers.io());
}

private Observable<String> getObservable() {
    return Observable.<String>create(subscriber -> {
        try {
            for (int i = 1; i <= 10; i++) {
                System.out.println("getObservable: " + Thread.currentThread().getId());
                subscriber.onNext(String.valueOf(i * 100));
                Thread.sleep(300);
            }
            subscriber.onCompleted();
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }).subscribeOn(Schedulers.io());
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

RxJava在链式网络请求中获取先前可观察到的结果

RxJava可观察到的最后一个状态

将RxSwift可观察结果绑定到另一个可观察结果

使用RxJava从2个可观察对象中获得一个结果

RxJava2:第一个订阅者和最后一个未订阅观察者的PublishProcessor回调

获取RxJava2中可观察到的N个最后发出的对象

RxJava运算符,它合并两个可观察对象,并在第一个可观察对象发出后立即发出

将可观察的结果与其管道的结果合并为一个可观察的结果

如何合并多个可观察到的RxJS的结果?

* ng如果异步管道与另一个可观察到的角度合并

如何将第一个可观察结果转换为下一个可观察结果?

RXJS首先发出结果,然后等待另一个可观察到的结果

将可观察到的响应合并为一个

将一个可观察到的RxJS结果传递到下一个可观察到的序列中

Groovy HTTPBuilder + RxJava返回一个空对象/可观察到

将RXJS可观察的结果合并为一个

使用最后一个中的第一个结果依次运行3个可观察物

可观察到的第一个元素的特殊处理

rxjs:结合可观察到的结果,同时已经使用异步管道显示了第一个

使用第一个观察结果订阅其他案例

从组件订阅一个可观察到的两次无效

RxSwift)订阅了一个可观察到的两次无法正常工作

修改第一个可观察结果到第二个可观察结果并返回更新的结果

我想将ID添加到div中,与敲除中可观察到的数组的第一个值相同

订阅另一个可观察的订阅者?

rxjava按条件收集对象并等待另一个可观察到的对象

RxJava:等待另一个可观察到的结果

仅从 Json 的第一个结果中获取数据

仅从 SQLITE 中删除第一个结果