我有以下代码:
ExecutorService poolA = newFixedThreadPool(10, threadFactory("Sched-A-%d"));
Scheduler schedulerA = Schedulers.from(poolA);
ExecutorService poolB = newFixedThreadPool(10, threadFactory("Sched-B-%d"));
Scheduler schedulerB = Schedulers.from(poolB);
ExecutorService poolC = newFixedThreadPool(10, threadFactory("Sched-C-%d"));
Scheduler schedulerC = Schedulers.from(poolC);
private ThreadFactory threadFactory(String pattern) {
return new ThreadFactoryBuilder()
.setNameFormat(pattern).build();
}
@Test
public void testSubscribedOnObservedOn() {
log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
.doOnNext(x -> log("Found 1: " + x))
.observeOn(schedulerB)
.doOnNext(x -> {Thread.sleep(100);log("Found 2: " + x);})
.observeOn(schedulerC)
.doOnNext(x -> log("Found 3: " + x))
.subscribeOn(schedulerA)
.subscribe(
x -> log("Got 1: " + x),
Throwable::printStackTrace,
() -> log("Completed")
);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("Exiting");
}
最后2个运算符将在schedulerC上运行。我希望只有一个线程用于此。但是输出表明2。
0 | main | Starting
72 | main | Created
135 | Sched-A-0 | Subscribed
136 | Sched-A-0 | Found 1: A
138 | Sched-A-0 | Found 1: B
239 | Sched-B-0 | Found 2: A
239 | Sched-C-0 | Found 3: A
240 | Sched-C-0 | Got 1: A
341 | Sched-B-0 | Found 2: B
341 | Sched-C-1 | Found 3: B
341 | Sched-C-1 | Got 1: B
341 | Sched-C-1 | Completed
3129 | main | Exiting
使用Sched-C-0,Sched-C-1。这种行为正确吗?
使用Executors.newFixedThreadPool()
,您可以获得可以对任务提交做出反应的线程池。在当前运行的线程执行此操作之前,第二个线程可能会更快地唤醒以服务更多的工作。无法强制池从其集合中重用同一线程。
相反,RxJava的标准Scheduler
使用单线程工作程序,因此相同的基础线程将为中的操作提供服务observeOn
。
当包装任意对象时Executor
,RxJava最好的办法是确保提交给Scheduler.Worker
它的任务不会重叠。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句