We have written Spark unit tests that run in local mode with 4 threads.
When launched one at a time, e.g. from IntelliJ or with sbt testOnly, every test passes.
When launched with sbt test, they fail with the following error:
[info] java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.execution.datasources.csv.CSVFileFormat not a subtype
We specifically upgraded sbt and Spark to the latest versions and tried fork in test := true in build.sbt, but that did not help.
Spark is at 2.4.3, with sbt 1.2.8 and Scala 2.12.8.
There is nothing special about the sbt config:
libraryDependencies ++= Seq(
  Dependencies.Test.concordion,
  Dependencies.`spark-sql` exclude("org.slf4j", "slf4j-log4j12"),
  Dependencies.`better-files`
)
fork in test := true
dependencyOverrides += "com.google.guava" % "guava" % "11.0.2"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7.1"
We use an sbt build with several subprojects, defined as follows:
scalacOptions in ThisBuild ++= Seq(
  "-encoding", "UTF-8", // source files are in UTF-8
  "-deprecation", // warn about use of deprecated APIs
  "-Yrangepos", // use range positions for syntax trees
  "-language:postfixOps", // enables postfix operators
  "-language:implicitConversions", // enables defining implicit methods and members
  "-language:existentials", // enables writing existential types
  "-language:reflectiveCalls", // enables reflection
  "-language:higherKinds", // allow higher kinded types without `import scala.language.higherKinds`
  "-unchecked", // warn about unchecked type parameters
  "-feature", // warn about misused language features
  /*"-Xlint", // enable handy linter warnings
  "-Xfatal-warnings", // turn compiler warnings into errors*/
  "-Ypartial-unification" // allow the compiler to unify type constructors of different arities
)
autoCompilerPlugins := true
addCompilerPlugin(Dependencies.`kind-projector`)
addCompilerPlugin(Dependencies.`better-monadic-for`)
// Define the root project, and make it compile all child projects
lazy val `datarepo` =
  project
    .in(file("."))
    .aggregate(
      `foo`,
      `foo-other`,
      `sparkusingproject`,
      `sparkusingproject-test`,
      `sparkusingproject-other`,
    )
// Define individual projects, the directories they reside in, and other projects they depend on
lazy val `foo` =
  project
    .in(file("foo"))
    .settings(Common.defaultSettings: _*)

lazy val `foo-other` =
  project
    .in(file("foo-other"))
    .dependsOn(`foo`)
    .settings(Common.defaultSettings: _*)
I just ran into this exception in my tests, and in my case it was caused by trying to run a Spark action in a thread other than the one in which I started the SparkSession. You may want to disable parallelExecution in Test (it is recommended for Spark integration tests anyway).
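In build.sbt that would look roughly like this (a sketch, using the same pre-slash syntax the question already uses):

// Run this project's tests sequentially, so every Spark action stays on the
// thread that created the SparkSession.
parallelExecution in Test := false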
Specifically, I was trying to execute several Spark actions in parallel, and I tried to do that on Scala's ExecutionContext.global thread pool. When I created a fixed thread pool (Executors.newFixedThreadPool) instead, everything started working.
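For illustration, the difference looks roughly like this; df1 and df2 are placeholders for DataFrames defined elsewhere in the test:

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// A dedicated pool whose worker threads are created from the test thread and
// therefore inherit its context class loader (unlike ExecutionContext.global in my case).
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

// Kick off the Spark actions in parallel and wait for the results.
val counts = Future.sequence(Seq(Future(df1.count()), Future(df2.count())))
Await.result(counts, Duration.Inf)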
AFAICT this is because, in DataSource.scala:610, Spark obtains the thread's ContextClassLoader:
val loader = Utils.getContextOrSparkClassLoader
and, when running on Scala's default thread pool, that class loader does not contain the relevant classes and interfaces. When you create a new thread pool instead, it inherits the correct class loader from the current thread, and everything then works fine.
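As a further illustration of that mechanism (not something the answer above did), one can also set the context class loader explicitly around the Spark call when a shared pool has to be used; the object name, `spark` parameter and CSV path below are placeholders:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.sql.SparkSession

object ContextClassLoaderSketch {
  // Temporarily install this class's loader as the thread's context class loader,
  // so that Utils.getContextOrSparkClassLoader can resolve the DataSourceRegister providers.
  def withTestClassLoader[T](body: => T): T = {
    val previous = Thread.currentThread().getContextClassLoader
    Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
    try body finally Thread.currentThread().setContextClassLoader(previous)
  }

  def countCsv(spark: SparkSession): Future[Long] =
    Future {
      withTestClassLoader {
        spark.read.csv("placeholder.csv").count() // placeholder path
      }
    }
}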