Running a Python Apache Beam Pipeline on Spark

farhawa

I am giving Apache Beam (with the Python SDK) a try, so I created a simple pipeline and tried to deploy it on a Spark cluster.

from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam

op = PipelineOptions(["--runner=DirectRunner"])

with beam.Pipeline(options=op) as p:
    p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x + 1) | beam.Map(print)

This pipeline works well with the DirectRunner, so I tried to deploy the same code on Spark (portability being a key concept in Beam).

First, I edited the PipelineOptions as documented:

op = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK",
])

job_endpoint is the URL of the Beam Spark job server, which I run as a Docker container with:

docker run --net=host apache/beam_spark_job_server:latest --spark-master-url=spark://SPARK_URL:SPARK_PORT

This is supposed to work, but the job fails on Spark with this error:

20/10/31 14:35:58 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.

java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 6543101073799644159, local class serialVersionUID = 1574364215946805297

Also, I see this WARN in the beam_spark_job_server logs:

WARN org.apache.beam.runners.spark.translation.SparkContextFactory: Creating a new Spark Context.

Any idea where the problem is? Is there any other way to run Python Beam pipelines on Spark without going through a containerized service?

Kenn Knowles

This could happen due to a version mismatch between the Spark client bundled in the job server and the version of Spark to which you are submitting the job.
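The `InvalidClassException` in the question is the classic symptom of such a mismatch: the job server serialized `ApplicationDescription` with one `serialVersionUID` and the cluster deserialized it against another. As a small diagnostic sketch (the helper name and its shape are mine, not part of Beam or Spark), you can pull both IDs out of the log line with the standard library and confirm they differ:

```python
import re

def serial_uid_mismatch(log_line):
    """Extract (stream, local) serialVersionUIDs from a Spark
    InvalidClassException message; return None if there is no mismatch."""
    m = re.search(
        r"stream classdesc serialVersionUID = (-?\d+), "
        r"local class serialVersionUID = (-?\d+)",
        log_line,
    )
    if not m:
        return None
    stream_uid, local_uid = map(int, m.groups())
    return (stream_uid, local_uid) if stream_uid != local_uid else None

line = ("java.io.InvalidClassException: org.apache.spark.deploy."
        "ApplicationDescription; local class incompatible: "
        "stream classdesc serialVersionUID = 6543101073799644159, "
        "local class serialVersionUID = 1574364215946805297")
print(serial_uid_mismatch(line))
# -> (6543101073799644159, 1574364215946805297)
```

If the two IDs differ, the practical fix is to align versions rather than patch anything in the pipeline: pin the `apache/beam_spark_job_server` image tag to the same Beam release as your Python SDK, and run a Spark cluster whose version matches the Spark client that job server release was built against (the exact compatible Spark line depends on the Beam release, so check the release notes for your version).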

