我有2个执行者:
ExecutorService executorDownload = Executors.newFixedThreadPool(n);
ExecutorService executorCalculate = Executors.newFixedThreadPool(m);
首先,我需要将任务放入executorDownload中,然后在它们强制执行之后将它们放入executorCalculate中,然后得到结果。我写了下一个流:
long count = IntStream.range(0, TASK_NUMBER)
.boxed()
.parallel()
.map(i -> executorDownload.submit(new Download(i)))
.map(future -> calculateResultFuture(executorCalculate, future))
.filter(Objects::nonNull)
.filter(Main::isFutureCalculated)
.count();
public static Future<CalculateResult> calculateResultFuture(ExecutorService executorCalculate, Future<DownloadResult> future) {
try {
return executorCalculate.submit(new Calculate(future.get()));
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return null;
}
public static boolean isFutureCalculated(Future<CalculateResult> future) {
try {
return future.get().found;
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return false;
}
是否有可能开始
.map(future -> calculateResultFuture(executorCalculate, future))
前
.map(i -> executorDownload.submit(new Download(i)))
结束了。我需要在第一张地图开始后第二张地图开始。
您只需要了解流将处理元素一个接一个地流,这意味着一个元素在下一个进入第一个中间步骤之前就遍历整个管道(而不是所有元素一起通过每个步骤,当然对于需要所有步骤的中间步骤除外)元素才能完成工作)。
在您的情况下,这意味着第一个元素.get()
将被阻止,从而阻止第二个元素进入第一个任务。
要强制所有元素通过第一个提交(也将适用于第二个提交),您需要在开始阻止之前强制流提交所有任务,例如:
List<Future<DownloadResult>> downloadTasks = IntStream.range(0, TASK_NUMBER)
.mapToObj(i -> executorDownload.submit(new Download(i)))
.collect(Collectors.toList());
//removed .parallel()
这将强制启动所有异步任务,然后您可以对第二个异步批处理执行相同的操作:
List<Future<CalculateResult>> calculateResults = downloadTasks.stream()
.map(future -> calculateResultFuture(executorCalculate, future))
.filter(Objects::nonNull)
.collect(Collectors.toList());
那也将迫使所有任务都提交给第二个执行者。从这里,您可以.get()
不必进行不必要的等待:
long count = calculateResults.stream()
.filter(Main::isFutureCalculated)
.count();
现在,尽管这将消除批处理中元素之间不必要的等待,但仍可能在批处理之间存在不必要的等待(如果第一个元素完成了第一个任务,它将等待所有其余的操作来完成第一个批处理,然后再继续第二个批处理)。为了解决这个问题,您可能需要其他实现。这是为此而设计的一系列可完成的期货:
List<Completable<CalculateResult>> calculateResult = IntStream.range(0, TASK_NUMBER)
.mapToObj(i -> CompletableFuture.supplyAsync(() -> callDownload(i), executorDownload)
.thenApplyAsync(downloadResult -> calculateResultFuture(downloadResult), executorCalculate))
.collect(Collectors.toList());
long count = calculateResult.stream().map(f -> isFutureCalculated(f)).count();
thenApplyAsync
当第一个任务完成时,将使第二个任务接管。
当然,这将需要您稍微更改API,以便直接调用下载方法。我曾经使用callDownload(i)
来运行相同的逻辑new Download(i).call()
。calculateResultFuture()
也将更改以删除该Future
参数。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句