我想並行運行通量,但按順序收集結果。假設我在並行執行一些任務後有 [3,2,1] 的通量我希望結果仍然是 [3,2,1]
val mono = Flux.fromIterable(3 downTo 1)
.map { it.toString() }
// this will return the same number
.flatMap { number -> task(number) }
.doOnNext { println("Number of $it") }
// I got 1, 3, 2 which is good because I want it to run in parallel
.collectList()
.doOnNext { println(it) }
// I still got [1,3,2] but I want it to be [3,2,1] according to the iterable order.
.block()
我認為flatMapSequential
運營商是你正在尋找的。
Flux.fromIterable(3 downTo 1)
.map { it.toString() }
.flatMapSequential { number -> task(number) }
.doOnNext { println("Number of $it") }
.collectList()
.doOnNext { println(it) }
在這裡,flatMapSequential
急切地訂閱其內部發布者(與 flatMap 並行),但根據需要按源元素的順序合併它們。
示例輸出:
3
1
2
[3, 2, 1]
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句