Spark executors fails to run on kubernetes cluster

Satyam

I am trying to run a simple spark job on a kubernetes cluster. I deployed a pod that starts a pyspark shell and in that shell I am changing the spark configuration as specified below:

>>> sc.stop()
>>> sparkConf = SparkConf()
>>> sparkConf.setMaster("k8s://https://kubernetes.default.svc:443")
>>> sparkConf.setAppName("pyspark_test")
>>> sparkConf.set("spark.submit.deployMode", "client")
>>> sparkConf.set("spark.executor.instances", 2)
>>> sparkConf.set("spark.kubernetes.container.image", "us.icr.io/testspark/spark:v1")
>>> sparkConf.set("spark.kubernetes.namespace", "anonymous")
>>> sparkConf.set("spark.driver.memory", "1g")
>>> sparkConf.set("spark.executor.memory", "1g")
>>> sparkConf.set("spark.driver.host", "testspark")
>>> sparkConf.set("spark.driver.port", "37771")
>>> sparkConf.set("spark.kubernetes.driver.pod.name", "testspark")
>>> sparkConf.set("spark.driver.bindAddress", "0.0.0.0")
>>>
>>> spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
>>> sc = spark.sparkContext

This starts two new executor pods but both fails:

satyam@Satyams-MBP ~ % kubectl get pods -n anonymous
NAME                                                              READY   STATUS    RESTARTS   AGE
pysparktest-c1c8f177591feb60-exec-1                               0/2     Error     0          111m
pysparktest-c1c8f177591feb60-exec-2                               0/2     Error     0          111m
testspark                                                         2/2     Running   0          116m

I checked logs for one of the executor pod and it shows following error:

satyam@Satyams-MBP ~ % kubectl logs -n anonymous pysparktest-c1c8f177591feb60-exec-1 -c spark-kubernetes-executor 
++ id -u
+ myuid=185
++ id -g
+ mygid=0
+ set +e
++ getent passwd 185
+ uidentry=
+ set -e
+ '[' -z '' ']'
+ '[' -w /etc/passwd ']'
+ echo '185:x:185:0:anonymous uid:/opt/spark:/bin/false'
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sed 's/[^=]*=\(.*\)/\1/g'
+ sort -t_ -k4 -n
+ readarray -t SPARK_EXECUTOR_JAVA_OPTS
+ '[' -n '' ']'
+ '[' '' == 2 ']'
+ '[' '' == 3 ']'
+ '[' -n '' ']'
+ '[' -z ']'
+ case "$1" in
+ shift 1
+ CMD=(${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP)
+ exec /usr/bin/tini -s -- /usr/local/openjdk-8/bin/java -Dio.netty.tryReflectionSetAccessible=true -Dspark.driver.port=37771 -Xms1g -Xmx1g -cp ':/opt/spark/jars/*:' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@testspark:37771 --executor-id 1 --cores 1 --app-id spark-application-1612108001966 --hostname 172.30.174.196
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/01/31 15:46:49 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 14@pysparktest-c1c8f177591feb60-exec-1
21/01/31 15:46:49 INFO SignalUtils: Registered signal handler for TERM
21/01/31 15:46:49 INFO SignalUtils: Registered signal handler for HUP
21/01/31 15:46:49 INFO SignalUtils: Registered signal handler for INT
21/01/31 15:46:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/01/31 15:46:49 INFO SecurityManager: Changing view acls to: 185,root
21/01/31 15:46:49 INFO SecurityManager: Changing modify acls to: 185,root
21/01/31 15:46:49 INFO SecurityManager: Changing view acls groups to: 
21/01/31 15:46:49 INFO SecurityManager: Changing modify acls groups to: 
21/01/31 15:46:49 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(185, root); groups with view permissions: Set(); users  with modify permissions: Set(185, root); groups with modify permissions: Set()
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1748)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:283)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:272)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:302)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$3(CoarseGrainedExecutorBackend.scala:303)
    at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
    at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
    at scala.collection.immutable.Range.foreach(Range.scala:158)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$1(CoarseGrainedExecutorBackend.scala:301)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:62)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    ... 4 more
Caused by: java.io.IOException: Failed to connect to testspark/172.30.174.253:37771
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:253)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:195)
    at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:204)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:202)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:198)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: testspark/172.30.174.253:37771
Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

I have also created a headless service according to the instruction here https://spark.apache.org/docs/latest/running-on-kubernetes.html#client-mode-networking. Below is the yaml for the service as well as the driver pod:

Service

apiVersion: v1
kind: Service
metadata:
    name: testspark
spec:
    clusterIP: "None"
    selector:
        spark-app-selector: testspark
    ports:
        - name: driver-rpc-port
          protocol: TCP 
          port: 37771
          targetPort: 37771
        - name: blockmanager
          protocol: TCP 
          port: 37772
          targetPort: 37772

Driver Pod

apiVersion: v1
kind: Pod
metadata:
  name: testspark
  labels:
    spark-app-selector: testspark
spec:
  containers:
    - name: testspark
      securityContext:
        runAsUser: 0
      image: jupyter/pyspark-notebook
      ports:
        - containerPort: 37771
      command: ["tail", "-f", "/dev/null"]
  serviceAccountName: default-editor

This should have allowed executor pods to connect to the driver (which, I checked have the correct ip 172.30.174.249). To debug the network, I started a shell in the driver container and netstat the listening ports. Here is the output for the same:

root@testspark:/opt/spark/work-dir# netstat -tulpn
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name    
tcp        0      0 127.0.0.1:15000         0.0.0.0:*               LISTEN      -                   
tcp        0      0 0.0.0.0:15001           0.0.0.0:*               LISTEN      -                   
tcp        0      0 0.0.0.0:15090           0.0.0.0:*               LISTEN      -                   
tcp6       0      0 :::4040                 :::*                    LISTEN      35/java             
tcp6       0      0 :::37771                :::*                    LISTEN      35/java             
tcp6       0      0 :::15020                :::*                    LISTEN      -                   
tcp6       0      0 :::41613                :::*                    LISTEN      35/java 

I also tried to connect to the driver pod on the port 37771 via telnet from another running pod on the same namespace and it was able to connect.

root@test:/# telnet 172.30.174.249 37771
Trying 172.30.174.249...
Connected to 172.30.174.249.
Escape character is '^]'.

I am not sure why my executor pods are not able to connect to the driver on the same port. Am I missing any configuration or am I doing anything wrong? I can supply more information if required.

UPDATE

I created a fake spark executor image with the following docker file:

FROM us.icr.io/testspark/spark:v1

ENTRYPOINT ["tail", "-f", "/dev/null"]

and passed this image as spark.kubernetes.container.image config while instantiating the spark context. I got two running executor pods. I exec into one of them with command kubectl exec -n anonymous -it pysparktest-c1c8f177591feb60-exec-1 -c spark-kubernetes-executor bash and ran the following command /opt/entrypoint.sh executor and to my surprise, executor could connect with the driver just fine. Here is the stack trace for the same:

++ id -u
+ myuid=185
++ id -g
+ mygid=0
+ set +e
++ getent passwd 185
+ uidentry='185:x:185:0:anonymous uid:/opt/spark:/bin/false'
+ set -e
+ '[' -z '185:x:185:0:anonymous uid:/opt/spark:/bin/false' ']'
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sort -t_ -k4 -n
+ sed 's/[^=]*=\(.*\)/\1/g'
+ readarray -t SPARK_EXECUTOR_JAVA_OPTS
+ '[' -n '' ']'
+ '[' '' == 2 ']'
+ '[' '' == 3 ']'
+ '[' -n '' ']'
+ '[' -z ']'
+ case "$1" in
+ shift 1
+ CMD=(${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP)
+ exec /usr/bin/tini -s -- /usr/local/openjdk-8/bin/java -Dio.netty.tryReflectionSetAccessible=true -Dspark.driver.port=37771 -Xms1g -Xmx1g -cp ':/opt/spark/jars/*:' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://[email protected]:37771 --executor-id 1 --cores 1 --app-id spark-application-1612191192882 --hostname 172.30.174.249
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/02/01 15:00:16 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 39@pysparktest-27b678775e1556d9-exec-1
21/02/01 15:00:16 INFO SignalUtils: Registered signal handler for TERM
21/02/01 15:00:16 INFO SignalUtils: Registered signal handler for HUP
21/02/01 15:00:16 INFO SignalUtils: Registered signal handler for INT
21/02/01 15:00:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/02/01 15:00:17 INFO SecurityManager: Changing view acls to: 185,root
21/02/01 15:00:17 INFO SecurityManager: Changing modify acls to: 185,root
21/02/01 15:00:17 INFO SecurityManager: Changing view acls groups to: 
21/02/01 15:00:17 INFO SecurityManager: Changing modify acls groups to: 
21/02/01 15:00:17 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(185, root); groups with view permissions: Set(); users  with modify permissions: Set(185, root); groups with modify permissions: Set()
21/02/01 15:00:17 INFO TransportClientFactory: Successfully created connection to testspark.anonymous.svc.cluster.local/172.30.174.253:37771 after 173 ms (0 ms spent in bootstraps)
21/02/01 15:00:18 INFO SecurityManager: Changing view acls to: 185,root
21/02/01 15:00:18 INFO SecurityManager: Changing modify acls to: 185,root
21/02/01 15:00:18 INFO SecurityManager: Changing view acls groups to: 
21/02/01 15:00:18 INFO SecurityManager: Changing modify acls groups to: 
21/02/01 15:00:18 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(185, root); groups with view permissions: Set(); users  with modify permissions: Set(185, root); groups with modify permissions: Set()
21/02/01 15:00:18 INFO TransportClientFactory: Successfully created connection to testspark.anonymous.svc.cluster.local/172.30.174.253:37771 after 3 ms (0 ms spent in bootstraps)
21/02/01 15:00:18 INFO DiskBlockManager: Created local directory at /var/data/spark-839bad93-b01c-4bc9-a33f-51c7493775e3/blockmgr-ad6a42b9-cfe2-4cdd-aa28-37a0ab77fb16
21/02/01 15:00:18 INFO MemoryStore: MemoryStore started with capacity 413.9 MiB
21/02/01 15:00:19 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://[email protected]:37771
21/02/01 15:00:19 INFO ResourceUtils: ==============================================================
21/02/01 15:00:19 INFO ResourceUtils: Resources for spark.executor:

21/02/01 15:00:19 INFO ResourceUtils: ==============================================================
21/02/01 15:00:19 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
21/02/01 15:00:19 INFO Executor: Starting executor ID 1 on host 172.30.174.249
21/02/01 15:00:19 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40515.
21/02/01 15:00:19 INFO NettyBlockTransferService: Server created on 172.30.174.249:40515
21/02/01 15:00:19 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/02/01 15:00:19 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(1, 172.30.174.249, 40515, None)
21/02/01 15:00:19 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(1, 172.30.174.249, 40515, None)
21/02/01 15:00:19 INFO BlockManager: Initialized BlockManager: BlockManagerId(1, 172.30.174.249, 40515, None)

I am actually puzzeled why this might be happening. Is there any work around I can try to get this thing working automatically instead of me having to run it manually?

Satyam

I was finally able to solve this problem with the help of my colleague. I just added these two configs to disable the istio sidecar injection and it started working.

sparkConf.set("spark.kubernetes.driver.annotation.sidecar.istio.io/inject", "false")
sparkConf.set("spark.kubernetes.executor.annotation.sidecar.istio.io/inject", "false")

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

What are workers, executors, cores in Spark Standalone cluster?

Kubernetes cluster setup fails

Cassandra Connector fails when run under Spark 2.3 on Kubernetes

How can a variable declared in scala can be used by spark executors in cluster

How multiple executors are managed on the worker nodes with a Spark standalone cluster?

SPARK standalone cluster: Executors exit, how to track the source of the error?

Spark cluster full of heartbeat timeouts, executors exiting on their own

EMR Spark job using less executors than nodes in the cluster

How does spark choose nodes to run executors?(spark on yarn)

Deploying spark jar on kubernetes cluster

Spark on Dataproc: possible to run more executors per CPU?

Run Spark code written in Scala in spark cluster

Spark submit (2.3) on kubernetes cluster from Python

Spark with yarn-client on HDP multi nodes cluster only starts executors on the same single node

Why does spark-submit in YARN cluster mode not find python packages on executors?

Run Nexus 3 with Docker in a Kubernetes cluster

How to run a https secured app in kubernetes cluster

Pgbouncer: how to run within a kubernetes cluster properly

Can't run Kubernetes successfully on rancher cluster

Run Python mrjob in a Kubernetes on Hadoop Cluster

Kubernetes cluster does not run after reboot

Spark Repartition Executors

Object cache on Spark executors

Spark failed to connect with executors

Yarn as resource manager in SPARK for linux cluster - inside Kubernetes and outside Kubernetes

Spark job on kubernetes fails without specific error

Can I run Kubernetes Dashboard on a separate cluster than the targetted cluster

Run kafka in docker which is not the part of kubernetes cluster but access it from that cluster

How to run Spark application assembled with Spark 2.1 on cluster with Spark 1.6?