I would like to create an RxJS Observable from an iterable like the following:
const networkIterableFactory = (resource: string) => {
  let i = 0;
  return {
    [Symbol.iterator]() {
      return {
        next() {
          return {
            done: false,
            value: fetch(resource, {
              mode: 'cors',
            }).then(async response => {
              console.log('i = ', i);
              await throttle(10000); // Do some stuff
              i++;
              return {i: 'i'};
            }),
          };
        },
      };
    },
  };
};
function throttle(ms: number) {
  return new Promise(resolve => setTimeout(resolve, ms));
}
let networkIterable = networkIterableFactory('google.com');
let network$ = rxjs.from(networkIterable).pipe(rxjs.operators.take(5));
network$.subscribe(() => console.log('yo!'));
The issue is that i is printed as 0 five times. The iterator keeps its state by updating the outer closure, but rxjs.from seems to consume the iterable in one go, so a bunch of unresolved promises are emitted before any of their callbacks has run. I need the iterator state to be altered by logic inside the promise callback. Is there a way to make the observable wait until the promise resolves before pulling the next item from the iterator? I would rather avoid using an async iterable because I don't want to bring in IxJS.
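The behaviour described above can be reproduced without RxJS or the network. The following is a minimal sketch, not the original code: delay stands in for fetch and throttle, and makeSyncIterable and pullSynchronously are hypothetical names introduced only for illustration. It assumes the consumer calls next() several times in a row without waiting for the returned promises, which is what a synchronous iterator allows.

```typescript
// Hypothetical stand-in for fetch/throttle: resolves after `ms` milliseconds.
const delay = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// A synchronous iterable whose values are promises, mirroring the question.
function makeSyncIterable() {
  let i = 0;
  const seen: number[] = []; // records the value of i observed by each callback
  const iterable: Iterable<Promise<void>> = {
    [Symbol.iterator]() {
      return {
        next(): IteratorResult<Promise<void>> {
          const value = delay(10).then(async () => {
            seen.push(i);    // same position as console.log('i = ', i)
            await delay(50); // "do some stuff"
            i++;             // state only changes after the slow step
          });
          return { done: false, value };
        },
      };
    },
  };
  return { iterable, seen };
}

// Pull `count` items back to back, the way a synchronous consumer would.
async function pullSynchronously(count: number): Promise<number[]> {
  const { iterable, seen } = makeSyncIterable();
  const iterator = iterable[Symbol.iterator]();
  const pending: Promise<void>[] = [];
  for (let n = 0; n < count; n++) {
    pending.push(iterator.next().value);
  }
  await Promise.all(pending);
  return seen;
}

pullSynchronously(5).then(seen => console.log(seen)); // logs [ 0, 0, 0, 0, 0 ]
```

Every callback reads i before any of them has reached i++, because all five promises were created before the first one resolved.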
Since the values of your iterable are produced asynchronously, you should implement Symbol.asyncIterator instead of Symbol.iterator. Try this instead:
const networkIterableFactory = (resource: string) => {
  let i = 0;
  return {
    [Symbol.asyncIterator]() {
      return {
        next() {
          return fetch(resource, { mode: 'cors' }).then(x => ({ done: false, value: x }));
        },
      };
    },
  };
};
function throttle(ms: number) {
  return new Promise(resolve => setTimeout(resolve, ms));
}
let networkIterable = networkIterableFactory('google.com');
let network$ = rxjs.from(networkIterable).pipe(rxjs.operators.take(5));
network$.subscribe(() => console.log('yo!'));
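For comparison, this is roughly how an async iterable behaves when it is consumed natively with for await, independent of RxJS. It is a sketch under assumptions: delay replaces the fetch call, and makeAsyncIterable and takeFromAsyncIterable are hypothetical helpers, not part of any library. Because the loop awaits each next() before calling it again, the closure state advances between items.

```typescript
// Hypothetical stand-in for the network call.
const delay = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// An infinite async iterable that keeps its state in the closure.
function makeAsyncIterable(): AsyncIterable<number> {
  let i = 0;
  return {
    [Symbol.asyncIterator]() {
      return {
        async next(): Promise<IteratorResult<number>> {
          const seen = i;  // state as observed by this step
          await delay(10); // "do some stuff"
          i++;             // update the state before the next pull
          return { done: false, value: seen };
        },
      };
    },
  };
}

// Collect the first `count` values; `break` stops pulling from the infinite source.
async function takeFromAsyncIterable<T>(source: AsyncIterable<T>, count: number): Promise<T[]> {
  const out: T[] = [];
  if (count <= 0) return out;
  for await (const value of source) {
    out.push(value);
    if (out.length >= count) break;
  }
  return out;
}

takeFromAsyncIterable(makeAsyncIterable(), 5).then(values => console.log(values)); // logs [ 0, 1, 2, 3, 4 ]
```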
Edit:
RxJS actually doesn't support async iterators yet: https://github.com/ReactiveX/rxjs/issues/1624.
I also tried with an AsyncGenerator:
const { from } = require('rxjs');
async function* test() {};
const asyncGenerator = test();
from(asyncGenerator);
But it throws:
TypeError: You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.
So this pattern won't work. I actually believe that RxJS is not suited to "pulling" data the way you do here with take (if this pattern worked, it would end up in an infinite loop of requests, even though you only take 5 results). It is rather designed to "push" values to whoever is listening.
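If the goal is only to start the next request once the previous promise has resolved, one possible alternative is to drop the iterator and let RxJS re-run a promise factory. The sketch below assumes the rxjs package is installed and uses defer, repeat and take; sequentialSteps and delay are hypothetical names, and delay stands in for the fetch call. It is one way to get sequential behaviour, not necessarily the best fit for every case.

```typescript
import { defer, Observable } from 'rxjs';
import { repeat, take } from 'rxjs/operators';

// Hypothetical stand-in for the network call.
const delay = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// Emits one value per step; a new step starts only after the previous promise resolved.
function sequentialSteps(stepDelayMs: number): Observable<number> {
  let i = 0; // state shared by all steps, updated inside the async callback
  return defer(async () => {
    const seen = i;
    await delay(stepDelayMs); // "do some stuff"
    i++;
    return seen;
  }).pipe(
    repeat(), // resubscribe to the deferred promise each time it completes
  );
}

// take(5) unsubscribes after five values, so no sixth step is started.
const steps$ = sequentialSteps(10).pipe(take(5));
steps$.subscribe(value => console.log('i =', value));
```

defer calls the factory on every subscription, and repeat() resubscribes only after the previous promise has emitted and completed, so the state is updated before the next step reads it.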