我读了这篇关于Akka Streams错误处理的文章
http://doc.akka.io/docs/akka/2.5.4/scala/stream/stream-error.html
并写了这段代码。
val decider: Supervision.Decider = {
case _: Exception => Supervision.Restart
case _ => Supervision.Stop
}
implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
val source = Source(1 to 10)
val flow = Flow[Int].map{x => if (x != 9) 2 * x else throw new Exception("9!")}
val sink : Sink[Int, Future[Done]] = Sink.foreach[Int](x => println(x))
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){implicit builder => s =>
import GraphDSL.Implicits._
source ~> flow ~> s.in
ClosedShape
})
val future = graph.run()
future.onComplete{ _ =>
actorSystem.terminate()
}
Await.result(actorSystem.whenTerminated, Duration.Inf)
这工作得很好.....除了我需要扫描输出以查看未处理的行。我有办法打印/记录失败的行吗?[没有在我编写的每个流程中都放置明确的try / catch块吗?]
因此,例如,如果我正在使用actor(而不是流),那么我可能已经编写了actor的生命周期事件,并且可以记录actor重新启动时的记录以及重新启动时正在处理的消息。
但是这里我没有明确地使用actor(尽管它们是在内部使用的)。流/源/接收器是否存在生命周期事件?
只需对您的代码进行少量修改:
val decider: Supervision.Decider = {
case e: Exception =>
println("Exception handled, recovering stream:" + e.getMessage)
Supervision.Restart
case _ => Supervision.Stop
}
如果您将有意义的消息传递给流中的异常(例如,该行),则可以在监管决策器中将其打印出来。
我曾经println
给出一个简短的答案,但强烈建议您使用一些日志记录库,例如scala-logging
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句