Apache Beam - skip pipeline step

Chris Halcrow :

I'm using Apache Beam to set up a pipeline consisting of 2 main steps:

  • transform the data using a Beam Transform
  • load the transformed data to BigQuery

The pipeline setup looks like this:

myPCollection = (org.apache.beam.sdk.values.PCollection<myCollectionObjectType>) myInputPCollection
    .apply("do a parallel transform",
        ParDo.of(new MyTransformClassName.MyTransformFn()));

myPCollection
    .apply("Load BigQuery data for PCollection",
        BigQueryIO.<myCollectionObjectType>write()
            .to(new MyDataLoadClass.MyFactTableDestination(myDestination))
            .withFormatFunction(new MyDataLoadClass.MySerializationFn()));

I've looked at this question:

Apache Beam: Skipping steps in an already-constructed pipeline

which suggests that I may be able to dynamically choose which output to pass data to after the parallel transform in step 1.

How do I do this? I don't know how to choose whether or not to pass myPCollection from step 1 to step 2. I need to skip step 2 if the object in myPCollection from step 1 is null.

Anton :

You just don't emit the element from your MyTransformClassName.MyTransformFn when you don't want it in the next step, for example something like this:

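class MyTransformClassName {
  // InputT stands for the element type of myInputPCollection
  static class MyTransformFn extends DoFn<InputT, myCollectionObjectType> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      myCollectionObjectType result = ...;  // your transform logic
      if (result != null) {
        c.output(result);   // only output something that's not null
      }
    }
  }
}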

This way nulls don't reach the next step.
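The reason this works is that a DoFn may emit zero, one, or many outputs per input element, and the next step only ever sees what was emitted. The plain-Java sketch below models that contract without a Beam dependency, so it can be run on its own. It is an illustration, not Beam's API: `ElementFn`, `ParDoSketch`, `parDo` and `dropNulls` are hypothetical names, the `Consumer` stands in for Beam's output receiver, and the sequential loop ignores how a real runner distributes work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for a DoFn: handles one element and may emit
// zero or more outputs through the supplied consumer.
interface ElementFn<I, O> {
    void process(I element, Consumer<O> out);
}

final class ParDoSketch {
    // Models ParDo: applies fn to every element; the resulting collection
    // contains only what fn chose to emit (sequential, for illustration only).
    static <I, O> List<O> parDo(List<I> input, ElementFn<I, O> fn) {
        List<O> output = new ArrayList<>();
        for (I element : input) {
            fn.process(element, output::add);
        }
        return output;
    }

    // Mirrors the answer's MyTransformFn: compute a result and
    // emit it only when it is non-null.
    static <I, O> ElementFn<I, O> dropNulls(Function<I, O> transform) {
        return (element, out) -> {
            O result = transform.apply(element);
            if (result != null) {
                out.accept(result);
            }
        };
    }
}
```

For example, `ParDoSketch.parDo(Arrays.asList("a", "", "bcd"), ParDoSketch.<String, Integer>dropNulls(s -> s.isEmpty() ? null : Integer.valueOf(s.length())))` yields `[1, 3]`: the empty string maps to `null`, is never emitted, and so never reaches the "load" step.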

See the ParDo section of the Beam programming guide for more details: https://beam.apache.org/documentation/programming-guide/#pardo
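The "choose which output" idea from the linked question corresponds to Beam's multiple outputs: the DoFn is applied with `ParDo.of(...).withOutputTags(mainTag, TupleTagList.of(otherTag))` and writes to a specific `TupleTag`, and only the main output is then passed to the BigQuery write. That is useful if the skipped elements should still be kept (for logging or a dead-letter table) rather than silently dropped. The sketch below models only the routing in plain Java, without Beam; `RoutedOutputs`, `TaggedRoutingSketch` and `route` are hypothetical names, and the two lists stand in for the two tagged outputs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical holder for two "tagged" outputs of step 1.
final class RoutedOutputs<I, O> {
    final List<O> toLoad = new ArrayList<>();   // main output: would feed step 2 (the BigQuery write)
    final List<I> skipped = new ArrayList<>();  // additional output: never reaches step 2
}

final class TaggedRoutingSketch {
    // Applies the step-1 transform to each input; non-null results go to
    // toLoad, and inputs whose result is null go to skipped.
    static <I, O> RoutedOutputs<I, O> route(List<I> inputs, Function<I, O> transform) {
        RoutedOutputs<I, O> out = new RoutedOutputs<>();
        for (I input : inputs) {
            O result = transform.apply(input);
            if (result != null) {
                out.toLoad.add(result);
            } else {
                out.skipped.add(input);
            }
        }
        return out;
    }
}
```

Routing `"1"`, `"x"`, `"3"` through a transform that parses digits (returning `null` otherwise) leaves `[1, 3]` in `toLoad` and `["x"]` in `skipped`. If the skipped elements aren't needed, the answer's approach of simply not emitting them is the simpler choice.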
