我试图了解rxjava合并是如何工作的。所以这是简单的代码,应该合并2个可观察对象的结果并发送给订阅者
Observable.merge(getObservable(), getTimedObservable())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override public void call(final String s) {
Log.i("test", s);
}
});
private Observable<String> getTimedObservable() {
return Observable.interval(150, TimeUnit.MILLISECONDS)
.map(new Func1<Long, String>() {
@Override public String call(final Long aLong) {
Log.i("test", "tick thread: " + Thread.currentThread().getId());
return String.valueOf(aLong);
}
});
}
public Observable<String> getObservable() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(final Subscriber<? super String> subscriber) {
try {
Log.i("test", "simple observable thread: " + Thread.currentThread().getId());
for (int i = 1; i <= 10; i++) {
subscriber.onNext(String.valueOf(i * 100));
Thread.sleep(300);
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
});
}
我预计订户中的合并结果将是
100 0 1 200 2 300 4 5 400
或类似的东西,但是,实际结果是:
test: simple observable thread: 257
test: 100
test: 200
test: 300
test: 400
test: 500
test: 600
test: 700
test: 800
test: 900
test: 1000
test: tick thread: 254
test: 0
test: tick thread: 254
test: 1
test: tick thread: 254
test: 2
test: tick thread: 254
test: 3
test: tick thread: 254
test: 4
test: tick thread: 254
test: 5
test: tick thread: 254
test: 6
test: tick thread: 254
test: 7
test: tick thread: 254
test: 8
test: tick thread: 254
test: 9
test: tick thread: 254
test: 10
test: tick thread: 254
test: 11
test: tick thread: 254
test: 12
test: tick thread: 254
test: 13
在第一个可观察的块中看起来像Thread.sleep在第二个可观察的块中发出,但是我不知道如何。有人可以解释吗?
合并将同时订阅两个可观察对象。将首先订阅的可观察对象将在调用线程上产生值。因为调用线程被observable1阻塞,所以observable2无法产生值。SubscribeOn将仅说明订阅将在何处发生。假设可观察的开始在main-1上产生值。每个值下游都将在同一线程上。没有并发发生。
如果要实现并发,则必须为每个可观察对象说出必须进行订阅的位置。所以可以说我们有两个可观察对象的Observables.merge。Observable1和Observable2具有某些线程池的subscriptionOn。每个可观察对象将在subscribeOn的给定线程上生成值。您已实现并发。
请在以下位置查看编辑后的输出:
@Test
public void name() throws Exception {
Subscription subscribe = Observable.merge(getObservable(), getTimedObservable())
//.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
System.out.println("subscription " + s);
//Log.i("test", s);
});
Thread.sleep(5_000);
}
private Observable<String> getTimedObservable() {
return Observable.interval(150, TimeUnit.MILLISECONDS)
.map(aLong -> {
System.out.println("getTimedObservable: " + Thread.currentThread().getId());
//Log.i("test", "tick thread: " + Thread.currentThread().getId());
return String.valueOf(aLong);
}).subscribeOn(Schedulers.io());
}
private Observable<String> getObservable() {
return Observable.<String>create(subscriber -> {
try {
for (int i = 1; i <= 10; i++) {
System.out.println("getObservable: " + Thread.currentThread().getId());
subscriber.onNext(String.valueOf(i * 100));
Thread.sleep(300);
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}).subscribeOn(Schedulers.io());
}
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句