I am trying out Apache Beam (with the Python SDK), so I created a simple pipeline and tried to deploy it on a Spark cluster.
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam
op = PipelineOptions([
    "--runner=DirectRunner"
])
with beam.Pipeline(options=op) as p:
    p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x + 1) | beam.Map(print)
This pipeline works well with the DirectRunner. Since portability is a key concept in Beam, I then tried to deploy the same code on Spark.
First, I edited the PipelineOptions as mentioned here:
op = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK"
])
job_endpoint is the address of the Beam Spark job server, which I run in a Docker container using the command:
docker run --net=host apache/beam_spark_job_server:latest --spark-master-url=spark://SPARK_URL:SPARK_PORT
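Before submitting the pipeline, it can be useful to confirm that something is actually listening on the job endpoint. The following is a minimal sketch, assuming the localhost:8099 endpoint from the options above; endpoint_is_reachable is a hypothetical helper written for illustration and is not part of Beam. A successful TCP connection only shows that the port is open, not that the job service behind it is healthy.

```python
import socket


def endpoint_is_reachable(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened.

    Hypothetical helper for illustration; not part of Apache Beam.
    """
    try:
        # create_connection raises OSError (e.g. connection refused, timeout)
        # when nothing accepts connections on the given address.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Host and port taken from --job_endpoint=localhost:8099 above.
print(endpoint_is_reachable("localhost", 8099))
```

If this prints False, the problem lies in reaching the job server rather than in the Spark cluster itself.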
This is supposed to work, but the job fails on Spark with this error:
20/10/31 14:35:58 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 6543101073799644159, local class serialVersionUID = 1574364215946805297
Also, I see this WARN in the beam_spark_job_server logs:
WARN org.apache.beam.runners.spark.translation.SparkContextFactory: Creating a new Spark Context.
Any idea what the problem is here? Is there another way to run Python Beam pipelines on Spark without going through a containerized service?
This can happen when the version of the Spark client bundled in the job server does not match the version of Spark running on the cluster you submit the job to.
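The error message in the question carries the evidence: a java.io.InvalidClassException reports one serialVersionUID for the class description read from the stream and another for the locally loaded class, and when the two differ the sender and the receiver are using different definitions of that class. Below is a minimal sketch that pulls both values out of the logged line and compares them. The helper name parse_serial_version_uids and the regular expression are illustrative assumptions, not part of Beam or Spark.

```python
import re

# Hypothetical helper for illustration; not part of Beam or Spark.
# Matches the two UIDs in a java.io.InvalidClassException message.
_UID_PATTERN = re.compile(
    r"stream classdesc serialVersionUID = (-?\d+), "
    r"local class serialVersionUID = (-?\d+)"
)


def parse_serial_version_uids(message):
    """Return (stream_uid, local_uid) as ints, or None if not found."""
    match = _UID_PATTERN.search(message)
    if match is None:
        return None
    return int(match.group(1)), int(match.group(2))


# The error line copied from the question.
error_line = (
    "java.io.InvalidClassException: "
    "org.apache.spark.deploy.ApplicationDescription; "
    "local class incompatible: "
    "stream classdesc serialVersionUID = 6543101073799644159, "
    "local class serialVersionUID = 1574364215946805297"
)

stream_uid, local_uid = parse_serial_version_uids(error_line)
versions_differ = stream_uid != local_uid
print(versions_differ)
```

Differing values point to mismatched Spark builds, so the next step would be to compare the Spark version bundled in the job server image with the version the cluster runs.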