从 Akka(类型化)Actor 内部处理 Futures 的正确方法是什么?
例如,假设有一个 ActorOrderActor
接收命令来下订单……它通过对外部服务进行 http 调用来实现。由于这些是对外部服务的 http 调用,Future
因此涉及到 s。那么,Future
从 Actor 内部处理该问题的正确方法是什么。
我读了一些关于 pipeTo 模式的东西。这是这里需要发生的事情还是其他事情?
class OrderActor(context: ActorContext[OrderCommand], orderFacade: OrderFacade)
extends AbstractBehavior[OrderCommand](context) {
context.log.info("Order Actor started")
override def onMessage(msg: OrderCommand): Behavior[OrderCommand] = {
msg match {
case PlaceOrder(
referenceId: OrderReferenceId,
ticker: Ticker,
quantity: Int,
replyTo: ActorRef[OrderResult]
) =>
orderFacade
.placeOrder(ticker, quantity) //this returns a Future
.map(res => {
//transform result
//book keeping / notification (affects state)
replyTo ! transformed
//Can/Should we map like this? I tried adding a log statement in here, but I never see it... and the replyTo doesnt seem to get the message.
})
this
通常最好避免在actor 内部进行Future
转换(map
、flatMap
、等)。foreach
当转换运行时,actor 中的某些可变状态不是您所期望的,这是一个明显的风险。在 Akka Classic 中,这种情况最有害的形式可能会导致向错误的 actor 发送回复。
Akka Typed(特别是在函数式 API 中)减少了许多可能导致麻烦的可变状态,但通常将其Future
作为消息传递给 actor 仍然是一个好主意。
所以如果orderFacade.placeOrder
结果是 a Future[OrderResponse]
,你可以添加这样的子OrderCommand
类
// also include fields from the PlaceOrder which will be useful
case class OrderResponseIs(resp: OrderResponse, replyTo: ActorRef[OrderResult]) extends OrderCommand
// TODO include fields
case class OrderFailed() extends OrderCommand
然后用管道Future
传递给自己:
import scala.util.{ Failure, Success }
context.pipeToSelf(orderFacade.placeOrder) {
case Success(resp) => OrderResponseIs(resp, replyTo)
case Failure(_) => OrderFailed()
}
然后,您必须处理这些消息:
case OrderResponseIs(resp, replyTo) =>
// transform resp
val transformed = ???
replyTo ! transformed
this
case OrderFailed() =>
context.log.warning("Stuff is broken")
this
与朋友相比,这样做实际上并没有太多开销map
(两者通常都涉及调度任务以在调度程序上异步执行)。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句