我有一个List<Flow<T>>
,想生成一个Flow<List<T>>
。这几乎是combine
做什么的-除了Combine等待每个Flow
发出初始值,这不是我想要的。以下面的代码为例:
val a = flow {
repeat(3) {
emit("a$it")
delay(100)
}
}
val b = flow {
repeat(3) {
delay(150)
emit("b$it")
}
}
val c = flow {
delay(400)
emit("c")
}
val flows = listOf(a, b, c)
runBlocking {
combine(flows) {
it.toList()
}.collect { println(it) }
}
使用combine
(因此保持原样),这是输出:
[a2, b1, c]
[a2, b2, c]
而我也对所有中介步骤都感兴趣。这是我从这三个流程中想要的:
[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
现在,我有两种解决方法,但是它们都不是很好的解决方案...第一个解决方案很丑陋,不适用于可空类型:
val flows = listOf(a, b, c).map {
flow {
emit(null)
it.collect { emit(it) }
}
}
runBlocking {
combine(flows) {
it.filterNotNull()
}.collect { println(it) }
}
通过强制所有流发出第一个无关的值,combine
确实调用了转换器,并让我删除了我知道不是实际值的空值。对此进行迭代,使可读性更强:
sealed class FlowValueHolder {
object None : FlowValueHolder()
data class Some<T>(val value: T) : FlowValueHolder()
}
val flows = listOf(a, b, c).map {
flow {
emit(FlowValueHolder.None)
it.collect { emit(FlowValueHolder.Some(it)) }
}
}
runBlocking {
combine(flows) {
it.filterIsInstance(FlowValueHolder.Some::class.java)
.map { it.value }
}.collect { println(it) }
}
现在,这个程序可以正常工作,但是仍然感觉我在做过多事情。协程库中有我缺少的方法吗?
这个怎么样:
inline fun <reified T> instantCombine(vararg flows: Flow<T>) = channelFlow {
val array= Array(flows.size) {
false to (null as T?) // first element stands for "present"
}
flows.forEachIndexed { index, flow ->
launch {
flow.collect { emittedElement ->
array[index] = true to emittedElement
send(array.filter { it.first }.map { it.second })
}
}
}
}
它解决了一些问题:
[]
不在结果流中因此,您不会注意到任何特定于实现的变通方法,因为在收集过程中不必处理它:
runBlocking {
instantCombine(a, b, c).collect {
println(it)
}
}
输出:
[a0]
[a1]
[a1,b0]
[a2,b0]
[a2,b1]
[a2,b1,c]
[a2,b2,c]
编辑:更新了答案,以处理也发出空值的流。
*使用的低级数组是线程安全的。就像您要处理单个变量一样。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句