Flink on YARN: how to add external jars to the class path of the local client


I tried to deploy my application to Flink on YARN with the CLI. Unfortunately, it fails with the exception below:

java.lang.NoClassDefFoundError: Lredis/clients/jedis/JedisCluster;
    at java.lang.Class.getDeclaredFields0(Native Method)
    at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
    at java.lang.Class.getDeclaredFields(Class.java:1916)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:72)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
    at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:551)
    at org.apache.flink.streaming.api.scala.DataStream.flatMap(DataStream.scala:594)
    at com.hypers.hwt.realtime.top.HwtRealTimeTopRunner.executeLateStream(HwtRealTimeTop.scala:138)
    at com.hypers.hwt.realtime.top.HwtRealTimeTopRunner.run(HwtRealTimeTop.scala:72)
    at com.hypers.hwt.realtime.top.HwtRealTimeTop$.main(HwtRealTimeTop.scala:265)
    at com.hypers.hwt.realtime.top.HwtRealTimeTop.main(HwtRealTimeTop.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:381)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
    at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)

I already use the -yt parameter to ship my external jars, but it still fails. As far as I can tell, Flink submits a job in 3 steps:

  1. the client wraps the user code and builds the job graph
  2. the client submits the job to the JobManager
  3. the JobManager distributes the job to the TaskManagers


After a lot of testing, I found that this exception is thrown in step 1, which runs locally in YarnClusterClient. I know the problem can be solved by adding my external jars to $FLINK_HOME/lib, but that would cause conflicts with other applications.
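A quick way to confirm that the class is missing on the client side (rather than on the TaskManagers) is to probe the class loader from the job's main method before the stream is built. The following is only a minimal sketch: the object name ClasspathCheck and the helper isLoadable are hypothetical, and it assumes that the thread's context class loader is the one the client uses for user code.

```scala
object ClasspathCheck {

  /** Returns true if `className` can be resolved by `loader` (the class is not initialized). */
  def isLoadable(className: String, loader: ClassLoader): Boolean =
    try {
      Class.forName(className, false, loader)
      true
    } catch {
      case _: ClassNotFoundException => false
      case _: NoClassDefFoundError   => false
    }

  def main(args: Array[String]): Unit = {
    // Assumption: the context class loader is the one used for user code on the client
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)

    val className = "redis.clients.jedis.JedisCluster"
    val status = if (isLoadable(className, loader)) "found" else "MISSING"
    println(s"$className: $status")
  }
}
```

If the class is reported as missing when the job is launched through the CLI, the jar is not visible in step 1, regardless of what is shipped to the cluster.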


So I want to know: is there any way to add external jars to the class path of the local client?


class LateFlatMap(conf: FlinkJedisClusterConfig) extends RichFlatMapFunction[(PvAccBean, UvAccBean), Iterable[(String, Array[Byte])]] {
  // The type of this field is what ClosureCleaner fails to resolve on the client (see getDeclaredFields in the stack trace)
  var jedisCluster: JedisCluster = null

  override def open(properties: Configuration): Unit = {
    val genericObjectPoolConfig = new GenericObjectPoolConfig()
    jedisCluster = new JedisCluster(conf.getNodes(), conf.getConnectionTimeout(),
      conf.getMaxRedirections(), genericObjectPoolConfig)
  }

  override def close(): Unit = {

Basically, I see two possibilities:

  1. Add your third-party libraries to your job's jar by building a fat jar. Every major build system can do this (e.g. the Maven Assembly Plugin or the SBT Assembly plugin). This would be my preferred solution.
  2. If you want to use your third-party libraries in all of your Flink jobs, you can add them to Flink's lib directory ($FLINK_HOME/lib) before you start your cluster. This also works, but offers you less flexibility.
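For the first option, a fat-jar setup with sbt could look roughly like the sketch below. All version numbers and the Scala version are assumptions chosen for illustration and need to be adapted to your cluster and build. The idea is that the Flink dependencies are marked as provided (the cluster already ships them), while Jedis is bundled into the job jar so that both the client and the TaskManagers can load it.

```scala
// --- project/plugins.sbt ---
// Assumption: example plugin version; pick one that matches your sbt release
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")

// --- build.sbt ---
// Assumption: Scala and Flink versions are placeholders, use the ones of your cluster
scalaVersion := "2.11.12"

val flinkVersion = "1.4.0"

libraryDependencies ++= Seq(
  // Already present in the Flink distribution, so kept out of the fat jar
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  // Bundled into the fat jar; this is the library missing in the stack trace
  "redis.clients" % "jedis" % "2.9.0"
)
```

Running `sbt assembly` then produces a single jar under target/ that contains the job classes together with Jedis, and this jar is the one to pass to the Flink CLI.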

Hope that helps

