如何在Akka流中使用值来实例化GooglePubSub流?

雅各布·古德温

我正在尝试创建Flow要与Source队列一起使用的我希望将其与Alpakka Google PubSub连接器一起使用:https ://doc.akka.io/docs/alpakka/current/google-cloud-pub-sub.html

为了使用此连接器,我需要创建一个Flow取决于,作为主题名称提供的String,如上面的链接和代码片段所示。

val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
  GooglePubSub.publish(topic, config)

问题

我希望能够设置一个Source队列,以接收发布消息所需的主题和消息。我首先PublishRequest在消息中创建必要的内容String然后,我想Flow通过运行实例化该对象GooglePubSub.publish(topic, config)但是,我不知道如何使话题进入流程的那一部分。

val gcFlow: Flow[(String, String), PublishRequest, NotUsed] = Flow[(String, String)]
  .map(messageData => {
    PublishRequest(Seq(
      PubSubMessage(new String(Base64.getEncoder.encode(messageData._1.getBytes))))
      )
    })
  .via(GooglePubSub.publish(topic, config))

val bufferSize = 10
val elementsToProcess = 5

// newSource is a Source[PublishRequest, NotUsed]
val (queue, newSource) = Source
  .queue[(String, String)](bufferSize, OverflowStrategy.backpressure)
  .via(gcFlow)
  .preMaterialize()

我不确定是否有一种方法可以将主题放入队列而不将其作为初始数据流的一部分。而且我不知道如何将流值转化为动态值Flow

如果我不正确地使用了一些术语,请记住我是新手。

1565986223

您可以通过使用flatMapConcatSource在其中生成新的代码来实现它:

// using tuple assuming (Topic, Message)
val gcFlow: Flow[(String, String), (String, PublishRequest), NotUsed] = Flow[(String, String)]
    .map(messageData => {
      val pr = PublishRequest(immutable.Seq(
        PubSubMessage(new String(Base64.getEncoder.encode(messageData._2.getBytes)))))
      // output flow shape of (String, PublishRequest)
      (messageData._1, pr)
    })

val publishFlow: Flow[(String, PublishRequest), Seq[String], NotUsed] =
Flow[(String, PublishRequest)].flatMapConcat {
    case (topic: String, pr: PublishRequest) =>
      // Create a Source[PublishRequest]
      Source.single(pr).via(GooglePubSub.publish(topic, config))
  }

// wire it up
val (queue, newSource) = Source
    .queue[(String, String)](bufferSize, OverflowStrategy.backpressure)
    .via(gcFlow)
    .via(publishFlow)
    .preMaterialize()

(可选)您可以用案例类代替元组以更好地记录

case class Something(topic: String, payload: PublishRequest)

// output flow shape of Something[String, PublishRequest]
Something(messageData._1, pr)

Flow[Something[String, PublishRequest]].flatMapConcat { s =>
  Source.single(s.payload)... // etc
}

说明:

gcFlow我们输出FlowShape元组的(String, PublishRequest)其中穿过publishFlow输入是元组,(String, PublishRequest)并且flatMapConcat我们生成新Source[PublishRequest]的流经GooglePubSub.publish

为每个项目创建新的源将有少量开销。这不应对性能产生可衡量的影响

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章