当基础数据源具有新值时,了解可观察到的RxJava

安卓

我正在尝试试验RxJava可观察和观察者代码。我的目标是检查基础源接收新数据值时事情如何工作。我的代码是:

List<Integer> numbers = new ArrayList<>();
Runnable r = new Runnable() {
            @Override
            public void run() {
                int i = 100;
                while(i < 110) {
                    numbers.add(i);
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    i++;
                }
            }
        };
        numbers.add(0);
        numbers.add(1);
        numbers.add(2);
        Observable.fromIterable(numbers)
                .observeOn(Schedulers.io())
                .subscribe(i -> System.out.println("Received "+i+ " on "+ Thread.currentThread().getName()),
                        e -> e.printStackTrace());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Thread t = new Thread(r);
        t.start();

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

所以我有一个数字清单。然后,我有了一个runnable,它将新数字添加到此列表中,并且在添加之间存在时间间隔。我还没有启动线程。我将0,1,2添加到列表中,然后使用它创建一个可观察对象,在池中的线​​程上安排观察者,最后订阅该可观察对象。发生订阅时,observable发出值0,1,2,并调用观察者(执行传递给订阅的lambda)。然后,我在主线程上引入了1秒的延迟,然后使用之前创建的runnable生成了一个新线程,并添加了最后的延迟,以使应用程序不会立即退出。

我期望的是,随着新数字添加到列表中,必须调用观察者,从而打印消息。但这不会发生。我的理解肯定是错误的。我还需要在调度程序上放置可观察的内容吗?

普罗格曼

Observable.fromIterable()方法是每次建立订阅时都可观察到的“一次性”值加载。构建订阅后的“之后”将不再有影响。当将该subscribe(onNext, onError, onComplete)方法与onComplete参数一起使用时,您将看到预订已完全用尽,并且已打印三个初始值。

您可以使用Subject(类似的PublishSubjectonNext()方法,在方法中添加“新值”,而先前构建的订阅仍处于活动状态(但尚未完成)。这样,您可以先构建订阅,然后继续调用onNext()主题中的新值,直到完成并调用onCompleted()

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章