Apache Beam: read CSV file and GroupByKey

Cecilia

I have a CSV file. I know how to achieve this with pandas: read the CSV into a DataFrame, group the data by the fields 'aaa' and 'bbb', and then construct a new 'id'.

My question is how I can achieve the same thing with Apache Beam. I've never used it before. I'm trying to use Beam to read this CSV file and group multiple records, but the approach I used with pandas isn't directly available in Beam. The following is my current code:

import apache_beam as beam
from apache_beam.dataframe.io import read_csv

pipeline = beam.Pipeline()
csv_lines = (
    pipeline
    | 'ReadFile' >> beam.io.ReadFromText('xxx.csv')
    | ????)

My questions are:

  • How can I manipulate the data when there's no header after using beam.io.ReadFromText()?
  • What's the best way to achieve with Beam what I described above (group multiple records, construct a new id, then convert the result to JSON)?

Hope this makes sense, I'm new to Beam, any help would be appreciated. Thanks.

robertwb

If you're able to do it with Pandas, you should be able to do it with the Beam Dataframes API in exactly the same way.
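For instance, a minimal sketch of that route might look like the following. The file name 'xxx.csv' and the grouping columns ('postcode', 'property_type', 'old/new', 'PAON') are taken from the question and are assumptions about the actual data; the aggregation shown is just an example of a pandas-style operation that deferred DataFrames generally support.

import apache_beam as beam
from apache_beam.dataframe.io import read_csv

with beam.Pipeline() as pipeline:
  # read_csv parses the header and infers column types, like pandas.read_csv.
  df = pipeline | read_csv('xxx.csv')
  # Deferred DataFrames support most pandas operations, e.g. groupby followed
  # by an aggregation. The column names here are assumptions.
  counts = df.groupby(['postcode', 'property_type', 'old/new', 'PAON']).size()
  counts.to_csv('grouped_counts.csv')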

To do this with Beam directly, you can map your rows into 2-tuples, where the first element is the key ('postcode', 'property_type', 'old/new', 'PAON' in your case), and then use GroupByKey to group things together. For example, suppose each element of your input PCollection is a dictionary with the above keys:

{'postcode': ..., 'property_type': ..., 'old/new': ..., 'PAON': ..., ...}

Then you could write

def get_key(property_dict):
  return (
      property_dict['postcode'],
      property_dict['property_type'],
      property_dict['old/new'],
      property_dict['PAON'])
      

grouped = (
    input_pcoll
    | beam.Map(lambda element: (get_key(element), element))
    | beam.GroupByKey())
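
Putting it together with beam.io.ReadFromText: since ReadFromText gives you plain strings and does not handle the header for you, you would first turn each raw line into a dictionary yourself. A rough sketch, assuming a header row in 'xxx.csv' and the column names from the question (list all of the file's columns in FIELDS, in order; only the four assumed grouping columns are shown here):

import csv
import json

import apache_beam as beam

FIELDS = ['postcode', 'property_type', 'old/new', 'PAON']  # assumed column names

def parse_line(line):
  # Use the csv module so quoted fields containing commas are handled correctly.
  values = next(csv.reader([line]))
  return dict(zip(FIELDS, values))

def to_json(key_and_records):
  key, records = key_and_records
  # Construct a new id from the grouping key; the exact format is up to you.
  return json.dumps({'id': '_'.join(key), 'records': list(records)})

with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | 'ReadFile' >> beam.io.ReadFromText('xxx.csv', skip_header_lines=1)
      | 'ParseCsv' >> beam.Map(parse_line)
      | 'KeyByFields' >> beam.Map(lambda element: (get_key(element), element))
      | 'Group' >> beam.GroupByKey()
      | 'ToJson' >> beam.Map(to_json)
      | 'Write' >> beam.io.WriteToText('grouped.json'))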

