Scala期货:不确定性输出

没有人

我是Scala的新手,我正在通过创建一些重试方案来练习Future库。这样做,我得到了以下代码:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Retries extends App {

  var retries = 0

  def resetRetries(): Unit = retries = 0

  def calc() = if (retries > 3) 10 else {
    retries += 1
    println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
    throw new IllegalArgumentException("This failed")
  }

  def fCalc(): Future[Int] = Future(calc())

  resetRetries()

  val ff = fCalc() // 0 - should fail
    .fallbackTo(fCalc()) // 1 - should fail
    .fallbackTo(fCalc()) // 2 - should fail
    .fallbackTo(fCalc()) // 3 - should fail
    .fallbackTo(fCalc()) // 4 - should be a success

  Await.ready(ff, 10.second)

  println(ff.isCompleted)
  println(ff.value)
}

每次我运行此代码时,都会得到不同的结果。我得到的结果样本如下

输出1

I am thread 12 This is going to fail. Retry count 1
I am thread 14 This is going to fail. Retry count 3
I am thread 13 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))

输出2

I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 1
I am thread 13 This is going to fail. Retry count 3
I am thread 14 This is going to fail. Retry count 4
true
Some(Success(10))

输出3

I am thread 12 This is going to fail. Retry count 1
I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 12 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))

结果并不总是在成功和失败之间交替出现的。在成功运行之前,可能会有不止几次失败的运行。

据我了解,应该只有4个日志“我是线程x,这将失败。重试计数x”,这些应该是以下内容:

I am thread a This is going to fail. Retry count 1
I am thread b This is going to fail. Retry count 2
I am thread c This is going to fail. Retry count 3
I am thread d This is going to fail. Retry count 4

不一定按此顺序进行-因为我不知道Scala线程模型是如何工作的-但您明白我的意思。但是,我得到的是这种不确定性的输出,我无法处理。所以...我的问题是:这种不确定性的输出是哪里来的?

我想提到以下重试机制始终产生相同的结果:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Retries extends App {

  var retries = 0

  def resetRetries(): Unit = retries = 0

  def calc() = if (retries > 3) 10 else {
    retries += 1
    println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
    throw new IllegalArgumentException("This failed")
  }

  def retry[T](op: => T)(retries: Int): Future[T] = Future(op) recoverWith { case _ if retries > 0 => retry(op)(retries - 1) }

  resetRetries()
  val retriableFuture: Future[Future[Int]] = retry(calc())(5)
  Await.ready(retriableFuture, 10 second)

  println(retriableFuture.isCompleted)
  println(retriableFuture.value)
}

输出量

I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Success(10))

如果我减少重试次数(retry(calc())(3)),结果将是失败的未来

I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))
SergGr

尽管从技术上说@Tim是正确的,但我认为他并没有真正回答这个问题。

我相信您困惑的真正根源是您对结构的误解:

f.fallbackTo(Future(calc()))

做。和它有什么不同

f.recoverWith({ case _ => Future(calc())})

有两个重要区别:

  1. 在该fallbackTo情况下,Future(calc())立即并因此(几乎)在立即开始执行calc()因此,原始未来和后备未来同时运行。recoverWith后备情况下,仅在原始将来失败后才创建将来。这种差异会影响记录顺序。同样,这意味着对的访问var retries是并发的,因此您可能会看到所有线程实际上由于retries丢失了某些更新而失败的情况

  2. 另一个棘手的一点是,fallbackTo记录为(高亮是矿)

创建一个新的future,如果成功完成,则保存该future的结果;如果未成功完成,则保存该future的结果。如果两个期货都失败,则所得的期货将持有第一个期货的可投掷对象

这种差异实际上并不会影响您的示例,因为在所有失败的尝试中抛出的异常是相同的,但是如果它们不同,则可能会影响结果。例如,如果您将代码修改为:

  def calc(attempt: Int) = if (retries > 3) 10 else {
    retries += 1
    println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
    throw new IllegalArgumentException(s"This failed $attempt")
  }

  def fCalc(attempt: Int): Future[Int] = Future(calc(attempt))

  val ff = fCalc(1) // 0 - should fail
      .fallbackTo(fCalc(2)) // 1 - should fail
      .fallbackTo(fCalc(3)) // 2 - should fail
      .fallbackTo(fCalc(4)) // 3 - should fail
      .fallbackTo(fCalc(5)) // 4 - should be a success

那么你应该得到这两个结果之一

Some(Failure(java.lang.IllegalArgumentException: This failed 1))
Some(Success(10))

从来没有其他“失败”的价值。

请注意,这里我明确地传递了,attempt以不打到竞态条件retries


回答更多评论(1月28日)

attempt在上一个示例中明确通过的原因是,这是确保IllegalArgumentException由逻辑上先创建的create在所有(甚至不是很现实的)线程调度下都calc1作为其值的最简单方法

如果您只想让所有日志具有不同的值,则有一种更简单的方法:使用局部变量!

  def calc() = {
    val retries = atomicRetries.getAndIncrement()
    if (retries > 3) 10 
    else {
      println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
      throw new IllegalArgumentException(s"This failed $retries")
    }
  }

这样,您可以避免经典的TOCTOU问题。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章