现在玩RxJava,偶然发现以下问题:
我有2个不同的流:
所以本质上我有项目流,并且我希望所有这些项目都与第二个流中的单个项目合并:
---- A1 ---- A2 ---- A3 ---- A4 ---- A5 ---- | --------------->
------------- b1-- | ------------------------------------------------- ->
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
------------ a1b1-a2b1-a3b1-a4b1-a5b1 -------->
它看起来确实类似于combileLatest
operator,但是combinateLatest将忽略第一个流中的所有项目,除了最接近第二个流中的项目。这意味着我不会收到a1b1
-发出的第一个结果项目将是a2b1
。
我又看了看delay
运营商,但它不允许我指定close
像它与做流buffer
算子
有没有花哨的运算符可以解决上述问题?
AFAIK,没有内置的运算符可实现您描述的行为。您始终可以实现自定义运算符,也可以在现有运算符之上构建它。我认为第二个选项更容易实现,这是代码:
public static <L, R, T> Observable<T> zipper(final Observable<? extends L> left, final Observable<? extends R> right, final Func2<? super L, ? super R, ? extends T> function) {
return Observable.defer(new Func0<Observable<T>>() {
@Override
public Observable<T> call() {
final SerialSubscription subscription = new SerialSubscription();
final ConnectableObservable<? extends R> cached = right.replay();
return left.flatMap(new Func1<L, Observable<T>>() {
@Override
public Observable<T> call(final L valueLeft) {
return cached.map(new Func1<R, T>() {
@Override
public T call(final R valueRight) {
return function.call(valueLeft, valueRight);
}
});
}
}).doOnSubscribe(new Action0() {
@Override
public void call() {
subscription.set(cached.connect());
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
subscription.unsubscribe();
}
});
}
});
}
如果您对代码有任何疑问,我可以详细解释。
更新
关于询问我的解决方案与以下解决方案有何不同的问题:
left.flatMap(valueLeft -> right.map(valueRight -> together(valueLeft, valueRight)));
left
,right
可观察变量和可观察变量都是并行执行的。right
可观察的不必等待left
一个发出其第一项。缓存-我的解决方案仅订阅一次right
可观察对象并缓存其结果。那就是为什么b1
所有aXXX
项目总是一样的原因。akarnokd提供的解决方案在right
每次left
发射一个项目时都会订阅可观察的项目。这意味着:
无法保证b1
不会改变其价值。例如,对于以下可观察到的对象,您将为b
每个对象得到不同的结果a
。
final Observable<Double> right = Observable.defer(new Func0<Observable<Double>>() {
@Override
public Observable<Double> call() {
return Observable.just(Math.random());
}
});
如果right
可观察对象是一项耗时的操作(例如,网络呼叫),则每次left
可观察对象发出新项目时,您都必须等待其完成。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句