我对Scala和Spark都是陌生的,所以我希望有人可以解释为什么当gregationByKey在抽象类中时无法编译。这是我可以想到的最简单的示例:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
abstract class AbstractKeyCounter[K] {
def keyValPairs(): RDD[(K, String)]
def processData(): RDD[(K, Int)] = {
keyValPairs().aggregateByKey(0)(
(count, key) => count + 1,
(count1, count2) => count1 + count2
)
}
}
class StringKeyCounter extends AbstractKeyCounter[String] {
override def keyValPairs(): RDD[(String, String)] = {
val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("counter"))
val data = sc.parallelize(Array("foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D"))
data.map(_.split("=")).map(v => (v(0), v(1)))
}
}
这使:
Error:(11, 19) value aggregateByKey is not a member of org.apache.spark.rdd.RDD[(K, String)]
keyValPairs().aggregateByKey(0)(
^
如果我改用一个具体的类,它将编译并成功运行:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
class StringKeyCounter {
def processData(): RDD[(String, Int)] = {
val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("counter"))
val data = sc.parallelize(Array("foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D"))
val keyValPairs = data.map(_.split("=")).map(v => (v(0), v(1)))
keyValPairs.aggregateByKey(0)(
(count, key) => count + 1,
(count1, count2) => count1 + count2
)
}
}
我想念什么?
如果您更改:
abstract class AbstractKeyCounter[K] {
至:
abstract class AbstractKeyCounter[K : ClassTag] {
这将编译。
为什么呢 aggregateByKey
是的方法PairRDDFunctions
(您RDD
被隐式转换为该类),该方法具有以下签名:
class PairRDDFunctions[K, V](self: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
这意味着其构造函数期望类型为和的隐式值。您的抽象类不知道什么是K,因此无法提供匹配的隐式值。这意味着隐式转换为“失败”(编译器不执行转换),因此找不到该方法。ClassTag[K]
vt: ClassTag[V]
PairRDDFunctions
aggregateByKey
Adding[K : ClassTag]
是implicit kt: ClassTag[K]
向抽象类构造函数添加隐式参数的简写,然后由编译器使用并将其传递给的构造函数PairRDDFunctions
。
有关ClassTag及其优点的更多信息,请参阅这篇好文章。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句