Apache Beam pipeline running on Dataflow failed to read from KafkaIO: SSL handshake failed

Jianxin Gao

I'm building an Apache Beam pipeline to read from Kafka as an unbounded source.

I was able to run it locally using the direct runner.

However, when run on Google Cloud using the Dataflow runner, the pipeline fails with the exception stack trace attached below.

It seems that it is ultimately the Conscrypt Java library that throws javax.net.ssl.SSLException: Unable to parse TLS packet header. I'm not sure how to address this issue.
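
For reference, the read is set up roughly like the sketch below (the broker addresses, topic name, and SSL truststore values are placeholders, not my actual configuration):

import java.util.HashMap;
import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaReadPipeline {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Kafka consumer properties for the SSL listener (placeholder values).
    Map<String, Object> consumerConfig = new HashMap<>();
    consumerConfig.put("security.protocol", "SSL");
    consumerConfig.put("ssl.truststore.location", "/path/to/truststore.jks");
    consumerConfig.put("ssl.truststore.password", "changeit");

    p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("broker-1:9093,broker-2:9093")  // placeholder brokers
        .withTopic("my-topic")                                // placeholder topic
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .updateConsumerProperties(consumerConfig));           // Beam 2.x-era method

    p.run().waitUntilFinish();
  }
}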

java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@33b5ff70
        com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:783)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:126)
        com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:778)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
        java.util.concurrent.FutureTask.report(FutureTask.java:122)
        java.util.concurrent.FutureTask.get(FutureTask.java:206)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:112)
        com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:778)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLException: Unable to parse TLS packet header
        org.conscrypt.ConscryptEngine.unwrap(ConscryptEngine.java:782)
        org.conscrypt.ConscryptEngine.unwrap(ConscryptEngine.java:723)
        org.conscrypt.ConscryptEngine.unwrap(ConscryptEngine.java:688)
        org.conscrypt.Java8EngineWrapper.unwrap(Java8EngineWrapper.java:236)
        org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:464)
        org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:328)
        org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:255)
        org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:79)
        org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:460)
        org.apache.kafka.common.network.Selector.poll(Selector.java:398)
        org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
        org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
        org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
        org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190)
        org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:219)
        org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205)
        org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:468)
        org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:450)
        org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1772)
        org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1411)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.setupInitialOffset(KafkaUnboundedReader.java:641)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.lambda$start$0(KafkaUnboundedReader.java:106)
        java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        java.util.concurrent.FutureTask.run(FutureTask.java:266)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Raghu Angadi

It looks like Conscrypt causes SSL errors in many contexts like this. The Dataflow worker in Beam 2.9.0 has an option to disable it: please try passing --experiments=disable_conscrypt_security_provider. Alternatively, you can try Beam 2.4.x, which does not enable Conscrypt.
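
For example, here is a rough sketch of setting that experiment programmatically with the Java SDK (the rest of the pipeline, and options such as project and region, are omitted; this is equivalent to passing the flag on the command line):

import java.util.Collections;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DisableConscrypt {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);

    // Same effect as passing --experiments=disable_conscrypt_security_provider
    // on the command line: the Dataflow worker skips the Conscrypt provider.
    options.setExperiments(
        Collections.singletonList("disable_conscrypt_security_provider"));

    Pipeline pipeline = Pipeline.create(options);
    // ... attach the KafkaIO read and the rest of the pipeline here ...
    pipeline.run();
  }
}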
