如何在IntelliJ IDEA中使用Kafka Direct Stream运行Spark Streaming应用程序?

德吉什亚达夫

我正在使用Kafka运行用于Spark的程序并出现错误。所有导入均已完成,看起来已解决,没有任何问题。

我已经使用IntelliJ IDEA编写了很少的代码,并且在第一次运行该程序时遇到错误,我是Java的新手,但是来自C#因此无法理解问题。动物园管理员服务已启动,随着卡夫卡服务器启动,并创建了一个名为话题topicA生产者也准备好流式传输数据,但是我在IntelliJ中运行代码以侦听队列时遇到问题

def main(args: Array[String]) {
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "0",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  val conf = new SparkConf().setAppName("Simple Streaming Application")
  val ssc = new StreamingContext(conf, Seconds(5))
  val topics = Array("topicA")
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )

  stream.foreachRDD { rdd =>
    // Get the offset ranges in the RDD
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    for (o <- offsetRanges) {
      println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
    }
  }

  ssc.start

  // the above code is printing out topic details every 5 seconds
  // until you stop it.

  ssc.stop(stopSparkContext = false)
}

产生的异常是:

Exception in thread "main" java.lang.VerifyError: class scala.collection.mutable.WrappedArray overrides final method toBuffer.()Lscala/collection/mutable/Buffer;
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.SparkConf.loadFromSystemProperties(SparkConf.scala:75)
at org.apache.spark.SparkConf.<init>(SparkConf.scala:70)
at org.apache.spark.SparkConf.<init>(SparkConf.scala:57)
at sparkStreamClass$.main(sparkStreamClass.scala:20)
at sparkStreamClass.main(sparkStreamClass.scala)

这是我的pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.learnStreaming</groupId>
    <artifactId>sparkProjectArtifact</artifactId>
    <version>1.0-SNAPSHOT</version>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.1</version>
        <scope>provided</scope>
    </dependency>

</dependencies>
</project>
德吉什亚达夫

修改了pom.xml,对我有用!

 <properties>
    <spark.version>2.1.0</spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

如何在Kafka Direct Stream中使用Spark结构化流?

如何在Spark Streaming应用程序中从Kafka接收Java对象

如何部署Spark Streaming应用程序?

如何在Intellij中运行Spark Scala应用程序

如何在 Spring Cloud Stream Kafka Streams 应用程序中使用 StateStoreBuilder 添加 StateStore

如何在 Spark 流应用程序中使用 Kafka 主题?

如何在IntelliJ IDEA中为Spark应用程序设置日志记录级别?

如何在IntelliJ IDEA IDE中运行JavaFX应用程序

如何从Intellij idea 2017.1运行angular应用程序?

如何在Intellij IDEA中使用Gradle创建Kotlin控制台应用程序

如何在Direct2D窗口化应用程序中使用DXGI翻转模型?

如何使用IntelliJ调试在Docker中运行的应用程序?

IntelliJ IDEA:如何使用--args运行gradle应用程序插件运行任务

如何在Kubernetes上部署Kafka Stream应用程序?

如何在intellij中运行spring boot应用程序?

如何在IntelliJ IDEA中的应用程序运行配置文件中向目录路径添加目录?

如何在IntelliJ IDEA中使用Gradle运行主要方法?

如何使用Spark的Kafka Direct Stream设置消费者组提交的偏移量?

如何在YARN群集上使用ZeroMQ运行简单的Spark应用程序?

如何在IntelliJ IDEA Community Edition中使用Java EE 7和Glassfish 4创建Java Servlet应用程序?

如果使用应用程序工厂模式,如何在gunicorn中运行flask应用程序?

在 Android Emulator 上运行应用程序时,如何在 Ionic React 应用程序中使用社区 HTTP 插件?

如何在不使用智能IDEA的情况下运行Act框架应用程序

如何在Intellij IDEA上调试基于Scala的Spark程序

IntelliJ IDEA错误正在运行的应用程序

在 intellij idea 中运行 spring mvc 应用程序的问题

如何在AWS上运行Spark Java应用程序?

如何从Java IDE为专业开发人员(IntelliJ IDEA)运行SpringBoot应用程序

如何在Mac OS中使用命令行参数运行应用程序