How to drop columns based on multiple filters in a dataframe using PySpark?

Aviral Srivastava

I have a list of valid values that a cell can contain. If even one cell in a column holds an invalid value, I need to drop the whole column. I understand there are answers about dropping rows based on a column's values, but here I want to drop the entire column instead, even if only one cell in it is invalid. A cell is valid only if it contains one of three values: ['Messi', 'Ronaldo', 'Virgil'].

I tried reading about filtering, but everything I found was about filtering on columns and dropping rows, for instance in this question. I also read that one should avoid too much scanning and shuffling in Spark, which I agree with.

I am not only looking for a code solution, but also for whatever off-the-shelf functionality PySpark already provides. I hope this doesn't go beyond the scope of a SO answer.

For the following input dataframe:

| Column 1      | Column 2      | Column 3      | Column 4      | Column 5      |
| --------------| --------------| --------------| --------------| --------------|
|  Ronaldo      | Salah         |  Messi        |               |Salah          |
|  Ronaldo      | Messi         |  Virgil       |  Messi        | null          |
|  Ronaldo      | Ronaldo       |  Messi        |  Ronaldo      | null          |

I expect the following output:

| Column 1      | Column 3      |
| --------------| --------------|
|  Ronaldo      | Messi         |
|  Ronaldo      | Virgil        |
|  Ronaldo      | Messi         |
pault

> I am not only looking for a code solution, but also for whatever off-the-shelf functionality PySpark already provides.

Unfortunately, Spark is designed to operate in parallel on a row-by-row basis. Filtering out columns is not something for which there will be an "off-the-shelf code" solution.

Nevertheless, here is one approach you can take:
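(To make the snippets below runnable, assume `df` holds the sample data from the question; the SparkSession handle `spark` and the use of `None` for both the blank and the null cells are assumptions on my part.)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data mirroring the input table in the question; None stands in for blank/null cells
df = spark.createDataFrame(
    [
        ("Ronaldo", "Salah", "Messi", None, "Salah"),
        ("Ronaldo", "Messi", "Virgil", "Messi", None),
        ("Ronaldo", "Ronaldo", "Messi", "Ronaldo", None),
    ],
    ["Column 1", "Column 2", "Column 3", "Column 4", "Column 5"],
)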

First collect the counts of the invalid elements in each column.

from pyspark.sql.functions import col, lit, sum as _sum, when

valid = ['Messi', 'Ronaldo', 'Virgil']
# For each column, sum a flag that is 1 when the value is not in `valid` and 0 otherwise
invalid_counts = df.select(
    *[_sum(when(col(c).isin(valid), lit(0)).otherwise(lit(1))).alias(c) for c in df.columns]
).collect()
print(invalid_counts)
#[Row(Column 1=0, Column 2=1, Column 3=0, Column 4=1, Column 5=3)]
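Note that null and blank cells are counted as invalid here: for a null value, isin returns null, so the when condition is not satisfied and the otherwise(lit(1)) branch is used.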

The result is a list containing a single Row. You can iterate over its items to find the columns to keep.

valid_columns = [k for k,v in invalid_counts[0].asDict().items() if v == 0]
print(valid_columns)
#['Column 3', 'Column 1']

Now just select these columns from your original DataFrame. You can first sort valid_columns using list.index if you want to maintain the original column order.

valid_columns = sorted(valid_columns, key=df.columns.index)
df.select(valid_columns).show()
#+--------+--------+
#|Column 1|Column 3|
#+--------+--------+
#| Ronaldo|   Messi|
#| Ronaldo|  Virgil|
#| Ronaldo|   Messi|
#+--------+--------+
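If you need this in more than one place, the same steps can be wrapped in a small helper. This is just an illustrative sketch (the function name drop_invalid_columns is not part of the PySpark API); iterating over df.columns keeps the original column order, so no extra sort is needed.

from pyspark.sql.functions import col, lit, sum as _sum, when

def drop_invalid_columns(df, valid):
    # Count invalid values per column, then keep only the columns with zero invalid values
    counts = df.select(
        *[_sum(when(col(c).isin(valid), lit(0)).otherwise(lit(1))).alias(c) for c in df.columns]
    ).collect()[0].asDict()
    keep = [c for c in df.columns if counts[c] == 0]
    return df.select(keep)

df_clean = drop_invalid_columns(df, ['Messi', 'Ronaldo', 'Virgil'])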
