使用Akka-IO TCP初始化Akka Actor

菲普龙

使用Akka-IO TCP,在actor中建立连接的过程如下:

class MyActor(remote: InetSocketAddress) extends Actor {

  IO(Tcp) ! Connect(remote)    //this is the first step, remote is the address to connect to

  def receive = {
    case CommandFailed(_: Connect) => context stop self // failed to connect

    case Connected(remote, local) =>
      val connection = sender()
      connection ! Register(self)
      // do cool things...  
  }
}

您向发送一条Connect消息,IO(Tcp)并希望收到CommandFailed或条Connected消息。

现在,我的目标是创建一个包装TCP连接的actor,但我希望我的actor仅在建立连接后才开始接受消息-否则,在等待Connected消息时它将开始接受查询但没有人发送他们去。

我试过的

class MyActor(address: InetSocketAddress) extends Actor {

  def receive = {
    case Initialize =>
      IO(Tcp) ! Connect(address)
      context.become(waitForConnection(sender()))

    case other => sender ! Status.Failure(new Exception(s"Connection to $address not established yet."))
  }

  private def waitForConnection(initializer: ActorRef): Receive = {
    case Connected(_, _) =>
      val connection = sender()
      connection ! Register(self)
      initializer ! Status.Success(Unit)
      // do cool things

    case CommandFailed(_: Connect) =>
      initializer ! Status.Failure(new Exception("Failed to connect to " + host))
      context stop self
  }
}

我的第一个receive期望是一个伪造的Initialize消息,它将触发整个连接过程,完成后,发件人将Initialize收到成功消息,并且知道它可以知道开始发送查询。

我对此不太满意,这迫使我用

val actor = system.actorOf(MyActor.props(remote))
Await.ready(actor ? Initialize, timeout)

而且它不会非常“重启”友好。

是否有任何想法可以保证演员在Tcp层回复之前不会开始从邮箱接收消息Connected

鲍勃·达格利什(Bob Dalgleish)

使用藏匿性状你现在不能办理藏匿消息。当每个过早的消息到达时,请使用stash()它来延迟它。连接打开后,使用unstashAll()可以将这些邮件返回到邮箱进行处理。然后,您可以使用become()切换到消息处理状态。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章