我正在尝试编写一个自定义Akka SnapshotStore 插件。
我正处于要实现此方法的地步:
def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]
这是我到目前为止所拥有的:
import cats.data.OptionT
import cats.implicits._
...
override def loadAsync(
persistenceId: String,
criteria: SnapshotSelectionCriteria
): Future[Option[SelectedSnapshot]] = {
// same as in the original plugin
val metadata = snapshotMetadatas(persistenceId, criteria).sorted.takeRight(maxLoadAttempts)
// need to get rid of this one!
import scala.concurrent.ExecutionContext.Implicits.global
val getSnapshotAndReportMetric = for {
snapshot <- OptionT.fromOption[Future](getMaybeSnapshotFromMetadata(metadata))
_ <- OptionT.liftF(Future {
observabilityService
.recordMetric(
LongCounterMetric(readVehicleSnapshotFromDiskCounter, SnapShotDirectoryScannerCommandOptions)
)
}(directorySnapshotScanningDispatcher))
} yield snapshot
getSnapshotAndReportMetric.value.recoverWith {
// retry if we listed an older snapshot that was deleted before loading
case _: NoSuchFileException => loadAsync(persistenceId, criteria)
}(streamDispatcher)
}
供参考的是 的签名getMaybeSnapshotFromMetadata
。它的签名可以修改为添加一个Future
,但最终包装的响应必须是 type Option[SelectedSnapshot]
。
private def getMaybeSnapshotFromMetadata(metadata: Seq[SnapshotMetadata]): Option[SelectedSnapshot]
代码 as is 编译,但只是因为我导入了隐式 global ExecutionContext
。我的目标是使用不同的显式执行上下文(不同的可配置调度程序),但我不知道如何getMaybeSnapshotFromMetadata
在 for-comprehensions 中的第一行(调用 )。如果我使用OptionT.liftF
,那么我可以做到,例如
...
snapshot <- OptionT.liftF( Future { getMaybeSnapshotFromMetadata(metadata)}(streamDispatcher))
...但后来我得到了OptionT[Future, Option[SelectedSnapshot]
一个结果。
我想要实现的目标有解决方案吗?如果不是,我可以只使用Future
s 和它的andThen
链方法:
Future {
getMaybeSnapshotFromMetadata(metadata)
}(streamDispatcher)
.andThen(selectedSnapshot => {
observabilityService
.recordMetric(
LongCounterMetric(readVehicleSnapshotFromDiskCounter, SnapShotDirectoryScannerCommandOptions)
)
selectedSnapshot
})(opentelemetryDispatcher)
.recoverWith {
// retry if we listed an older snapshot that was deleted before loading
case _: NoSuchFileException => loadAsync(persistenceId, criteria)
}(streamDispatcher)
更新对于理解liftF
中的第二行实际上是一个可行的解决方案 - 我更新了代码块。
不要过度使用猫。对于某些事情来说,它是一个很好的工具,但是如果使用不当,它只会增加复杂性并损害可读性,而没有任何好处。
Future(getMaybeSnapshotFromMetadata(metadata))(streamDispatcher)
.andThen { case _ =>
observabilityService.recordWiseMetric(...)
}(directorySnapshotScanningDispatcher)
.recoverWith(...)(streamDispatcher)
这相当于您的代码没有所有复杂性......这不是“解决方法”,而是编写此代码的实际正确方法。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句