我是Scala的新手,我正在通过创建一些重试方案来练习Future库。这样做,我得到了以下代码:
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
object Retries extends App {
var retries = 0
def resetRetries(): Unit = retries = 0
def calc() = if (retries > 3) 10 else {
retries += 1
println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
throw new IllegalArgumentException("This failed")
}
def fCalc(): Future[Int] = Future(calc())
resetRetries()
val ff = fCalc() // 0 - should fail
.fallbackTo(fCalc()) // 1 - should fail
.fallbackTo(fCalc()) // 2 - should fail
.fallbackTo(fCalc()) // 3 - should fail
.fallbackTo(fCalc()) // 4 - should be a success
Await.ready(ff, 10.second)
println(ff.isCompleted)
println(ff.value)
}
每次我运行此代码时,都会得到不同的结果。我得到的结果样本如下
输出1
I am thread 12 This is going to fail. Retry count 1
I am thread 14 This is going to fail. Retry count 3
I am thread 13 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))
输出2
I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 1
I am thread 13 This is going to fail. Retry count 3
I am thread 14 This is going to fail. Retry count 4
true
Some(Success(10))
输出3
I am thread 12 This is going to fail. Retry count 1
I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 12 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))
结果并不总是在成功和失败之间交替出现的。在成功运行之前,可能会有不止几次失败的运行。
据我了解,应该只有4个日志“我是线程x,这将失败。重试计数x”,这些应该是以下内容:
I am thread a This is going to fail. Retry count 1
I am thread b This is going to fail. Retry count 2
I am thread c This is going to fail. Retry count 3
I am thread d This is going to fail. Retry count 4
不一定按此顺序进行-因为我不知道Scala线程模型是如何工作的-但您明白我的意思。但是,我得到的是这种不确定性的输出,我无法处理。所以...我的问题是:这种不确定性的输出是哪里来的?
我想提到以下重试机制始终产生相同的结果:
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
object Retries extends App {
var retries = 0
def resetRetries(): Unit = retries = 0
def calc() = if (retries > 3) 10 else {
retries += 1
println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
throw new IllegalArgumentException("This failed")
}
def retry[T](op: => T)(retries: Int): Future[T] = Future(op) recoverWith { case _ if retries > 0 => retry(op)(retries - 1) }
resetRetries()
val retriableFuture: Future[Future[Int]] = retry(calc())(5)
Await.ready(retriableFuture, 10 second)
println(retriableFuture.isCompleted)
println(retriableFuture.value)
}
输出量
I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Success(10))
如果我减少重试次数(retry(calc())(3)
),结果将是失败的未来
I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))
尽管从技术上说@Tim是正确的,但我认为他并没有真正回答这个问题。
我相信您困惑的真正根源是您对结构的误解:
f.fallbackTo(Future(calc()))
做。和它有什么不同
f.recoverWith({ case _ => Future(calc())})
有两个重要区别:
在该fallbackTo
情况下,Future(calc())
立即并因此(几乎)在立即开始执行calc()
。因此,原始未来和后备未来同时运行。在recoverWith
后备情况下,仅在原始将来失败后才创建将来。这种差异会影响记录顺序。同样,这意味着对的访问var retries
是并发的,因此您可能会看到所有线程实际上由于retries
丢失了某些更新而失败的情况。
另一个棘手的一点是,fallbackTo
被记录为(高亮是矿)
创建一个新的future,如果成功完成,则保存该future的结果;如果未成功完成,则保存该future的结果。如果两个期货都失败,则所得的期货将持有第一个期货的可投掷对象。
这种差异实际上并不会影响您的示例,因为在所有失败的尝试中抛出的异常是相同的,但是如果它们不同,则可能会影响结果。例如,如果您将代码修改为:
def calc(attempt: Int) = if (retries > 3) 10 else {
retries += 1
println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
throw new IllegalArgumentException(s"This failed $attempt")
}
def fCalc(attempt: Int): Future[Int] = Future(calc(attempt))
val ff = fCalc(1) // 0 - should fail
.fallbackTo(fCalc(2)) // 1 - should fail
.fallbackTo(fCalc(3)) // 2 - should fail
.fallbackTo(fCalc(4)) // 3 - should fail
.fallbackTo(fCalc(5)) // 4 - should be a success
那么你应该得到这两个结果之一
Some(Failure(java.lang.IllegalArgumentException: This failed 1))
Some(Success(10))
从来没有其他“失败”的价值。
请注意,这里我明确地传递了,attempt
以不打到竞态条件retries
。
回答更多评论(1月28日)
我attempt
在上一个示例中明确通过的原因是,这是确保IllegalArgumentException
由逻辑上先创建的create在所有(甚至不是很现实的)线程调度下都calc
将1
作为其值的最简单方法。
如果您只想让所有日志具有不同的值,则有一种更简单的方法:使用局部变量!
def calc() = {
val retries = atomicRetries.getAndIncrement()
if (retries > 3) 10
else {
println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
throw new IllegalArgumentException(s"This failed $retries")
}
}
这样,您可以避免经典的TOCTOU问题。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句