Observable 只发出第一个值

暗影温柔

我正在尝试创建一个可从 firebase 查询返回列表的 observable。问题是当我调用 onNext 发出 Item 然后 onComplete 它停止发出第一个项目之后的项目,并且根本不调用 onComplete 什么也不发出。有没有正确的方法来做我想要实现的目标?我对RxJava静止陌生,所以请原谅我的无知。提前感谢您的任何帮助:)

public Observable<Message> getMessageObservable(String uid) {
    currentUser = auth.getCurrentUser();
    DatabaseReference db_messages = db_root.child("Messages").child(currentUser.getUid())
            .child(uid);
    Query messageQuery = db_messages.orderByKey().limitToLast(10);
    return Observable.create(emitter -> {
        messageQuery.addChildEventListener(new ChildEventListener() {
            @Override
            public void onChildAdded(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {
                String messageText = dataSnapshot.child("message").getValue().toString();
                String messageId = dataSnapshot.child("MessageId").getValue().toString();
                Boolean seen = dataSnapshot.child("seen").getValue(Boolean.class);
                Long timestamp = dataSnapshot.child("timestamp").getValue(long.class);
                String fromUser = dataSnapshot.child("from").getValue().toString();
                String toUser = dataSnapshot.child("to").getValue().toString();
                Message message = new Message(messageText, toUser, messageId, seen, timestamp, null, fromUser);
                emitter.onNext(message);
                emitter.onComplete();
            }

            @Override
            public void onChildChanged(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {

            }

            @Override
            public void onChildRemoved(@NonNull DataSnapshot dataSnapshot) {

            }

            @Override
            public void onChildMoved(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {

            }

            @Override
            public void onCancelled(@NonNull DatabaseError databaseError) {

            }
        });
    });
}

@Override
public void getMessages(String userId) {
    currentUser = auth.getCurrentUser();
    Observable.just(userId)
            .flatMap(this::getMessageObservable)
            .toList()
            .subscribe(messages -> {
                chatResults.getMessagesResult(messages);
            });
}
错误

解决问题的方法总是有很多。请检查这是否适合您:

  1. 更改getMessageObservable为获取引用、查询和添加ChildEventListener侦听器的简单方法(未创建 observables 等)
  2. 创建PublishSubject<String> myMessages = PublishSubject.create()pub 主题,像通常使用 observable 一样订阅它。在您的订阅中确保收听 onNext 动作 (Action1)
  3. 在您的ChilddEventListener实现中,确保在myMessages.onNext(message)新消息到达时调用

通过上述设置,您现在将有消息进入您的 onNext 订阅。您可以保留可变列表并附加(或预先添加)即将到来的消息,从而通知感兴趣的各方重新更新消息列表。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

Observable 订阅只接收第一个事件

来自 Subject 的 Observable 发出第一个值为 null

RxJS:让 Observables 只在前一个 Observable 完成后才发出值

Angular 5-Observable-将Observable的第一个值设置为表单组

Observable.zip但获得第一个值,不要等待所有值

如何使用 RxJS 跳过 observable 的第一个值?

运行第二个 Observable 是第一个 Observable 是假的

Observable 如何根据另一个 Observable 发出值

RxJS Observable-如果在前x分钟内未发出第一个事件,则返回,否则继续x + y分钟

如何仅发出 Observable 的一个或最后一个值?

RxJS-将Observable分为两个,等待第一个完成

从android中的第一个observable接收结果后如何启动第二个observable?

如果第一个observable为null,则angular rxjs仅订阅第二个Observable

Javascript rxjs - 当最后一个值从 observable 发出时做一些事情

如何创建一个 observable,它从 ngrx 选择器发出一个值,然后在延迟后发出另一个值?

当第一个Observable为空时的RxJava concat

RxJava-如何获取列表的第一个元素并将其返回为Observable

我如何使用 Observable 中的第一个也是最少的元素?

如何将 Observable<T> 的第一个元素添加到每个组?

Angular 2 Typescript Observable返回第一个对象结果

Rxjs在第一个匹配项上执行并退出Observable

RxJS顺序请求并返回Observable <>第一个响应

如何在第一个 observable 上组合多个 observables 并在方法中作为新的 observable 返回

如何在使用RxJS并行执行其他Observable之前等待第一个Observable完成

一个 Rx observable 将作为 ReplaySubject 但仅适用于第一个订阅者?

RxSwift - 如何在 observable 更改但仅发出最后一个值时重试?

创建一个定期发出值的 observable(错误:observer.timer 不是函数)

将Observable与第二个Observable结合起来,第二个Observable使用第一个的结果

当源Observable发出但仅使用最新值时如何切换到另一个Observable