I have a Scala jar file named SGA.jar. Within it there is a class named org/SGA/MainTest, which uses the underlying SGA.jar logic to perform some graph operations, like this:
package org.SGA

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import java.io._
import scala.util._

object MainTest {
  def initialize(): Unit = {
    println("Initializing")
  }

  def perform(collection: Iterable[String]): Unit = {
    val conf = new SparkConf().setAppName("maintest")
    val sparkContext = new SparkContext(conf)
    sparkContext.setLogLevel("ERROR")

    // Each input line has the form "srcId dstId weight".
    val edges = sparkContext.parallelize(collection.toList)
      .map(_.split(" "))
      .map { edgeCoordinates =>
        Edge(edgeCoordinates(0).toLong, edgeCoordinates(1).toLong, edgeCoordinates(2).toDouble)
      }

    println("Creating graph")
    val graph: Graph[Any, Double] = Graph.fromEdges(edges, 0)
    println("Graph created")
    // ...
  }
}
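For clarity, here is a sketch of the input perform expects: each element of collection is one edge line, "srcId dstId weight" (the object name and sample values below are made up):

object InputExample {
  // Hypothetical sample input: one edge per string, split on spaces into
  // source vertex id, destination vertex id, and edge weight.
  val edgeLines: Iterable[String] = Seq(
    "1 2 0.5",  // edge 1 -> 2 with weight 0.5
    "2 3 1.25",
    "3 1 2.0"
  )
  // MainTest.perform(edgeLines) would build a three-vertex GraphX graph.
}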
SGA.jar is embedded into scalaWrapper.jar, which is a Java wrapper around the Scala SGA.jar and the necessary datasets. Its folder structure looks like this:
scalaWrapper.jar
| META-INF
| | MANIFEST.MF
| scalawrapper
| | datasets
| | | data1.txt
| | jars
| | | SGA.jar
| | FileParser.java
| | FileParser.class
| | WrapperClass.java
| | WrapperClass.class
| .classpath
| .project
The FileParser class essentially converts the data available in the text file into usable structures and will not be detailed further here. The main class, however, is WrapperClass:
package scalawrapper;

import scala.collection.*;
import scala.collection.Iterable;
import java.util.List;
import org.SGA.*;

public class WrapperClass {
    public static void main(String[] args) {
        FileParser fileparser = new FileParser();
        String filepath = "/scalawrapper/datasets/data1.txt";

        MainTest.initialize();

        List<String> list = fileparser.Parse(filepath);
        // Convert the Java list into a Scala Iterable for MainTest.perform.
        Iterable<String> scalaIterable = JavaConversions.collectionAsScalaIterable(list);
        MainTest.perform(scalaIterable);
    }
}
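As a side note, the same Java-to-Scala bridge can be exercised in isolation with a minimal Scala sketch (the object name and sample values are hypothetical; on newer Scala versions JavaConversions is deprecated in favour of JavaConverters):

import java.util.Arrays
import scala.collection.JavaConversions

object ConversionCheck {
  def main(args: Array[String]): Unit = {
    // A java.util.List such as FileParser.Parse would return...
    val javaList: java.util.List[String] = Arrays.asList("1 2 0.5", "2 3 1.0")
    // ...becomes the scala.collection.Iterable that MainTest.perform takes.
    val scalaIterable: Iterable[String] =
      JavaConversions.collectionAsScalaIterable(javaList)
    scalaIterable.foreach(println)
  }
}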
SGA.jar is built via SBT, and the Java jar is developed in and exported from Eclipse. When running locally (in which case .setMaster("local[*]").set("spark.executor.memory","7g") is appended to the SparkConf to facilitate local execution), there are no problems and the code behaves as expected.
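For reference, a minimal sketch of that local setup, assuming the same appName as above (the object name is made up):

import org.apache.spark.{SparkConf, SparkContext}

object LocalRun {
  def main(args: Array[String]): Unit = {
    // Same SparkConf as in MainTest, plus the two local-only settings
    // mentioned above.
    val conf = new SparkConf()
      .setAppName("maintest")
      .setMaster("local[*]")               // run on all local cores
      .set("spark.executor.memory", "7g")  // local executor memory
    val sparkContext = new SparkContext(conf)
    sparkContext.setLogLevel("ERROR")
    // ... graph work as in MainTest.perform ...
    sparkContext.stop()
  }
}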
The problem arises when scalaWrapper.jar is supposed to run on an EMR cluster. The cluster is defined as 1 master + 4 worker nodes, with an additional Spark application step:
Main class : None
Arguments : spark-submit --deploy-mode cluster --class scalawrapper.WrapperClass --executor-memory 17g --executor-cores 16 --driver-memory 17g s3://scalaWrapperCluster/scalaWrapper.jar
The execution fails with:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt1/yarn/usercache/hadoop/filecache/10/__spark_libs__1619195545177535823.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
19/04/22 16:56:43 INFO SignalUtils: Registered signal handler for TERM
19/04/22 16:56:43 INFO SignalUtils: Registered signal handler for HUP
19/04/22 16:56:43 INFO SignalUtils: Registered signal handler for INT
19/04/22 16:56:43 INFO SecurityManager: Changing view acls to: yarn,hadoop
19/04/22 16:56:43 INFO SecurityManager: Changing modify acls to: yarn,hadoop
19/04/22 16:56:43 INFO SecurityManager: Changing view acls groups to:
19/04/22 16:56:43 INFO SecurityManager: Changing modify acls groups to:
19/04/22 16:56:43 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, hadoop); groups with view permissions: Set(); users with modify permissions: Set(yarn, hadoop); groups with modify permissions: Set()
19/04/22 16:56:44 INFO ApplicationMaster: Preparing Local resources
19/04/22 16:56:44 INFO ApplicationMaster: ApplicationAttemptId: appattempt_1555952041027_0001_000001
19/04/22 16:56:44 INFO ApplicationMaster: Starting the user application in a separate Thread
19/04/22 16:56:44 INFO ApplicationMaster: Waiting for spark context initialization...
19/04/22 16:56:44 ERROR ApplicationMaster: User class threw exception: java.lang.NoClassDefFoundError: org/SGA/MainTest
java.lang.NoClassDefFoundError: org/SGA/MainTest
at scalawrapper.WrapperClass.main(WrapperClass.java:20)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
Caused by: java.lang.ClassNotFoundException: org.SGA.MainTest
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 6 more
Note that WrapperClass.java:20 corresponds to MainTest.initialize();.
This exception seems to be quite common, as I came across many attempts at solving it (example), but none of them resolved my problem. I tried including the scala-library that was used to build SGA.jar in scalaWrapper.jar, eliminating static fields, and hunting for mistakes in the project definitions, but had no luck.
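The likely root cause, for what it's worth: the JVM's standard class loaders do not look inside jars that are nested within another jar, so scalawrapper/jars/SGA.jar is invisible on the cluster even though it ships inside scalaWrapper.jar. A small sketch that makes the failure observable (the object name is made up):

object NestedJarCheck {
  def main(args: Array[String]): Unit = {
    // Standard class loaders only scan top-level classpath entries, so a
    // jar nested inside another jar is never searched. This succeeds when
    // SGA.jar is on the classpath directly and fails on the cluster.
    try {
      Class.forName("org.SGA.MainTest")
      println("org.SGA.MainTest is visible")
    } catch {
      case e: ClassNotFoundException =>
        println("org.SGA.MainTest is NOT on the classpath: " + e.getMessage)
    }
  }
}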
I solved the problem by uploading SGA.jar to S3 separately and adding it to spark-submit as a --jars parameter:
spark-submit --deploy-mode cluster --jars s3://scalaWrapperCluster/SGA.jar --class scalawrapper.WrapperClass --executor-memory 17g --executor-cores 16 --driver-memory 17g s3://scalaWrapperCluster/scalaWrapper.jar
Note that the original functionality within scalaWrapper.jar (including the already embedded SGA.jar) did not change. It is the separately uploaded SGA.jar that gets executed: --jars puts that copy on the driver and executor classpaths, which the copy nested inside scalaWrapper.jar never reaches.
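An alternative fix, sketched under assumptions about the build (the plugin and Spark versions below are placeholders), would be to drop the nested-jar layout entirely and build one fat jar with sbt-assembly, so the org/SGA classes end up at the top level of the submitted jar:

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")

// build.sbt: Spark itself is provided by EMR, everything else gets merged
libraryDependencies += "org.apache.spark" %% "spark-graphx" % "2.4.0" % "provided"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}

Running sbt assembly would then produce a single jar that spark-submit can run without the extra --jars parameter.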