RxJS: How to keep only the latest value until the inner observable completes


I'm new to RxJS and am having trouble achieving this the "RxJS way":

An infinite stream a$ emits a value a once in a while.

async() takes the a and performs an async operation.

If a$ emits values while async is pending, keep only the latest one, al.

After the previous async completes, if there is an al, run async(al).

And so on.

       async(a1):------------end                  async(a4):---

Here is what I came up with, a bit nasty:

// Two subjects stand in for the "idle" and "running" states of the async
// operation. The BehaviorSubject is created without an initial value.
var asyncIdle$ = new Rx.BehaviorSubject()
var asyncRunning$ = new Rx.Subject()
// async$ points at whichever subject reflects the current state. It starts
// out as the idle subject and is reassigned inside async().
var async$ = asyncIdle$

// Simulates an async operation on val: logs that handling has started and,
// 2000 ms later, logs that it has completed.
// While the timer is pending, async$ refers to asyncRunning$; once the timer
// fires it is pointed back at asyncIdle$.
// NOTE(review): the closing brace of this function is not visible in this
// excerpt — confirm against the original snippet.
function async (val) {
  async$ = asyncRunning$
  // do something with val
  console.log(val + ' handling')
  setTimeout(() => {
    console.log(val + ' complete')
    // The operation has finished, so switch back to the idle subject.
    async$ = asyncIdle$
  }, 2000)

// simulate a$
// Each click on the document produces an emission; scan folds every emission
// into the running value with `+` (no seed is supplied), and do() logs each
// accumulated value as it passes through.
// NOTE(review): the emissions are click event objects, so `+` presumably
// concatenates their string forms — confirm this is the intended simulation.
var a$ = Rx.Observable.fromEvent(document, 'click')
.scan((acc, curr) => acc + curr)
.do(val => console.log('got ' + val))

a$.debounce(() => async$)
.subscribe(val => {

You can use the audit operator to solve the problem, like this (the comments should explain how it works):

// Simulate the source.

// Merge the simulated inputs into one stream and log every value it emits.
// NOTE(review): merge() is called with no input observables here, so nothing
// would be emitted — the simulated inputs appear to be missing; confirm
// against the original snippet.
const source = Rx.Observable.merge(
).do(value => console.log("source", value));

// Simulate the async task.

/**
 * Simulates an async task for a single value.
 *
 * Logs the value before the simulated work starts and again once it has
 * finished, then emits the value and completes.
 *
 * @param {*} value - The value the task operates on.
 * @returns {Rx.Observable} An observable that emits `value` once, after the
 *   simulated delay.
 */
function asyncTask(value) {
  // The chain has to start from an observable instance: `do` is an instance
  // operator, so it cannot be called on the Rx.Observable class itself.
  // The 2000 ms delay stands in for the real work (the same duration the
  // question's setTimeout uses), so that "after async" is logged
  // asynchronously rather than immediately after "before async".
  return Rx.Observable
    .of(value)
    .do(value => console.log(" before async", value))
    .delay(2000)
    .do(value => console.log(" after async", value));
}

// Compose an observable that's based on the source.
// Use audit to ensure a value is not emitted until
// the async task has been performed.
// Use share so that the signal does not affect a
// second subscription to the source.

// Declared before `audited` because the audit duration selector closes over
// it; it is assigned only after `audited` has been composed.
let signal;

const audited = source
  .audit(() => signal)
  .mergeMap(value => asyncTask(value))
  // Multicast the pipeline so that subscribing to the signal does not create
  // a second, independent subscription to the source.
  .share();

// Compose a signal from the audited observable to
// which the async task is applied.
// Use startWith so that the first emitted value
// passes the audit.

// NOTE(review): the comment above mentions startWith, but no operators are
// applied here — signal is assigned the audited observable itself. Confirm
// whether part of this expression is missing.
signal = audited

// Subscribe and log every value that comes out of the audited pipeline.
audited.subscribe(value => console.log("output", value));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

