skip header while reading a CSV file in Apache Beam

Nagesh Singh Chauhan :

I want to skip header line from a CSV file. As of now I'm removing the header manually before loading it to google storage.

Below is my code :

PCollection<String> financeobj =p.apply(TextIO.read().from("gs://storage_path/Financials.csv"));        
    PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype

        private static final long serialVersionUID = 1L;
        @ProcessElement
        public void processElement(ProcessContext c) {
            String[] strArr = c.element().split(",");
            ClassFinance fin = new ClassFinance();
            fin.setBeneficiaryFinance(strArr[0]);
            fin.setCatlibCode(strArr[1]);
            fin.set_rNR_(Double.valueOf(strArr[2]));
            fin.set_rNCS_(Double.valueOf(strArr[3]));
            fin.set_rCtb_(Double.valueOf(strArr[4]));
            fin.set_rAC_(Double.valueOf(strArr[5]));
            c.output(fin);
        }
    }));

I have checked the existing question in stackoverflow but I dont find it promising : Skipping header rows - is it possible with Cloud DataFlow?

Any help ?

Edit : I have tried something like below and it worked :

PCollection<String> financeobj = p.apply(TextIO.read().from("gs://google-bucket/final_input/Financials123.csv"));       

    PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype

        private static final long serialVersionUID = 1L;
        @ProcessElement
        public void processElement(ProcessContext c) {  
            String[] strArr2 = c.element().split(",");
            String header = Arrays.toString(strArr2);
            ClassFinance fin = new ClassFinance();

                if(header.contains("Beneficiary"))
                System.out.println("Header");
                else {
            fin.setBeneficiaryFinance(strArr2[0].trim());
            fin.setCatlibCode(strArr2[1].trim());
            fin.setrNR(Double.valueOf(strArr2[2].trim().replace("", "0")));
            fin.setrNCS(Double.valueOf(strArr2[3].trim().replace("", "0")));
            fin.setrCtb(Double.valueOf(strArr2[4].trim().replace("", "0")));
            fin.setrAC(Double.valueOf(strArr2[5].trim().replace("", "0")));
            c.output(fin);
            }
        }
    }));
dsesto :

The older Stack Overflow post that you shared (Skipping header rows - is it possible with Cloud DataFlow?) does contain the answer to your question.

This option is currently not available in the Apache Beam SDK, although there is an open Feature Request in the Apache Beam JIRA issue tracker, BEAM-123. Note that, as of writing, this feature request is still open and unresolved, and it has been like that for 2 years already. However, it looks like some effort is being done in that sense, and the latest update in the issue is from February 2018, so I would advise you to stay updated on that JIRA issue, as it was last moved to the sdk-java-core component, and it may be getting more attention there.

With that information in mind, I would say that the approach you are using (removing the header before uploading the file to GCS) is the best option for you. I would refrain from doing it manually, as you can easily script that and automate the remove headerupload file process.


EDIT:

I have been able to come up with a simple filter using a DoFn. It might not be the most elegant solution (I am not an Apache Beam expert myself), but it does work, and you may be able to adapt it to your needs. It requires that you know beforehand the header of the CSV files being uploaded (as it will be filtering by element content), but again, take this just as a template that you may be able to modify to your needs:

public class RemoveCSVHeader {
  // The Filter class
  static class FilterCSVHeaderFn extends DoFn<String, String> {
    String headerFilter;

    public FilterCSVHeaderFn(String headerFilter) {
      this.headerFilter = headerFilter;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      String row = c.element();
      // Filter out elements that match the header
      if (!row.equals(this.headerFilter)) {
        c.output(row);
      }
    }
  }

  // The main class
  public static void main(String[] args) throws IOException {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);

    PCollection<String> vals = p.apply(TextIO.read().from("gs://BUCKET/FILE.csv"));

    String header = "col1,col2,col3,col4";

    vals.apply(ParDo.of(new FilterCSVHeaderFn(header)))
        .apply(TextIO.write().to("out"));

    p.run().waitUntilFinish();
  }
}

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

Skip header of CSV file while reading in core Java with below approach

Skip blank lines while reading .csv file using opencsv (java)

How to skip lines while reading a CSV file as a dataFrame using PySpark?

skip second row of dataframe while reading csv file in python

Skip Header Row when reading in a CSV

Skip reading header of text file in C

skip double quotes when reading csv file using apache commons csv

How to skip carriage returns in csv file while reading from cloud storage using google cloud dataflow in java

How can I skip even/odd rows while reading a csv file?

Skip `# ` character when reading header with pandas read_csv

Apache beam read csv file and groupbykey

Ruby CSV | How to skip few columns while reading CSV?

Apache Beam - skip pipeline step

Is there any method to skip some characters while reading csv

How to skip over white spaces in a string while reading from a file

skip space all at once while reading from a file

Skip certain number of lines while reading a text file - BufferedReader Java

Skip first couple of lines while reading lines in Python file

Skip first couple of lines while reading lines in Python file

zcat skip streaming first line while reading from a compressed file

How to skip primitive data values while reading from file

How skip reading " : " while taking input from a file in C++

Skip the first column when reading a csv file Python

How to skip blank cells when reading a csv file in Java?

How to skip the first row when reading a csv file?

Apache Spark Reading CSV file - ClassNotFoundException

Reading and Parsing CSV File in Apache Cordova App

wrong schema while reading csv file as a dataframe

In Scan EOF error while reading CSV file