Extend RxJS Observable class with operators

Estus Flask

How can Observable class be extended by applying built-in RxJS operators to it?

I would like to do something like this:

class TruthyObservable extends Observable {
  constructor(subscriber) {
    super(subscriber);

    return this.filter(x => x);
  }
}

class TruthyMappedObservable extends TruthyObservable {
  constructor(subscriber) {
    super(subscriber);

    return this.map(x => `'${x}'`);
  }
}

Can this be done without constructor return?

martin

This pretty much depends on what you want to do but let's say you want to make a TruthyObservable that behaves very much like the default Observable.create(...) but passes only even numbers:

import { Observable, Observer, Subscriber, Subject, Subscription } from 'rxjs';
import 'rxjs/add/operator/filter';

class TruthyObservable<T> extends Observable<T> {

    constructor(subscribe?: <R>(this: Observable<T>, subscriber: Subscriber<R>) => any) {
        if (subscribe) {
            let oldSubscribe = subscribe;
            subscribe = (obs: Subscriber<any>) => {
                obs = this.appendOperators(obs);
                return oldSubscribe.call(this, obs);
            };
        }

        super(subscribe);
    }

    private appendOperators(obs: Subscriber<any>) {
        let subject = new Subject();

        subject
            .filter((val: number) => val % 2 == 0)
            .subscribe(obs);

        return new Subscriber(subject);
    }

}

let o = new TruthyObservable<number>((obs: Observer<number>) => {
    obs.next(3);
    obs.next(6);
    obs.next(7);
    obs.next(8);
});

o.subscribe(val => console.log(val));

This prints to console:

6
8

See live demo: https://jsbin.com/recuto/3/edit?js,console

Usually classes inheriting Observable override the _subscribe() method that actually makes the subscription internally but in ours case we want to use the callback where we can emit values by ourselves (since this Observable doesn't emit anything itself). Method _subscribe() is overshadowed by _subscribe property if it exists so we wouldn't be able to append any operators to it if we just overrode this method. That's why I wrap _subscribe in the constructor with another function and then pass all values through a Subject chained with filter() in appendOperators() method. Note that I replaced the original Observer with the Subject at obs = this.appendOperators(obs).

At the end when I call eg. obs.next(3); I'm in fact pushing values to the Subject that filters them and passes them to the original Observer.

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related