RxJs Observable created from Iterable of promises not updating values each iteration

Allen More

I would like to create an RxJS Observable from an iterable like the following:

const networkIterableFactory = (resource: string) => {
  let i = 0;
  return {
    [Symbol.iterator]() {
      return {
        next() {
          return {
            done: false,
            value: fetch(resource, {
              mode: 'cors',
            }).then(async response => {
              console.log('i = ', i);
              await throttle(10000); // Do some stuff
              i++;
              return {i: 'i'};
            }),
          };
        },
      };
    },
  };
};

function throttle(ms: number) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

let networkIterable = networkIterableFactory('google.com');

let network$ = rxjs.from(networkIterable).pipe(rxjs.operators.take(5));

network$.subscribe(() => console.log('yo!'));

The issue is that i is printed as 0 all five times. The iterable's iterator keeps its state by updating a variable in the outer closure. rxjs.from seems to pull all the items from the iterable at once, so a bunch of unresolved promises are emitted, but I need the iterator state to be altered by logic within the promise callback. Is there a way to make the observable wait until each promise resolves before pulling the next item from the iterator? I would rather avoid using an async iterable because I don't want to bring in IxJS.
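The behaviour described here can be reproduced without RxJS or the network. The sketch below is only an illustration under stated assumptions: stepIterator, pullEagerly, pullSequentially and delay are hypothetical names, and an already-resolved promise stands in for the fetch call. It contrasts pulling five promises in a synchronous loop (which is how a synchronous iterable gets drained) with awaiting each promise before asking for the next one.

```typescript
// Short timer standing in for the slow work done after each request.
function delay(ms: number): Promise<void> {
  return new Promise<void>(resolve => setTimeout(resolve, ms));
}

// Same idea as the iterator in the question; the fetch call is replaced by
// an already-resolved promise (an assumption, to keep the sketch runnable).
function stepIterator(observed: number[]): Iterator<Promise<number>> {
  let i = 0; // state shared by every promise this iterator hands out
  return {
    next() {
      const value = Promise.resolve().then(async () => {
        observed.push(i); // what console.log('i = ', i) would print
        await delay(10);
        i++;
        return i;
      });
      return { done: false, value };
    },
  };
}

// Pull n promises in a tight synchronous loop, then wait for all of them.
async function pullEagerly(n: number): Promise<number[]> {
  const observed: number[] = [];
  const it = stepIterator(observed);
  const pending: Promise<number>[] = [];
  for (let k = 0; k < n; k++) {
    pending.push(it.next().value);
  }
  await Promise.all(pending);
  return observed;
}

// Wait for each promise to resolve before asking for the next one.
async function pullSequentially(n: number): Promise<number[]> {
  const observed: number[] = [];
  const it = stepIterator(observed);
  for (let k = 0; k < n; k++) {
    await it.next().value;
  }
  return observed;
}

pullEagerly(5).then(v => console.log('eager:', v)); // eager: [0, 0, 0, 0, 0]
pullSequentially(5).then(v => console.log('sequential:', v)); // sequential: [0, 1, 2, 3, 4]
```

In the eager case every callback reads i before any of them has had the chance to increment it, which matches the five zeros in the question.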

Guerric P

Since the values of your iterable are produced asynchronously, you should implement Symbol.asyncIterator instead of Symbol.iterator. Try this instead:

const networkIterableFactory = (resource: string) => {
  let i = 0;
  return {
    [Symbol.asyncIterator]() {
      return {
        next() {
          return fetch(resource, { mode: 'cors' }).then(x => ({ done: false, value: x }));
        },
      };
    },
  };
};

function throttle(ms: number) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

let networkIterable = networkIterableFactory('google.com');

let network$ = rxjs.from(networkIterable).pipe(rxjs.operators.take(5));

network$.subscribe(() => console.log('yo!'));

Edit:

At the time of writing, RxJS actually doesn't support async iterators yet: https://github.com/ReactiveX/rxjs/issues/1624.

I also tried with an AsyncGenerator:

const { from } = require('rxjs');

async function* test() {};
const asyncGenerator = test();
from(asyncGenerator);

But it throws:

TypeError: You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.

So you won't be able to do it with this pattern. I actually believe that RxJS is not suited to "pulling" data the way you do with take: if this pattern worked, it would end up in an infinite loop of requests, even if you only take 5 results. RxJS is rather designed to "push" values to whoever listens.
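If the goal is only to run the requests one after another, one possible workaround that stays within RxJS is to wrap a single request in defer and resubscribe to it with repeat, so the next request starts only after the previous promise has resolved. The following is a minimal sketch under assumptions: sequentialRequests and delay are hypothetical names, the timer stands in for the fetch call and its follow-up work, and the import paths assume RxJS 6 or 7.

```typescript
import { defer, Observable } from 'rxjs';
import { repeat, take } from 'rxjs/operators';

// Short timer standing in for fetch(resource) plus the follow-up work.
function delay(ms: number): Promise<void> {
  return new Promise<void>(resolve => setTimeout(resolve, ms));
}

// Each subscription to the deferred source runs one step. repeat() only
// resubscribes once that step's promise has resolved and completed.
function sequentialRequests(observed: number[]): Observable<number> {
  let i = 0; // state shared across steps
  return defer(async () => {
    observed.push(i); // value of i when this step starts
    await delay(10);
    i++;
    return i;
  }).pipe(repeat());
}

const observed: number[] = [];

// Resolves with the recorded values once take(5) completes the stream.
const done: Promise<number[]> = new Promise<number[]>(resolve => {
  sequentialRequests(observed)
    .pipe(take(5))
    .subscribe({ complete: () => resolve(observed) });
});

done.then(values => console.log('observed i values:', values));
```

Because take(5) unsubscribes after the fifth value, repeat does not start a sixth step, so this avoids both the eager burst of requests and an endless loop.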
