Create an Apache Beam pipeline that reads from Google Pub/Sub

theShadow89

I am trying to create a streaming pipeline with Apache Beam that reads sentences from Google Pub/Sub and writes the individual words to a BigQuery table.

I am using Apache Beam version 0.6.0.

Following the examples, I came up with this:

public class StreamingWordExtract {

    /**
     * A DoFn that tokenizes lines of text into individual words.
     */
    static class ExtractWords extends DoFn<String, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String[] words = ((String) c.element()).split("[^a-zA-Z']+");
            for (String word : words) {
                if (!word.isEmpty()) {
                    c.output(word);
                }
            }
        }
    }

    /**
     * A DoFn that uppercases a word.
     */
    static class Uppercase extends DoFn<String, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(c.element().toUpperCase());
        }
    }

    /**
     * A DoFn that converts a word into a BigQuery TableRow.
     */
    static class StringToRowConverter extends DoFn<String, TableRow> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(new TableRow().set("string_field", c.element()));
        }

        static TableSchema getSchema() {
            return new TableSchema().setFields(new ArrayList<TableFieldSchema>() {
                // Compose the list of TableFieldSchema from tableSchema.
                {
                    add(new TableFieldSchema().setName("string_field").setType("STRING"));
                }
            });
        }
    }

    private interface StreamingWordExtractOptions extends ExampleBigQueryTableOptions, ExamplePubsubTopicOptions {
        @Description("Input file to inject to Pub/Sub topic")
        @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
        String getInputFile();

        void setInputFile(String value);
    }

    public static void main(String[] args) {
        StreamingWordExtractOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(StreamingWordExtractOptions.class);

        options.setBigQuerySchema(StringToRowConverter.getSchema());

        Pipeline p = Pipeline.create(options);

        String tableSpec = new StringBuilder()
                .append(options.getProject()).append(":")
                .append(options.getBigQueryDataset()).append(".")
                .append(options.getBigQueryTable())
                .toString();

        p.apply(PubsubIO.read().topic(options.getPubsubTopic()))
                .apply(ParDo.of(new ExtractWords()))
                .apply(ParDo.of(new StringToRowConverter()))
                .apply(BigQueryIO.Write.to(tableSpec)
                        .withSchema(StringToRowConverter.getSchema())
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        PipelineResult result = p.run();
    }
}

I get a compile error near:

apply(ParDo.of(new ExtractWords()))

because the previous apply does not return a String but an Object.

I suppose the problem is the type returned by PubsubIO.read().topic(options.getPubsubTopic()): it is PTransform<PBegin, PCollection<T>> instead of PTransform<PBegin, PCollection<String>>.

What is the correct way to read from Google Pub/Sub using Apache Beam?

Davor Bonaci

You are hitting a recent backwards-incompatible change in Beam -- sorry about that!

Starting with Apache Beam version 0.5.0, PubsubIO.Read and PubsubIO.Write need to be instantiated using PubsubIO.<T>read() and PubsubIO.<T>write() instead of the static factory methods such as PubsubIO.Read.topic(String).

Specifying a coder via .withCoder(Coder) for the output type is required for Read. Specifying a coder for the input type, or specifying a format function via .withAttributes(SimpleFunction<T, PubsubMessage>), is required for Write.
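Applied to the pipeline above, the read would look something like the following. This is a minimal sketch against the 0.6.0 API, assuming the topic carries plain UTF-8 text so that StringUtf8Coder is the appropriate coder:

// Requires: import org.apache.beam.sdk.coders.StringUtf8Coder;
p.apply(PubsubIO.<String>read()
        .topic(options.getPubsubTopic())
        // An explicit coder for the output type is now mandatory on the read side.
        .withCoder(StringUtf8Coder.of()))
        .apply(ParDo.of(new ExtractWords()))
        .apply(ParDo.of(new StringToRowConverter()))
        // ... BigQuery write unchanged

With the explicit type parameter and coder, the transform produces a PCollection<String> rather than a raw PCollection<T>, so the downstream ParDo applications type-check again. The same pattern applies on the write side with PubsubIO.<T>write().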
