项目反应堆超时

字节;

我正在一个项目反应堆研讨会上工作,并且坚持以下任务:

/**
     * TODO 5
     * <p>
     * For each item call received in colors flux call the {@link #simulateRemoteCall} operation.
     * Timeout in case the {@link #simulateRemoteCall} does not return within 400 ms, but retry twice
     * If still no response then provide "default" as a return value
     */

我无法确定的问题是Flux实际上从未抛出TimeOutException!我可以在控制台日志中观察到这一点:

16:05:09.759 [main] INFO Part04HandlingErrors - Received red delaying for 300 
16:05:09.781 [main] INFO Part04HandlingErrors - Received black delaying for 500 
16:05:09.782 [main] INFO Part04HandlingErrors - Received tan delaying for 300 

我尝试按照语句的顺序进行操作,尽管它似乎并没有改变行为。注意:此外,我尝试了timeout()的重载变体,如果没有发出任何元素,该变体接受应返回的默认值。

public Flux<String> timeOutWithRetry(Flux<String> colors) {

        return colors
                .timeout(Duration.ofMillis(400))
                //.timeout(Duration.ofMillis(400), Mono.just("default"))
                .retry(2)
                .flatMap(this::simulateRemoteCall)
                .onErrorReturn(TimeoutException.class, "default");

    }

有人可以弄清楚为什么不发生超时吗?我怀疑该机制不以某种方式“绑定”到flatMap调用的方法。

为了完整性:辅助方法:

public Mono<String> simulateRemoteCall(String input) {
        int delay = input.length() * 100;
        return Mono.just(input)
                .doOnNext(s -> log.info("Received {} delaying for {} ", s, delay))
                .map(i -> "processed " + i)
                .delayElement(Duration.of(delay, ChronoUnit.MILLIS));
    }

更加完整,这是我用来验证功能的测试:

@Test
    public void timeOutWithRetry() {
        Flux<String> colors = Flux.just("red", "black", "tan");

        Flux<String> results = workshop.timeOutWithRetry(colors);

        StepVerifier.create(results).expectNext("processed red", "default", "processed tan").verifyComplete();
    }
多米尼克:

MartinTarjányi的答案是正确的,但您还问为什么在代码中

    return colors
            .timeout(Duration.ofMillis(400))
            //.timeout(Duration.ofMillis(400), Mono.just("default"))
            .retry(2)
            .flatMap(this::simulateRemoteCall)
            .onErrorReturn(TimeoutException.class, "default");

没有超时发生。

原因是,如果colors磁通量的元素可用,则调用.timeout(Duration.ofMillis(400))没有效果,因为如果在给定的400ms持续时间内没有发射任何项目,则timeout仅传播a ,但是在此示例中情况并非如此。TimeoutException

结果,该元素被发射并且retry(2)也没有作用。接下来,您simulateRemoteCall对发射的元素进行调用,这会花费一些时间,但不会返回错误。代码的结果(除了时序差异之外)与您仅在给定的通量上应用映射相同(相同):

public Flux<String> timeOutWithRetry(Flux<String> colors) {
    return colors.map(s -> "processed " + s);
}

如果要查看调用超时,simulateRemoteCall则必须在调用之后添加timeout方法。

除了使用之外,flatMap您还可以使用concatMap不同之处在于是否应保留顺序,即default是否可能乱序出现。

使用concatMap答案如下所示:

public Flux<String> timeOutWithRetry(Flux<String> colors) {
    return colors.concatMap(
            color -> simulateRemoteCall(color)
                        .timeout(Duration.ofMillis(400))
                        .retry(2)
                        .onErrorReturn("default"));
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章