How can I customize my own Observable?


I'm using retrofit2, rxjava2 and adapter-rxjava to implement my http api call.

//API definition
Observable<String> queryProducts(@Body Query query);

//API implementation.
                .subscribeOn(new Scheduler().ioThread())
                .observeOn(new Scheduler().mainThread())
                .subscribe(new Observer());

If I have a lot of apis need to be implemented, and every individual api implementation needs to add these two lines:

.subscribeOn(new Scheduler().ioThread())
.observeOn(new Scheduler().mainThread())

I don't want to add them in every api implementation. I'd like to use MyObservable as to be the result type of my api definition.

My idea looks like below:

//API definition
 MyObservable<String> queryProducts(@Body Query query);

//MyObservable definition
    public class MyObservable<T> extends Observable<T> {
         * Creates an Observable with a Function to execute when it is subscribed to.
         * <p>
         * <em>Note:</em> Use {@link #create(OnSubscribe)} to create an Observable, instead of this constructor,
         * unless you specifically have a need for inheritance.
         * @param f {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called
        protected MyObservable(OnSubscribe<T> f) {
            this.subscribeOn(new Scheduler().ioThread());
            this.observeOn(new Scheduler().mainThread());

When I run it, I got below exception:

java.lang.IllegalArgumentException: Unable to create call adapter for MyObservable.

I traced code at I found RxJavaCallAdapterFactory at line 100, it seems it only lets Observable class pass this checkpoint. I couldn't extend and override this method because this class is a final class.

if (rawType != Observable.class && !isSingle && !isCompletable) {
      return null;

Is there any way to add these two line in a super class, I don't want to add them in every api implementation? Thanks so much.


While in RxJava2 you can safely extend Observable, it less likely suits for this kind of situation where you want to reuse common code instead of duplicating it (but for creating Observable from scratch, usually for wrapping external async callback code).
Instead you can use compose() operator which transform Observable with your custom code, and it's classic for adding common logic to Observable.
You can follow Dan Lew's article for a example exactly for your need (adding Schedulers).

Regrading retrofit adapter, as it's creating services with reflection it can't support custom classes but generates the existing RxJava classes.
BTW, you're looking/using retrofit's RxJava1 adapter with RxJava2, you need to switch to RxJava2 adapter, with RxJava2 adapter you can see that actually retrofit uses its own custom Observable classes.

If using compose() isn't enough for you (as you still need to add it to every API), the official way it to create your own CallAdapter.Factory and implement retrofit CallAdapter by wrapping RxJava2CallAdapterFactory delegating adapt to it, and then wrap the return Observable with your custom code/operators/schdeulers. see this tutorial. or example with RxJava2 (pretty the same) at some library I'm working on.

