Call asynchronous operations in RxJS operators

ovg

With the code below I get a value every 500ms and I take 10 of them.

Now I want to filter out invalid values but the function I want to call is asynchronous (but happens very quickly) and I don't know how to do that in the filter operator.

this.subscription = this.input.getStream()
    .throttleTime(500)
    .filter(value => {
      // I want to filter out invalid values
      // but the function I want to call here is async
    })
    .take(10)  // and get 10 valid ones
    .subscribe(value => {
      this.values.push(value)
    })

Note: I want the filter to happen after the time throttling.

jonrsharpe

One way to do this is to:

  1. use flatMap to call the async function, returning both the value and the result to be used for filtering;
  2. filter based on the latter; then
  3. at some later point, unwrap the value again.

In your case, maybe something like:

this.subscription = this.input.getStream()
    .throttleTime(500)
    .flatMap(value => asyncFunction(value).map(include => ({ value, include }))
    .filter(({ include }) => include)
    .take(10)
    .subscribe(({ value }) => {
      this.values.push(value)
    })

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related