Java流在完成上一个之前开始下一个映射

德米特里

我有2个执行者:

ExecutorService executorDownload = Executors.newFixedThreadPool(n);
ExecutorService executorCalculate = Executors.newFixedThreadPool(m);

首先,我需要将任务放入executorDownload中,然后在它们强制执行之后将它们放入executorCalculate中,然后得到结果。我写了下一个流:

long count = IntStream.range(0, TASK_NUMBER)
            .boxed()
            .parallel()
            .map(i -> executorDownload.submit(new Download(i)))
            .map(future -> calculateResultFuture(executorCalculate, future))
            .filter(Objects::nonNull)
            .filter(Main::isFutureCalculated)
            .count();

public static Future<CalculateResult> calculateResultFuture(ExecutorService executorCalculate, Future<DownloadResult> future) {
    try {
        return executorCalculate.submit(new Calculate(future.get()));
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    return null;
}

public static boolean isFutureCalculated(Future<CalculateResult> future) {
    try {
        return future.get().found;
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    return false;
}

是否有可能开始

.map(future -> calculateResultFuture(executorCalculate, future))

.map(i -> executorDownload.submit(new Download(i)))

结束了。我需要在第一张地图开始后第二张地图开始。

ernest_k

您只需要了解流将处理元素一个接一个地流,这意味着一个元素在下一个进入第一个中间步骤之前就遍历整个管道(而不是所有元素一起通过每个步骤,当然对于需要所有步骤的中间步骤除外)元素才能完成工作)。

在您的情况下,这意味着第一个元素.get()将被阻止,从而阻止第二个元素进入第一个任务。

要强制所有元素通过第一个提交(也将适用于第二个提交),您需要在开始阻止之前强制流提交所有任务,例如:

List<Future<DownloadResult>> downloadTasks = IntStream.range(0, TASK_NUMBER)
        .mapToObj(i -> executorDownload.submit(new Download(i)))
        .collect(Collectors.toList());
        //removed .parallel()

这将强制启动所有异步任务,然后您可以对第二个异步批处理执行相同的操作:

List<Future<CalculateResult>> calculateResults = downloadTasks.stream()
        .map(future -> calculateResultFuture(executorCalculate, future))
        .filter(Objects::nonNull)
        .collect(Collectors.toList());

那也将迫使所有任务都提交给第二个执行者。从这里,您可以.get()不必进行不必要的等待:

long count = calculateResults.stream()
        .filter(Main::isFutureCalculated)
        .count();

现在,尽管这将消除批处理中元素之间不必要的等待,但仍可能在批处理之间存在不必要的等待(如果第一个元素完成了第一个任务,它将等待所有其余的操作来完成第一个批处理,然后再继续第二个批处理)。为了解决这个问题,您可能需要其他实现。这是为此而设计的一系列可完成的期货:

List<Completable<CalculateResult>> calculateResult = IntStream.range(0, TASK_NUMBER)
     .mapToObj(i -> CompletableFuture.supplyAsync(() -> callDownload(i), executorDownload)
             .thenApplyAsync(downloadResult -> calculateResultFuture(downloadResult), executorCalculate))
     .collect(Collectors.toList());

long count = calculateResult.stream().map(f -> isFutureCalculated(f)).count();

thenApplyAsync 当第一个任务完成时,将使第二个任务接管。

当然,这将需要您稍微更改API,以便直接调用下载方法。我曾经使用callDownload(i)来运行相同的逻辑new Download(i).call()calculateResultFuture()也将更改以删除该Future参数。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

在下一个开始之前等待上一个承诺的Javascript Map?

量角器 - 等待上一个它块在下一个之前完成

JavaScript Chaining Promises:在上一个完成之前调用下一个Promise

作业在上一个作业完成之前开始

cron.daily是否会在开始下一个作业之前等待作业完成?

在 nodejs 中开始下一个循环之前,等待每个 for 循环迭代完成

Kotlin-确保在开始下一个操作之前完成收集操作

如何在下一个功能开始之前等待功能完成?

在开始下一个动作之前等待动作完成

为什么在ngUnsubscribe的“完成”之前“下一个”?

检查下一个周期是否从上一个周期的末尾开始

下一个数组元素必须从上一个开始

上一个/下一个点击

分页下一个上一个

显示下一个之前无法隐藏上一个窗口

如何在执行下一个史诗之前确保完成一个史诗?

Java LinkedList上一个下一个

Java:查找可导航集的上一个和下一个值

TextMate 2退出键:如何禁用下一个完成和上一个完成

systemd在开始下一个服务单元之前不等待我的服务单元完成

Jetpack Compose LazyRow 滚动,仅捕捉到下一个或上一个元素的开始

Python - 在上一个线程完成后运行下一个线程

如何删除Eureka表单中的<>(下一个/上一个)完成工具栏?

使用scrollTop滚动到下一个和上一个div,我该如何完成?

生成 ajax 调用的循环数组,我需要在下一个调用开始之前完成第一个调用

Vim-命令行-上一个和下一个命令键映射

安卓 | 如何在显示下一个对话之前关闭上一个对话?

在实体框架实现上一个查询之前,不会执行下一个查询

在显示下一个UIViewController之前如何获取UITableView的上一个滚动位置?