仅在需要时在反应堆通量中请求下一个

分iller

我有一个API,该API返回的实体列表最多为100个实体。如果有更多实体,它将返回下一页的令牌。

我想创建一个返回所有实体(所有页面)但仅在需要时(如果需要)的流量。

我写了这段代码:

class Page {
    String token;
    List<Object> entities;
}

Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {
    return fct.apply(token).flatMapMany(page -> {
        if (page.token == null) {
            // no more pages
            return Flux.fromIterable(page.entities);
        }

        return Flux.fromIterable(page.entities).concatWith(Flux.defer(() -> load(page.token, fct)));
    });
}

它有效-几乎

如果我请求99个元素,则会加载第一页,并且我的磁通包含99个元素。

如果我请求150个元素,则会加载第一页和第二页,并且我的流量包含150个元素。

但是,如果我请求100个元素,则会加载第一页和第二页(并且我的通量包含100个元素)。我的问题是,第二个页面已加载到我未请求第101个元素的位置。

当前行为:

subscribe()
=> Function is called to load page 1
request(10)
=> Received: 0-9
request(89)
=> Received: 10-98
request(1)
=> Received: 99
=> Function is called to load page 2
request(1)
=> Received: 100

预期为:第2页的加载发生在最后一个请求之后(1)

几乎就像在某些地方预取了一样,但是我看不到哪里。有任何想法吗?

分iller

好的,我找到了。每次都没有预取。实际上,Flux.defer是在订阅而非请求时加载下一页。

快速(较脏)的测试可修复以下问题:

Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {
    return fct.apply(token).flatMapMany(page -> {
        if (page.token == null) {
            // no more pages
            return Flux.fromIterable(page.entities);
        }

        return Flux
                .fromIterable(page.entities)
                .concatWith(
                        // Flux.defer(() -> load(page.token, fct))
                        Flux.create(s -> {
                            DelegateSubscriber[] ref = new DelegateSubscriber[1];

                            s.onRequest(l -> {
                                if (ref[0] == null) {
                                    ref[0] = new DelegateSubscriber(s);
                                    load(page.token, fct).subscribe(ref[0]);
                                }
                                ref[0].request(l);
                            });
                        }));
    });
}

static class DelegateSubscriber extends BaseSubscriber<Object> {

    FluxSink<Object> delegate;

    public DelegateSubscriber(final FluxSink<Object> delegate) {
        this.delegate = delegate;
    }

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        // nothing
    }

    @Override
    protected void hookOnNext(Object value) {
        delegate.next(value);
    }

    @Override
    protected void hookOnError(Throwable throwable) {
        delegate.error(throwable);
    }
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

如何总结一个反应堆通量流值是多少?

工程反应堆:设计一个反应API

反应堆通量的条件逻辑

工程反应堆:助焊剂超时仅在第一个项目没有发出

下一个/反应中的“条件”CSS

仅当第一个发出的事件不符合条件时,才从通量中发布下一个元素

春季启动2反应堆通量API的客户角

通量春5反应堆没有订阅

反应堆 - 如何在热通量重试不失元素?

如何计算项目反应堆通量的平均值?

来自gRPC StreamObserver的“桥接”反应堆通量

反应堆通量抛出llegalArgumentException-由于bufferTimeout而怀疑

仅在单击某些内容时转到下一个迭代

cin 仅在输入非数字值时跳过下一个 cin

RecyclerView 仅在编辑要添加的下一个项目时显示项目

Selenium 单击下一个按钮 - 尝试单击按钮时没有任何反应

查找下一个仅在第一次迭代中起作用

for 中的下一个元素

反应分页下一个按钮

反应更改状态下一个组件

定期调用可观察对象,仅在下一个成功时切换到下一个

将 selenium 请求存储在 jmeter 中以在下一个请求中使用

如何合并两个并行执行两个反应堆通量的返回列表并合并结果

我如何在反应中获取并将结果传递给下一个获取?

单击下一个按钮时,水平幻灯片页面需要置于顶部

同一反应堆中的依赖项

单击下一个按钮时显示下一个问题

反应表中的自定义“上一个”和“下一个”文本

用于下一个请求的响应cookie