为什么我不能在Spark的mapPartitions中使用foreach

DuFei

SparkContext使用2个分区创建了一个数组,我也尝试使用它mapPartition来处理元素,但是当我这样编写代码时,我遇到了一个非常奇怪的错误:

val masterURL = "local[*]"

val conf = new SparkConf().setAppName("KMeans Test").setMaster(masterURL)
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")

val data = sc.textFile("file:/d:/data/kmeans_data.txt")
val parsedData = data.mapPartitions(partition => parseData(partition)).cache()

parsedData.mapPartitions(points =>
  points.map(point =>
    println(point)
  )
)

它没有错误,但是,当我将map替换为foreach时,它提示一个错误:

parsedData.mapPartitions(points =>
  points.foreach(point =>
   println(point)
  )
)

错误如下:

类型不匹配,预期:(​​Iterator [Vector])=> Iterator [NotInferedU],实际:(Iterator [Vector])=> Unit类型的Unit表达式不符合预期的Iterator [U_]类型

另外,第一个代码片段也无法在控制台面板中打印任何内容,为什么?

拉德万·切巴恩

mapPartitions期望有一个函数返回一个新的分区迭代器(Iterator[Vector] => Iterator[NotInferedU]),它将一个迭代器映射到另一个迭代器。通过使用foreach,您返回的空值(Scala中的Unit)与预期的返回类型不同。

要打印RDD内容,可以使用foreachPartition代替mapPartitions

parsedData.foreachPartition(points =>
  points.foreach(point =>
    println(point)
  )
)

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

为什么我不能在 Spark 中使用 combineByKey?

为什么我不能在我的代码中使用“ balance ”?

为什么我不能在Java枚举上使用foreach?

为什么我不能在开关案例中使用枚举?

为什么我不能在Swift中使用'object == nil'?

为什么我不能在CSS Variable中使用rgba?

为什么我不能在界面中使用默认方法?

为什么我不能在匿名类中使用<Class> .this?

为什么我不能在TRecord中使用Variable?

为什么我不能在列表中使用匿名函数?

为什么我不能在Laravel中使用关系方法?

为什么我不能在Swift中使用let in协议?

为什么我不能在Flutter中使用某些图标?

为什么不能在我的Magic Square程序中使用

为什么我不能在Activity中使用FloatingActionButton

为什么我不能在Crontab中使用Python 3?

为什么我不能在grep中使用^ \ s?

为什么我不能在 observable 中使用 switchMap

为什么我不能在 useEffect 中使用 dispatch a thunk?

为什么我不能在blazor中使用JSInterop?

为什么我不能在Python中使用“ +”合并字典?

为什么我不能在App Component中使用MyContext?

为什么我不能在JCreator中使用JavaFX?

为什么我不能在Xampp中使用htaccess?

为什么我不能在Matlab中使用函数readframe

为什么我不能在输入中使用空格字符?

为什么我不能在 yyerror (Yacc) 中使用 yytext

为什么我不能在 django 中使用模板标签

为什么我不能在 Blazor 中使用输入值?