如何等待SparkContext完成所有过程?

费利佩

我如何查看SparkContext是否有正在执行的内容,何时一切都停止了?因为当前我正在等待30秒才能调用SparkContext.stop,否则我的应用程序将引发错误。

import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.SparkContext

object RatingsCounter extends App {

  // set the log level to print only errors
  Logger.getLogger("org").setLevel(Level.ERROR)

  // create a SparkContext using every core of the local machine, named RatingsCounter
  val sc = new SparkContext("local[*]", "RatingsCounter")

  // load up each line of the ratings data into an RDD (Resilient Distributed Dataset)
  val lines = sc.textFile("src/main/resource/u.data", 0)

  // convert each line to s string, split it out by tabs and extract the third field.
  // The file format is userID, movieID, rating, timestamp
  val ratings = lines.map(x => x.toString().split("\t")(2))

  // count up how many times each value occurs
  val results = ratings.countByValue()

  // sort the resulting map of (rating, count) tuples
  val sortedResults = results.toSeq.sortBy(_._1)

  // print each result on its own line.
  sortedResults.foreach { case (key, value) => println("movie ID: " + key + " - rating times: " + value) }

  Thread.sleep(30000)

  sc.stop()
}
以利亚

Spark应用程序应定义一个main()方法而不要扩展scala.App的子类scala.App可能无法正常工作。

而且,由于您正在扩展App,因此您将获得意外的行为。

您可以在有关自包含应用程序的官方文档中阅读有关它的更多信息

App使用DelayedInit并可能导致初始化问题。使用主要方法,您知道发生了什么。摘自reddit。

object HelloWorld extends App {
  var a = 1
  a + 1
  override def main(args: Array[String]) {
    println(a) // guess what's the value of a ?
  }
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章