我正在尝试创建Flow
要与Source
队列一起使用的。我希望将其与Alpakka Google PubSub连接器一起使用:https ://doc.akka.io/docs/alpakka/current/google-cloud-pub-sub.html
为了使用此连接器,我需要创建一个Flow
取决于,作为主题名称提供的String
,如上面的链接和代码片段所示。
val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
GooglePubSub.publish(topic, config)
我希望能够设置一个Source
队列,以接收发布消息所需的主题和消息。我首先PublishRequest
在消息中创建必要的内容String
。然后,我想Flow
通过运行实例化该对象GooglePubSub.publish(topic, config)
。但是,我不知道如何使话题进入流程的那一部分。
val gcFlow: Flow[(String, String), PublishRequest, NotUsed] = Flow[(String, String)]
.map(messageData => {
PublishRequest(Seq(
PubSubMessage(new String(Base64.getEncoder.encode(messageData._1.getBytes))))
)
})
.via(GooglePubSub.publish(topic, config))
val bufferSize = 10
val elementsToProcess = 5
// newSource is a Source[PublishRequest, NotUsed]
val (queue, newSource) = Source
.queue[(String, String)](bufferSize, OverflowStrategy.backpressure)
.via(gcFlow)
.preMaterialize()
我不确定是否有一种方法可以将主题放入队列而不将其作为初始数据流的一部分。而且我不知道如何将流值转化为动态值Flow
。
如果我不正确地使用了一些术语,请记住我是新手。
您可以通过使用flatMapConcat
并Source
在其中生成新的代码来实现它:
// using tuple assuming (Topic, Message)
val gcFlow: Flow[(String, String), (String, PublishRequest), NotUsed] = Flow[(String, String)]
.map(messageData => {
val pr = PublishRequest(immutable.Seq(
PubSubMessage(new String(Base64.getEncoder.encode(messageData._2.getBytes)))))
// output flow shape of (String, PublishRequest)
(messageData._1, pr)
})
val publishFlow: Flow[(String, PublishRequest), Seq[String], NotUsed] =
Flow[(String, PublishRequest)].flatMapConcat {
case (topic: String, pr: PublishRequest) =>
// Create a Source[PublishRequest]
Source.single(pr).via(GooglePubSub.publish(topic, config))
}
// wire it up
val (queue, newSource) = Source
.queue[(String, String)](bufferSize, OverflowStrategy.backpressure)
.via(gcFlow)
.via(publishFlow)
.preMaterialize()
(可选)您可以用案例类代替元组以更好地记录它
case class Something(topic: String, payload: PublishRequest)
// output flow shape of Something[String, PublishRequest]
Something(messageData._1, pr)
Flow[Something[String, PublishRequest]].flatMapConcat { s =>
Source.single(s.payload)... // etc
}
说明:
在gcFlow
我们输出FlowShape元组的(String, PublishRequest)
其中穿过publishFlow
。输入是元组,(String, PublishRequest)
并且flatMapConcat
我们生成新Source[PublishRequest]
的流经GooglePubSub.publish
为每个项目创建新的源将有少量开销。这不应对性能产生可衡量的影响
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句