RxSwift - How to retry when an observable changes but emit only last value?

Mateusz Tylman

I'm trying to build a chain of chat services with RxSwift. It should:

  • Check whether the chat has an id; if yes, go to the next step, if not, flatMap to the createChat service
  • Check whether the socket connection is enabled
  • Send the chat message

The function below works, but it does not check the socket connection status yet. Before the second flatMap it should check whether the sockets are connected: if they are, it should continue to the flatMap with the message upload; if not, it should wait until the connection status becomes true. I have a Variable that tells me the current connection status (Bool):

chatSocketService.isSubscribedToChannel.asObservable()

but I can't figure out how to put them together. I tried adding a third flatMap (before the current last one), but it doesn't work. The next problem is that the user can try to send several messages before the connection is back. This method is executed each time he hits the send button, so it should send only the last message once the connection is back. Any idea how I can handle that with Rx?

func sendMessage(withBody body: String) {
    guard !body.isEmpty else { return }

    Observable.just(chatModel.value)
        .filter({ $0.product != nil })
        .flatMap({ [unowned self] chatModel -> Observable<ChatModel> in
            if chatModel.id != nil {
                return Observable.just(chatModel)
            } else {
                return self.createChat(withProductModel: chatModel.product!)
            }
        })
        .flatMap({ [unowned self] chatModel -> Observable<ChatMessageModel> in
            return self.chatService.uploadChatMessage(forChat: chatModel, withBody: body)
                    .trackActivity(self.progressHelper.activityIndicator)
        })
        .subscribe(onNext: { [unowned self] chatMessageModel in
            self.finishedSendingMessage.onNext(())
        })
        .addDisposableTo(disposeBag)
}
tomahh
.flatMap({ [unowned self] chatModel -> Observable<ChatMessageModel> in
    // Emits a single Void and completes as soon as the connection status is true.
    let successfulConnect = self.chatSocketService.isSubscribedToChannel.asObservable()
       .skipWhile { $0 == false }
       .map { _ -> Void in }
       .take(1)

    // Start the upload only after the connection has been confirmed.
    return successfulConnect.flatMap { _ in self.chatService.uploadChatMessage(forChat: chatModel, withBody: body) }
        .trackActivity(self.progressHelper.activityIndicator)
})

The above code should give the behavior described in the question for managing the connection. successfulConnect is an observable that emits one value and completes once the connection status is true.

skipWhile ignores the leading false values, map transforms the observable from Observable<Bool> to Observable<Void>, and take(1) makes sure the observable completes after the first value. flatMap then performs uploadChatMessage when successfulConnect sends a value.
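
To see the gate on its own, here is a minimal sketch. It assumes a BehaviorSubject named status standing in for isSubscribedToChannel and an events array used only to record what happens; both are made up for illustration. Recent RxSwift versions rename skipWhile to skip(while:), so adjust to whichever your version offers.

```swift
import RxSwift

// Hypothetical stand-in for chatSocketService.isSubscribedToChannel.
let status = BehaviorSubject<Bool>(value: false)
var events: [String] = []

// Same operator chain as in the answer: drop leading false values,
// erase the Bool to Void, and complete after the first element.
let gate = status
    .skipWhile { $0 == false }
    .map { _ in () }
    .take(1)

_ = gate.subscribe(
    onNext: { _ in events.append("connected") },
    onCompleted: { events.append("completed") }
)

status.onNext(false)  // still disconnected, skipped
status.onNext(true)   // first true passes; take(1) then completes the gate
status.onNext(true)   // ignored, the gate has already completed

print(events)  // ["connected", "completed"]
```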


The second behavior you describe implies cancelling the previous upload if the user sends another message before the previous one has been uploaded. This can be handled by disposing of the previous subscription on entering sendMessage:

var disposable: Disposable?

func sendMessage(withBody body: String) {
   disposable?.dispose()
   disposable = Observable.just(chatModel.value)
      // ...
      .subscribe(onNext: { /* ... */ })
}
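
As an alternative to disposing manually, the same "only the last message wins" behavior could be sketched with a subject and flatMapLatest, which disposes the previous inner sequence whenever a new element arrives. This is a swapped-in technique, not what the answer above uses. The names isConnected, outgoing, sent, and upload are hypothetical stand-ins for the services in the question.

```swift
import RxSwift

// Hypothetical stand-ins, not the question's real services.
let isConnected = BehaviorSubject<Bool>(value: false)  // plays the role of isSubscribedToChannel
let outgoing = PublishSubject<String>()                // one element per tap on the send button
let disposeBag = DisposeBag()
var sent: [String] = []

// Hypothetical upload; real code would call chatService.uploadChatMessage.
func upload(_ body: String) -> Observable<String> {
    return Observable.just("sent: \(body)")
}

outgoing
    .flatMapLatest { body -> Observable<String> in
        // Wait for the first true, then upload. If a newer message
        // arrives first, flatMapLatest disposes this inner sequence.
        return isConnected
            .filter { $0 }
            .take(1)
            .flatMap { _ in upload(body) }
    }
    .subscribe(onNext: { sent.append($0) })
    .disposed(by: disposeBag)

outgoing.onNext("first")   // waits for the connection
outgoing.onNext("second")  // replaces "first" while still waiting
isConnected.onNext(true)   // only "second" is uploaded

print(sent)  // ["sent: second"]
```

With this shape, sendMessage would only push the body into the subject, and the chain is subscribed once rather than on every tap.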
