Source vs. PTransform

pibafe :

I am new to the project, and I am trying to create a connector between Dataflow and a database.

The documentation clearly states that I should use a Source and a Sink, but I see a lot of people directly using a PTransform associated with a PInput or a PDone.

The Source/Sink API is experimental (which explains all the examples using PTransform), but it seems easier to integrate with a custom runner (e.g. Spark).

Looking at the code, both approaches are used, and I cannot see any use case where using the PTransform API would be more advantageous.

Is the Source/Sink API supposed to replace the PTransform API?

Did I miss something that clearly differentiates the two approaches?

Is the Source/Sink API stable enough to be considered the right way to write inputs and outputs?

Thanks for your advice!

jkff :

The philosophy of Dataflow is that PTransform is the main unit of abstraction and composability, i.e., any self-contained data processing task should be encapsulated as a PTransform. This includes the task of connecting to a third-party storage system: ingesting data from somewhere or exporting it to somewhere.

Take, for example, Google Cloud Datastore. In the code snippet:

    PCollection<Entity> entities =
        p.apply(DatastoreIO.readFrom(dataset, query));
    ...
    entities.apply(/* some processing */)
            .apply(DatastoreIO.writeTo(dataset));

the return type of DatastoreIO.readFrom(dataset, query) is a subclass of PTransform<PBegin, PCollection<Entity>>, and the type of DatastoreIO.writeTo(dataset) is a subclass of PTransform<PCollection<Entity>, PDone>.

It is true that these functions are implemented under the hood using the Source and Sink classes, but to a user who just wants to read something from or write something to Datastore, that's an implementation detail that usually should not matter (however, see the note at the end of this answer about exposing the Source or Sink class). Any connector, or for that matter, any other data processing task, is a PTransform.

Note: Currently connectors that read from somewhere tend to be PTransform<PBegin, PCollection<T>>, and connectors that write to somewhere tend to be PTransform<PCollection<T>, PDone>, but we are considering options to make it easier to use connectors in more flexible ways (for example, reading from a PCollection of filenames).

However, of course, this detail matters to somebody who wants to implement a new connector. In particular, you may ask:

Q: Why do I need the Source and Sink classes at all, if I could just implement my connector as a PTransform?

A: If you can implement your connector by just using the built-in transforms (such as ParDo, GroupByKey etc.), that's a perfectly valid way to develop a connector. However, the Source and Sink classes provide some low-level capabilities that, in case you need them, would be cumbersome or impossible to develop yourself.
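As a rough sketch of that approach (MyDbClient and its connect/run methods are hypothetical stand-ins for your database's client library; the SDK classes used are the Dataflow Java SDK 1.x ones, where a composite transform overrides apply):

    // A read connector assembled purely from built-in transforms.
    // SDK classes (PTransform, PBegin, PCollection, Create, ParDo, DoFn,
    // StringUtf8Coder) come from com.google.cloud.dataflow.sdk.*.
    public class SimpleMyDbRead extends PTransform<PBegin, PCollection<String>> {
      private final String query;

      public SimpleMyDbRead(String query) {
        this.query = query;
      }

      @Override
      public PCollection<String> apply(PBegin input) {
        return input
            // Seed the pipeline with the query as a single element...
            .apply(Create.of(query).withCoder(StringUtf8Coder.of()))
            // ...then execute it against the database inside a ParDo.
            .apply(ParDo.of(new DoFn<String, String>() {
              @Override
              public void processElement(ProcessContext c) throws Exception {
                MyDbClient client = MyDbClient.connect();         // hypothetical
                try {
                  for (String record : client.run(c.element())) { // hypothetical
                    c.output(record);
                  }
                } finally {
                  client.close();
                }
              }
            }));
      }
    }

    // Usage: PCollection<String> records = p.apply(new SimpleMyDbRead("SELECT ..."));

Note that this already has the shape described in the note above: a PTransform<PBegin, PCollection<...>> that the user can simply p.apply().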

For example, BoundedSource and UnboundedSource provide hooks for controlling how parallelization happens (both initial and dynamic work rebalancing - BoundedSource.splitIntoBundles, BoundedReader.splitAtFraction), while these hooks are not currently exposed for arbitrary DoFns.

You could technically implement a parser for a file format by writing a DoFn<FilePath, SomeRecord> that takes the filename as input, reads the file and emits SomeRecord, but this DoFn would not be able to dynamically parallelize reading parts of the file onto multiple workers in case the file turned out to be very large at runtime. On the other hand, FileBasedSource has this capability built-in, as well as handling of glob filepatterns and such.
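Concretely, that naive parser might look something like this (parseLine is a hypothetical helper, and SomeRecord stands for whatever record type your format produces):

    // Naive file parser as a DoFn: one input element = one file path.
    // Whichever worker receives the element reads the entire file by itself;
    // there is no hook here for the service to split or rebalance the read.
    class ParseFileFn extends DoFn<String, SomeRecord> {
      @Override
      public void processElement(ProcessContext c) throws Exception {
        try (BufferedReader reader =
            new BufferedReader(new FileReader(c.element()))) {
          String line;
          while ((line = reader.readLine()) != null) {
            c.output(parseLine(line));  // parseLine / SomeRecord: hypothetical
          }
        }
      }
    }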

Likewise, you could try implementing a connector to a streaming system by implementing a DoFn that takes a dummy element as input, establishes a connection and streams all elements into ProcessContext.output(), but DoFns currently don't support writing unbounded amounts of output from a single bundle, nor do they explicitly support the checkpointing and deduplication machinery needed for the strong consistency guarantees Dataflow gives to streaming pipelines. UnboundedSource, on the other hand, supports all this.
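For contrast, that DoFn-based attempt would look roughly like this (StreamClient and its methods are hypothetical):

    // Naive streaming "connector": a DoFn driven by a single dummy element.
    // The loop never ends, so the bundle never finishes, the output is unbounded,
    // and nothing is checkpointed or deduplicated if the bundle is retried.
    class NaiveStreamReadFn extends DoFn<String, String> {
      @Override
      public void processElement(ProcessContext c) throws Exception {
        StreamClient client = StreamClient.connect(c.element());  // hypothetical
        while (true) {
          c.output(client.nextMessage());                         // hypothetical
        }
      }
    }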

Sink (more precisely, the Write.to() PTransform) is also interesting: it is just a composite transform that you could write yourself if you wanted to (i.e. it has no hard-coded support in the Dataflow runner or backend). However, it was designed around the typical distributed fault-tolerance issues that arise when writing data to a storage system in parallel, and it provides hooks that force you to keep those issues in mind: for example, because bundles of data are written in parallel, and some bundles may be retried or duplicated for fault tolerance, there is a hook for "committing" just the results of the successfully completed bundles (WriteOperation.finalize).
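To make that concrete, here is a rough do-it-yourself version of the same idea, built only from built-in transforms. The helpers writeToTempFile and commitToFinalLocation are hypothetical, and the commit step is essentially what WriteOperation.finalize does for you in the Sink API:

    // Assume 'records' is a PCollection<String> to be written.
    // Step 1: each bundle writes its elements to a uniquely named temporary
    // location and emits that location's name.
    PCollection<String> tempFiles = records.apply(
        ParDo.of(new DoFn<String, String>() {
          private List<String> bundle;

          @Override
          public void startBundle(Context c) {
            bundle = new ArrayList<>();
          }

          @Override
          public void processElement(ProcessContext c) {
            bundle.add(c.element());
          }

          @Override
          public void finishBundle(Context c) throws Exception {
            // writeToTempFile is a hypothetical helper.
            c.output(writeToTempFile(bundle, UUID.randomUUID().toString()));
          }
        }));

    // Step 2: gather the names written by the bundles that actually succeeded...
    final PCollectionView<Iterable<String>> successfulFiles =
        tempFiles.apply(View.<String>asIterable());

    // Step 3: ...and commit only those. Temp files left behind by failed or
    // duplicated bundle attempts never appear in the view, so they are never
    // committed.
    records.getPipeline()
        .apply(Create.of("commit"))
        .apply(ParDo.withSideInputs(successfulFiles).of(new DoFn<String, Void>() {
          @Override
          public void processElement(ProcessContext c) {
            commitToFinalLocation(c.sideInput(successfulFiles));  // hypothetical
          }
        }));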

To summarize: using Source or Sink APIs to develop a connector helps you structure your code in a way that will work well in a distributed processing setting, and the source APIs give you access to advanced capabilities of the framework. But if your connector is a very simple one that needs neither, then you are free to just assemble your connector from other built-in transforms.

Q: Suppose I decide to make use of Source and Sink. Then how do I package my connector as a library: should I just provide the Source or Sink class, or should I wrap it into a PTransform?

A: Your connector should ultimately be packaged as a PTransform, so that the user can just p.apply() it in their pipeline. However, under the hood your transform can use Source and Sink classes.

A common pattern is to expose the Source and Sink classes as well, making use of the Fluent Builder pattern, and letting the user wrap them into a Read.from() or Write.to() transform themselves, but this is not a strict requirement.
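For example, the packaging might look roughly like this (MyDbIO, MyDbRead, MyRecord and MyDbSource are all hypothetical names; MyDbSource would be a BoundedSource<MyRecord>, and Read.from is the SDK transform mentioned above):

    public class MyDbIO {
      // The connector's public face: a PTransform the user can simply p.apply().
      public static MyDbRead readFrom(String query) {
        return new MyDbRead(query);
      }

      public static class MyDbRead extends PTransform<PBegin, PCollection<MyRecord>> {
        private final String query;

        private MyDbRead(String query) {
          this.query = query;
        }

        @Override
        public PCollection<MyRecord> apply(PBegin input) {
          // Under the hood, delegate to the Source via the SDK's Read.from(...).
          return input.apply(Read.from(new MyDbSource(query)));
        }
      }
    }

    // Typical usage:
    //   PCollection<MyRecord> records = p.apply(MyDbIO.readFrom("SELECT ..."));
    // If MyDbSource is also exposed publicly, advanced users can instead write:
    //   p.apply(Read.from(new MyDbSource("SELECT ...")));

The write side is symmetric: a PTransform<PCollection<MyRecord>, PDone> whose apply wraps Write.to(...) around the corresponding Sink.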
