是否有Akka流组合器来执行以下操作(或达到此目的的某些操作)?(and
现在叫它。)
(flow1: Flow[I, O, Mat]).and[O2](flow2: Flow[I, O2, Mat]): Flow[I, (O, O2), Mat]
语义是无论源如何,其元素都将传递给两个Flow
,并且它们的输出将组合成一个新Flow
的元组。(对于那些熟悉类别理论风格的函数式编程的人,我正在寻找类似的东西&&&
。)
库中有两个看起来很相关的组合器,分别是zip
和alsoTo
。但是前者接受a SourceShape
,而后者接受a SinkShape
。都不会承认GraphShape
。为什么会这样呢?
我的用例如下:
someSource
.via(someFlowThatReturnsUnit.and(Flow.apply))
.runWith(someSink)
找不到类似的内容.and
,因此我修改了原来的内容Flow
:
someSource
.via(someFlowThatDoesWhateverItWasDoingEarlierButNowAlsoEmitsInputsAsIs)
.runWith(someSink)
这行得通,但是我正在寻找更清洁,更合成的解决方案。
注意
正如Viktor Klang在评论中指出的那样:Tuple2[O,O2]
仅当已知两个流flow1
&flow2
相对于输入元素计数和输出元素计数为1:1时,才能压缩到a中才可行。
基于图的解决方案
可以在Graph内部创建一个元组构造。实际上,您的问题几乎完全符合以下示例:
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val in = Source(1 to 10)
val out = Sink.ignore
val bcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Zip[Int, Int]()) //different than link
val f1, f2, f4 = Flow[Int].map(_ + 10)
val f3 = Flow[(Int, Int)].map(t => t._1 + t._2) //different than link
in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
bcast ~> f4 ~> merge
ClosedShape
})//end RunnableGraph.fromGraph
有点hacky流解决方案
如果您正在寻找纯流解决方案,则可以使用中间流,但Mat
不会被保留,并且涉及每个输入元素2个流的实现:
def andFlows[I, O, O2] (maxConcurrentSreams : Int)
(flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed])
(implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (O, O2), _] =
Flow[I].mapAsync(maxConcurrentStreams){ i =>
val o : Future[O] = Source
.single(i)
.via(flow1)
.to(Sink.head[O])
.run()
val o2 : Future[O2] = Source
.single(i)
.via(flow2)
.to(Sink.head[O2])
.run()
o zip o2
}//end Flow[I].mapAsync
通用压缩
如果要使这种压缩通用,则对于大多数Flows而言,输出类型必须为(Seq[O], Seq[O2])
。可以使用Sink.seq
代替Sink.head
在上面的andFlows
函数中生成此类型:
def genericAndFlows[I, O, O2] (maxConcurrentSreams : Int)
(flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed])
(implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (Seq[O], Seq[O2]), _] =
Flow[I].mapAsync(maxConcurrentStreams){ i =>
val o : Future[Seq[O]] = Source
.single(i)
.via(flow1)
.to(Sink.seq[O])
.run()
val o2 : Future[Seq[O2]] = Source
.single(i)
.via(flow2)
.to(Sink.seq[O2])
.run()
o zip o2
}//end Flow[I].mapAsync
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句