使用Akka-IO TCP,在actor中建立连接的过程如下:
class MyActor(remote: InetSocketAddress) extends Actor {
IO(Tcp) ! Connect(remote) //this is the first step, remote is the address to connect to
def receive = {
case CommandFailed(_: Connect) => context stop self // failed to connect
case Connected(remote, local) =>
val connection = sender()
connection ! Register(self)
// do cool things...
}
}
您向发送一条Connect
消息,IO(Tcp)
并希望收到CommandFailed
或条Connected
消息。
现在,我的目标是创建一个包装TCP连接的actor,但我希望我的actor仅在建立连接后才开始接受消息-否则,在等待Connected
消息时它将开始接受查询但没有人发送他们去。
我试过的
class MyActor(address: InetSocketAddress) extends Actor {
def receive = {
case Initialize =>
IO(Tcp) ! Connect(address)
context.become(waitForConnection(sender()))
case other => sender ! Status.Failure(new Exception(s"Connection to $address not established yet."))
}
private def waitForConnection(initializer: ActorRef): Receive = {
case Connected(_, _) =>
val connection = sender()
connection ! Register(self)
initializer ! Status.Success(Unit)
// do cool things
case CommandFailed(_: Connect) =>
initializer ! Status.Failure(new Exception("Failed to connect to " + host))
context stop self
}
}
我的第一个receive
期望是一个伪造的Initialize
消息,它将触发整个连接过程,完成后,发件人将Initialize
收到成功消息,并且知道它可以知道开始发送查询。
我对此不太满意,这迫使我用
val actor = system.actorOf(MyActor.props(remote))
Await.ready(actor ? Initialize, timeout)
而且它不会非常“重启”友好。
是否有任何想法可以保证演员在Tcp层回复之前不会开始从邮箱接收消息Connected
?
使用藏匿性状你现在不能办理藏匿消息。当每个过早的消息到达时,请使用stash()
它来延迟它。连接打开后,使用unstashAll()
可以将这些邮件返回到邮箱进行处理。然后,您可以使用become()
切换到消息处理状态。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句