Java不使用所有可用的CPU

b心脏:

我有一个长期运行的计算,我需要开展的输入一长串。该计算是独立的,所以我想将它们分发到多个CPU。我使用Java 8。

代码的骨架看起来是这样的:

ExecutorService executorService = Executors.newFixedThreadPool(numThreads);

MyService myService = new MyService(executorService);

List<MyResult> results =
            myInputList.stream()
                     .map(myService::getResultFuture)
                     .map(CompletableFuture::join)
                     .collect(Collectors.toList());

executorService.shutdown();

主要功能负责计算如下所示:

CompletableFuture<MyResult> getResultFuture(MyInput input) {
    return CompletableFuture.supplyAsync(() -> longCalc(input), executor)))
}

在长时间运行的计算是无状态的,并没有做任何IO。

我希望这个代码使用所有可用的CPU,但它不会发生。例如,具有72个CPU和一台机器上numThreads=72(或甚至例如numThreads=500),CPU使用率是至多500-1000%,如图HTOP:

HTOP

根据线程转储,许多计算线程都在等待,即:

"pool-1-thread-34" #55 prio=5 os_prio=0 tid=0x00007fe858597890 nid=0xd66 waiting on condition [0x00007fe7f9cdd000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000381815f20> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
    - None

所有的计算线程在等待同一个锁。在转储的时候,只有5计算线程是RUNNABLE,其余的都在等待。

什么都可以对锁的原因,为什么我不能管理使用所有CPU?

霍尔格:

您提交作业,并呼吁join()权之后,等待异步任务的完成。

流的中间步骤执行元件的角度来看,这意味着中间步骤.map(CompletableFuture::join)一个元素上运行在同一时间(更糟糕,因为它是一个连续的数据流),没有确保所有元素都通过提交一步了。这会导致线程块,同时等待各单计算的完成。

你必须开始呼叫之前执行提交的所有作业的join()他们:

List<MyResult> results =
    myInputList.stream()
               .map(myService::getResultFuture)
               .collect(Collectors.toList()).stream()
               .map(CompletableFuture::join)
               .collect(Collectors.toList());

如果你可以表达任何你想用做results列表,要调用一个动作时,一切都完成后,就可以实现在不阻塞线程的方式操作join()

List<CompletableFuture<MyResult>> futures = myInputList.stream()
    .map(myService::getResultFuture)
    .collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(CompletableFuture<?>[]::new))
    .thenRun(() -> {
        List<MyResult> results = futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
        // perform action with results
    });

它仍然调用join()来检索结果,但在这一点上,所有期货已经完成,因此调用者不会被阻塞。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章