在列表中合并多个Kotlin流,而无需等待第一个值

马克·普兰诺·莱赛

我有一个List<Flow<T>>,想生成一个Flow<List<T>>这几乎是combine什么的-除了Combine等待每个Flow发出初始值,这不是我想要的。以下面的代码为例:

val a = flow {
  repeat(3) {
    emit("a$it")
    delay(100)
  }
}
val b = flow {
  repeat(3) {
    delay(150)
    emit("b$it")
  }
}
val c = flow {
  delay(400)
  emit("c")
}
val flows = listOf(a, b, c)
runBlocking {
  combine(flows) {
    it.toList()
  }.collect { println(it) }
}

使用combine(因此保持原样),这是输出:

[a2, b1, c]
[a2, b2, c]

而我也对所有中介步骤都感兴趣。这是我从这三个流程中想要的:

[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]

现在,我有两种解决方法,但是它们都不是很好的解决方案...第一个解决方案很丑陋,不适用于可空类型:

val flows = listOf(a, b, c).map {
  flow {
    emit(null)
    it.collect { emit(it) }
  }
}
runBlocking {
  combine(flows) {
    it.filterNotNull()
  }.collect { println(it) }
}

通过强制所有流发出第一个无关的值,combine确实调用转换器,并让我删除了我知道不是实际值的空值。对此进行迭代,使可读性更强:

sealed class FlowValueHolder {
  object None : FlowValueHolder()
  data class Some<T>(val value: T) : FlowValueHolder()
}
val flows = listOf(a, b, c).map {
  flow {
    emit(FlowValueHolder.None)
    it.collect { emit(FlowValueHolder.Some(it)) }
  }
}
runBlocking {
  combine(flows) {
    it.filterIsInstance(FlowValueHolder.Some::class.java)
      .map { it.value }
  }.collect { println(it) }
}

现在,这个程序可以正常工作,但是仍然感觉我在做过多事情。协程库中有我缺少的方法吗?

威利·曼策尔(Willi Mentzel)

这个怎么样:

inline fun <reified T> instantCombine(vararg flows: Flow<T>) = channelFlow {
    val array= Array(flows.size) {
        false to (null as T?) // first element stands for "present"
    }

    flows.forEachIndexed { index, flow ->
        launch {
            flow.collect { emittedElement ->
                array[index] = true to emittedElement
                send(array.filter { it.first }.map { it.second })
            }
        }
    }
}

它解决了一些问题:

  • 无需引入新型
  • [] 不在结果流中
  • 从调用站点中抽象出空处理(或解决了该问题),结果流本身处理

因此,您不会注意到任何特定于实现的变通方法,因为在收集过程中不必处理它:

runBlocking {
    instantCombine(a, b, c).collect {
        println(it)
    }
}

输出:

[a0]
[a1]
[a1,b0]
[a2,b0]
[a2,b1]
[a2,b1,c]
[a2,b2,c]

在这里尝试!

编辑:更新了答案,以处理也发出空值的流。


*使用的低级数组是线程安全的。就像您要处理单个变量一样。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

按python中每个列表中的第一个值合并嵌套列表

当前一个流接收第一个值时 RxJS 合并

传递列表中更改的第一个值

如何获取列表中的第一个值

嵌套列表中的第一个值的总和

如何访问列表中的第一个值?

有什么方法可以清除/隐藏两个td表中的第一个td,而无需访问第一个td?

合并从第一个列表中的某个元素开始的列表列表

获取第一个函数调用匹配的返回值,而无需两次调用

如何在 Kotlin 中毫无例外地从列表中获取第一个值?

从另一个.bat调用多个.bat,而无需等待一个完成

合并 2 个列表以从第一个列表中删除重复项,同时保留第二个列表的对应值

Powershell 或 CMD - 我想同时运行两个命令而无需等待第一个命令完成

根据第一个元素合并列表

链接列表仅打印第一个列表的第一个值

使用合并从2个表中返回第一个非空值

选择第一个合并值,而不是全部

如何将JavaScript中的多个数组与第一个元素合并

返回第一个/主要活动,而无需重新加载它

将多个列表合并到流中的一个对象列表?

查找第一个值并返回列表中的第二个值

RxJava-当另一个流等待第一个项目时,如何缓冲流中的所有项目?

如何从java中的列表列表中获取第一个值

根据第一个中存在的值合并两列

熊猫-合并列A上的行,从列B,C等中获取第一个值

根据列表的最后一个值和第一个值在 python 中附加嵌套列表

尝试从列表的第一个值计算列表中的差异

使用.GroupBy删除列表重复项,并保留第一个列表中的值

在PySpark列的列表列表中获取第一个元素的最大值