我有这样创建的流:
StreamEx.generate(new MySupplier<List<Entity>>())
.flatMap(List::stream)
.map(Entity::getName)
.map(name -> ...)
.. // more stuff
我只需添加即可将其更改为并行工作parallel
:
StreamEx.generate(new MySupplier<List<Entity>>())
.flatMap(List::stream)
.map(Entity::getName)
.map(name -> ...)
.parallel()
.. // more stuff
但我也想添加一个takeWhile
条件以使流停止:
StreamEx.generate(new MySupplier<List<Entity>>())
.takeWhile(not(List::isEmpty))
.flatMap(List::stream)
.map(Entity::getName)
.map(name -> ...)
.parallel()
.. // more stuff
但是,一旦我添加了takeWhile
流,流似乎就变成了顺序的(至少它仅由一个线程处理)。按照javadoc中的takeWhile
,如果我理解正确,应与并行流工作。我是在做错事还是根据设计?
就像在普通Stream API中一样,如果某些东西可以并行工作,这并不意味着它可以有效工作。Javadoc指出:
尽管此操作对于顺序流而言非常便宜,但在并行管道上却可能非常昂贵。
实际上,您想使用可以专门优化但目前尚未优化的takeWhile
无序流,因此可以将其视为缺陷。我将尝试解决此问题(我是StreamEx作者)。
更新:在0.6.5版本中修复
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句