我有一个API,该API返回的实体列表最多为100个实体。如果有更多实体,它将返回下一页的令牌。
我想创建一个返回所有实体(所有页面)但仅在需要时(如果需要)的流量。
我写了这段代码:
class Page {
String token;
List<Object> entities;
}
Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {
return fct.apply(token).flatMapMany(page -> {
if (page.token == null) {
// no more pages
return Flux.fromIterable(page.entities);
}
return Flux.fromIterable(page.entities).concatWith(Flux.defer(() -> load(page.token, fct)));
});
}
它有效-几乎
如果我请求99个元素,则会加载第一页,并且我的磁通包含99个元素。
如果我请求150个元素,则会加载第一页和第二页,并且我的流量包含150个元素。
但是,如果我请求100个元素,则会加载第一页和第二页(并且我的通量包含100个元素)。我的问题是,第二个页面已加载到我未请求第101个元素的位置。
当前行为:
subscribe()
=> Function is called to load page 1
request(10)
=> Received: 0-9
request(89)
=> Received: 10-98
request(1)
=> Received: 99
=> Function is called to load page 2
request(1)
=> Received: 100
预期为:第2页的加载发生在最后一个请求之后(1)
几乎就像在某些地方预取了一样,但是我看不到哪里。有任何想法吗?
好的,我找到了。每次都没有预取。实际上,Flux.defer
是在订阅而非请求时加载下一页。
快速(较脏)的测试可修复以下问题:
Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {
return fct.apply(token).flatMapMany(page -> {
if (page.token == null) {
// no more pages
return Flux.fromIterable(page.entities);
}
return Flux
.fromIterable(page.entities)
.concatWith(
// Flux.defer(() -> load(page.token, fct))
Flux.create(s -> {
DelegateSubscriber[] ref = new DelegateSubscriber[1];
s.onRequest(l -> {
if (ref[0] == null) {
ref[0] = new DelegateSubscriber(s);
load(page.token, fct).subscribe(ref[0]);
}
ref[0].request(l);
});
}));
});
}
static class DelegateSubscriber extends BaseSubscriber<Object> {
FluxSink<Object> delegate;
public DelegateSubscriber(final FluxSink<Object> delegate) {
this.delegate = delegate;
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
// nothing
}
@Override
protected void hookOnNext(Object value) {
delegate.next(value);
}
@Override
protected void hookOnError(Throwable throwable) {
delegate.error(throwable);
}
}
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句