Can't set up a Spark application with spark-atlas-connector

Dave

I can't get my Spark application working with Apache Atlas via the spark-atlas-connector.

I cloned the https://github.com/hortonworks-spark/spark-atlas-connector project and ran mvn package. Then I put all the jars in my project and set up the configuration like this:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object Boot {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf()
      .setAppName("atlas-test")
      .setMaster("local[2]")
      // register the connector's listeners so Spark execution is reported to Atlas
      .set("spark.extraListeners", "com.hortonworks.spark.atlas.SparkAtlasEventTracker")
      .set("spark.sql.queryExecutionListeners", "com.hortonworks.spark.atlas.SparkAtlasEventTracker")
      .set("spark.sql.streaming.streamingQueryListeners", "com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker")

    val spark = SparkSession.builder()
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    // batch-read the topic from the beginning; the option name is "startingOffsets" (plural)
    // BROKER_SERVERS is a constant defined elsewhere in the project
    val df = spark.read.format("kafka")
      .option("kafka.bootstrap.servers", BROKER_SERVERS)
      .option("subscribe", "foobar")
      .option("startingOffsets", "earliest")
      .load()

    df.show()

    // write the rows back out to another topic
    df.write
      .format("kafka")
      .option("kafka.bootstrap.servers", BROKER_SERVERS)
      .option("topic", "foobar-out")
      .save()
  }
}
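For completeness, the same listener wiring can also be supplied at submit time instead of in code, along the lines of the connector's README (a sketch only; the assembly jar and application jar names are placeholders for whatever mvn package produced):

spark-submit \
  --jars spark-atlas-connector-assembly.jar \
  --conf spark.extraListeners=com.hortonworks.spark.atlas.SparkAtlasEventTracker \
  --conf spark.sql.queryExecutionListeners=com.hortonworks.spark.atlas.SparkAtlasEventTracker \
  --conf spark.sql.streaming.streamingQueryListeners=com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker \
  your-app.jar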

Atlas is started from a Docker container I pulled. Kafka with ZooKeeper is also started from a Docker container I pulled.

The job works fine without the spark-atlas-connector, but as soon as I add the connector it throws an exception.

19/08/09 16:40:16 ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: Exception when registering SparkListener
    at org.apache.spark.SparkContext.setupAndStartListenerBus(SparkContext.scala:2398)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:555)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:935)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:926)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
    at Boot$.main(Boot.scala:21)
    at Boot.main(Boot.scala)
Caused by: org.apache.atlas.AtlasException: Failed to load application properties
    at org.apache.atlas.ApplicationProperties.get(ApplicationProperties.java:134)
    at org.apache.atlas.ApplicationProperties.get(ApplicationProperties.java:86)
    at com.hortonworks.spark.atlas.AtlasClientConf.configuration$lzycompute(AtlasClientConf.scala:25)
    at com.hortonworks.spark.atlas.AtlasClientConf.configuration(AtlasClientConf.scala:25)
    at com.hortonworks.spark.atlas.AtlasClientConf.get(AtlasClientConf.scala:50)
    at com.hortonworks.spark.atlas.AtlasClient$.atlasClient(AtlasClient.scala:120)
    at com.hortonworks.spark.atlas.SparkAtlasEventTracker.<init>(SparkAtlasEventTracker.scala:33)
    at com.hortonworks.spark.atlas.SparkAtlasEventTracker.<init>(SparkAtlasEventTracker.scala:37)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.spark.util.Utils$$anonfun$loadExtensions$1.apply(Utils.scala:2691)
    at org.apache.spark.util.Utils$$anonfun$loadExtensions$1.apply(Utils.scala:2680)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:2680)
    at org.apache.spark.SparkContext$$anonfun$setupAndStartListenerBus$1.apply(SparkContext.scala:2387)
    at org.apache.spark.SparkContext$$anonfun$setupAndStartListenerBus$1.apply(SparkContext.scala:2386)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.SparkContext.setupAndStartListenerBus(SparkContext.scala:2386)
    ... 8 more
Caused by: com.hortonworks.spark.atlas.shade.org.apache.commons.configuration.ConfigurationException: Cannot locate configuration source null
    at com.hortonworks.spark.atlas.shade.org.apache.commons.configuration.AbstractFileConfiguration.load(AbstractFileConfiguration.java:259)
    at com.hortonworks.spark.atlas.shade.org.apache.commons.configuration.AbstractFileConfiguration.load(AbstractFileConfiguration.java:238)
    at com.hortonworks.spark.atlas.shade.org.apache.commons.configuration.AbstractFileConfiguration.<init>(AbstractFileConfiguration.java:197)
    at com.hortonworks.spark.atlas.shade.org.apache.commons.configuration.PropertiesConfiguration.<init>(PropertiesConfiguration.java:284)
    at org.apache.atlas.ApplicationProperties.<init>(ApplicationProperties.java:69)
    at org.apache.atlas.ApplicationProperties.get(ApplicationProperties.java:125)
    ... 32 more
19/08/09 16:40:16 INFO SparkContext: SparkContext already stopped.
Exception in thread "main" org.apache.spark.SparkException: Exception when registering SparkListener
19/08/09 16:40:17 INFO ShutdownHookManager: Shutdown hook called
Richard Nemeth

I believe you've missed another step from the installation documentation. Your error stems from:

Caused by: com.hortonworks.spark.atlas.shade.org.apache.commons.configuration.ConfigurationException: Cannot locate configuration source null

and, referring to the README file in the GitHub repository you linked:

Also make sure the Atlas configuration file atlas-application.properties is in the driver's classpath. For example, put this file into <SPARK_HOME>/conf.
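That "Cannot locate configuration source null" is exactly what Atlas's ApplicationProperties throws when atlas-application.properties cannot be found on the classpath. A minimal sketch of what the file might contain for a local Atlas instance reached over REST (host, port, and credentials are placeholder values, and atlas.client.type=rest assumes you want the connector's REST client rather than its default Kafka bridge):

atlas.client.type=rest
atlas.rest.address=http://localhost:21000
atlas.client.username=admin
atlas.client.password=admin

Since your main() runs with setMaster("local[2]") from the IDE, the simplest fix is to put this file under src/main/resources so it ends up on the driver's classpath. If you later switch to spark-submit, copying it into <SPARK_HOME>/conf as the README suggests, or adding its directory via --driver-class-path, achieves the same thing.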
