RxJs: How to only maintain the latest value until inner observable complete

CRIMX

I'm new to RxJs and having trouble to achieve this in "RxJs way":

An infinite stream a$ emits a value a once a while.

async() takes the a and performs an async operation.

If a$ emits values while async is pending, only keep the latest one al.

After the previous async completes, if there is an al, run async(al).

And so on.

a$:----a1----------a2----a3-----------------------a4-----------
       async(a1):------------end                  async(a4):---
                             async(a3):-----end

Here is what I came up with, a bit nasty:

var asyncIdle$ = new Rx.BehaviorSubject()
var asyncRunning$ = new Rx.Subject()
var async$ = asyncIdle$

function async (val) {
  async$ = asyncRunning$
  // do something with val
  console.log(val + ' handling')
  setTimeout(() => {
    console.log(val + ' complete')
    async$.next()
    async$ = asyncIdle$
  }, 2000)
}

// simulate a$
var a$ = Rx.Observable.fromEvent(document, 'click')
.mapTo(1)
.scan((acc, curr) => acc + curr)
.do(val => console.log('got ' + val))


a$.debounce(() => async$)
.subscribe(val => {
  async(val)
})
cartant

You can use the audit operator to solve the problem, like this (the comments should explain how it works):

// Simulate the source.

const source = Rx.Observable.merge(
  Rx.Observable.of(1).delay(0),
  Rx.Observable.of(2).delay(10),
  Rx.Observable.of(3).delay(20),
  Rx.Observable.of(4).delay(150),
  Rx.Observable.of(5).delay(300)
).do(value => console.log("source", value));

// Simulate the async task.

function asyncTask(value) {
  return Rx.Observable
    .of(value)
    .do(value => console.log(" before async", value))
    .delay(100)
    .do(value => console.log(" after async", value));
}

// Compose an observable that's based on the source.
// Use audit to ensure a value is not emitted until
// the async task has been performed.
// Use share so that the signal does not effect a
// second subscription to the source.

let signal;

const audited = source
  .audit(() => signal)
  .mergeMap(value => asyncTask(value))
  .share();

// Compose a signal from the audited observable to
// which the async task is applied.
// Use startWith so that the first emitted value
// passes the audit.

signal = audited
  .mapTo(true)
  .startWith(true);

audited.subscribe(value => console.log("output", value));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

RxJS - subscribe only once but do not complete Observable

RxJS: How to subscribe to inner observable only when outer observable/subject is subscribed to?

RxJS hold observable and next latest value on untold

How can I complete Observable in RxJS

How to chain rxjs observables to get value of inner observable and return with parent in Angular

How to switch to another Observable when a source Observable emits but only use latest value

Observable: Getting latest value in intervals until source finishes

Testing RxJS observable that tracks the last emitted value (no complete signal)?

How to retry on error only inner observable?

RxJS operator that doesn't emit to inner observable again until its completed

How can I make an observable emit only latest value and not past values

How to create an Observable that only fires when it has subscribers, and provides the latest value to new subscribers immediately

RxJs | How to delay `mergeMap` to moment when previous inner observable completes?

rxjs: how to make foreach loop wait for inner observable

How to cancel a pending request in an inner HTTP observable with RXJS?

RxJs Observable not calling complete method

How to wait for an observable to complete to return a value?

RXJS observable unsubscribe inner observables

How to skip first value of an observable with RxJS?

How to get current value of RxJS Subject or Observable?

How to assign an initial value to Subject RxJS observable?

RxJS (Interval) Observable wait for last observable to complete

Knockout observable latest value

RXJS - waiting until both observables are complete

Prevent Observable from Running Again Until Complete

Rxjs observable wait until some condition is met

Buffer until with redux-observable and rxjs

rxjs takeuntil, check until observable source is valid

RxJS Waits until Subscribe Observable Finish

TOP Ranking

HotTag

Archive