RDD CountClose花费的时间远远超过请求的超时时间

javadba

为了减少花在收集上的时间count正在调用DataFrame该行RDD.countApproximate()它具有以下签名:

def countApprox(
    timeout: Long,
    confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {

我试图将输出计算限制为60秒。还要注意以下非常低的精度要求0.10

val waitSecs = 60
val cnt = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).getFinalValue.mean

但是实际时间是.. 17分钟

在此处输入图片说明

该时间几乎与最初生成数据所需的时间相同(19分钟)!

那么,那么-该API用途什么:有没有办法让它实际保存准确时间计算中的有意义的一部分

TL; DR(请参阅接受的答案):使用initialValue代替getFinalValue

哈杜普

请注意approxCount定义中的返回类型这是部分结果。

def countApprox(
    timeout: Long,
    confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {

现在,请注意其用法:

val waitSecs = 60
val cnt = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).**getFinalValue**.mean

根据spark scala doc的说法,它getFinalValue是一种阻止方法,这意味着它将等待完整的操作完成。

initialValue可以在指定的超时时间内获取。因此,以下代码段在超时后不会阻止进一步的操作,

val waitSecs = 60
val cnt = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).initialValue.mean

请注意,using的缺点countApprox(timeout, confidence).initialValue是即使获得该值,它也会继续计数,直到获得您将要使用的最终计数,getFinalValue并且仍将保留资源,直到操作完成。

现在,此API的使用不会在计数操作时被阻塞。

参考:https : //mail-archives.apache.org/mod_mbox/spark-user/201505.mbox/%[email protected]%3E

现在,让我们验证我们对spark2-shell进行非阻塞操作的假设。让我们创建随机数据框中并执行countapproxCountgetFinalValueapproxCountinitialValue

scala> val schema = StructType((0 to 10).map(n => StructField(s"column_$n", StringType)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(column_0,StringType,true), StructField(column_1,StringType,true), StructField(column_2,StringType,true), StructField(column_3,StringType,true), StructField(column_4,StringType,true), StructField(column_5,StringType,true), StructField(column_6,StringType,true), StructField(column_7,StringType,true), StructField(column_8,StringType,true), StructField(column_9,StringType,true), StructField(column_10,StringType,true))

scala> val rows = spark.sparkContext.parallelize(Seq[Row](), 100).mapPartitions { _ => { Range(0, 100000).map(m => Row(schema.map(_ => Random.alphanumeric.filter(_.isLower).head.toString).toList: _*)).iterator } }
rows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[1] at mapPartitions at <console>:32

scala> val inputDf = spark.sqlContext.createDataFrame(rows, schema)
inputDf: org.apache.spark.sql.DataFrame = [column_0: string, column_1: string ... 9 more fields]

//Please note that cnt will be displayed only when all tasks are completed
scala> val cnt = inputDf.rdd.count
cnt: Long = 10000000

scala> val waitSecs = 60
waitSecs: Int = 60

//cntApproxFinal will be displayed only when all tasks are completed.
scala> val cntApprxFinal = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).getFinalValue.mean
[Stage 1:======================================================> (98 + 2) / 100]cntApprxFinal: Double = 1.0E7

scala> val waitSecs = 60
waitSecs: Int = 60

//Please note that cntApprxInitila in this case, will be displayed exactly after timeout duration. In this case 80 tasks were completed within timeout and it displayed the value of variable. Even after displaying the variable value, it continued will all the remaining tasks
scala> val cntApprxInitial = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).initialValue.mean
[Stage 2:============================================>           (80 + 4) / 100]cntApprxInitial: Double = 1.0E7

[Stage 2:=======================================================>(99 + 1) / 100]

让我们看一下spark ui和spark-shell,所有3个操作都花了相同的时间: 所有3个操作都花费了相同的时间

cntApprxInitial 在完成所有任务之前可用。

cntApprxInitial在完成所有任务之前可用

希望这可以帮助!

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章