Implementing a custom Subject<T, E> in RxJava

Hector

I have a PublishSubject that receives Strings and emits Articles:

private PublishSubject<String> articleSubject;

public Observable<Article> newArticleSubject() {
    articleSubject = PublishSubject.create();
    return articleSubject.flatMap(new Func1<String, Observable<Article>>() {
        @Override
        public Observable<Article> call(String articleId) {
            return dataModel.getArticleById(articleId);
        }
    });
}

articleSubject.onNext("1234");

I want to create an ArticleSubject class that extends Subject<String, Article>. This is what I have tried:

public class ArticleSubject extends Subject<String, Article> {

    private PublishSubject<String> subject;

    protected ArticleSubject(OnSubscribe<Article> articleOnSubscribe, final IMainDataModel dataModel) {
        super(articleOnSubscribe); //<---- ?????
        this.subject = PublishSubject.create();
        this.subject.flatMap(new Func1<String, Observable<Article>>() {
            @Override
            public Observable<Article> call(String s) {
                return dataModel.getArticleById(s);
            }
        });
    }

    @Override
    public boolean hasObservers() {
        return subject.hasObservers();
    }

    @Override
    public void onCompleted() {
        subject.onCompleted();
    }

    @Override
    public void onError(Throwable e) {
        subject.onError(e);
    }

    @Override
    public void onNext(String s) {
        subject.onNext(s);
    }
}

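But what should I do with the OnSubscribe in the constructor? Do I have to keep track of the subscribers and everything else myself? Is there a way to delegate that to the PublishSubject?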

Hector

I found a generic solution:

public abstract class SubjectBinding<T, E> implements Observer<T> {

    private PublishSubject<T> origin;

    public SubjectBinding() {
        this.origin = PublishSubject.create();
    }

    @Override
    public void onCompleted() {
        origin.onCompleted();
    }

    @Override
    public void onError(Throwable e) {
        origin.onError(e);
    }

    @Override
    public void onNext(T t) {
        origin.onNext(t);
    }

    public Observable<E> asObservable() {
        return origin.flatMap(new Func1<T, Observable<E>>() {
            @Override
            public Observable<E> call(T t) {
                return asObservable(t);
            }
        });
    }

    protected abstract Observable<E> asObservable(T t);

}

private class ArticleSubject extends SubjectBinding<String, Article> {
    @Override
    protected Observable<Article> asObservable(String s) {
        return dataModel.getArticleById(s);
    }
}

Now I can send String values:

articleSubject.onNext("1234");

And it emits Articles:

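public Observable<Article> newArticleSubject() {
    // Keep a reference so that articleSubject.onNext(...) can be called later.
    articleSubject = new ArticleSubject();
    return articleSubject.asObservable();
}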
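
One thing to keep in mind with this approach: PublishSubject is hot, so a value passed to onNext before anyone has subscribed to the Observable returned by asObservable() is not replayed to later subscribers. The sketch below imitates that behaviour in plain Java so it can run without RxJava on the classpath. MiniBinding, its List-returning mapper, MiniBindingDemo and the sample ids are all hypothetical stand-ins made up for illustration; none of them is RxJava API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for SubjectBinding<T, E>; not part of RxJava.
class MiniBinding<T, E> {
    private final List<Consumer<E>> listeners = new ArrayList<>();
    private final Function<T, List<E>> mapper;

    MiniBinding(Function<T, List<E>> mapper) {
        this.mapper = mapper;
    }

    // Output side: plays the role of subscribing to asObservable().
    void subscribe(Consumer<E> listener) {
        listeners.add(listener);
    }

    // Input side: plays the role of onNext(T). Each input is mapped to
    // zero or more outputs, which go only to the listeners registered now.
    void onNext(T value) {
        List<Consumer<E>> snapshot = new ArrayList<>(listeners);
        for (E out : mapper.apply(value)) {
            for (Consumer<E> listener : snapshot) {
                listener.accept(out);
            }
        }
    }
}

class MiniBindingDemo {
    // Sends one id before subscribing and one after; returns what arrived.
    static List<String> run() {
        MiniBinding<String, String> binding =
                new MiniBinding<>(id -> Collections.singletonList("article-" + id));
        List<String> received = new ArrayList<>();
        binding.onNext("1111");            // no listener yet, so it is dropped
        binding.subscribe(received::add);
        binding.onNext("1234");            // delivered to the listener
        return received;
    }
}
```

In the real SubjectBinding the same ordering applies: subscribe to asObservable() first, then call onNext on the binding.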
