Compare rows of two dataframes to find the matching column count of 1's

KG_1

I have two dataframes with the same schema. I need to compare them row by row and count the rows that have at least one column where the value is 1 in both dataframes.

Right now I convert the rows of each dataframe into a list, then compare the two lists to check whether any column has the same value in both and that value is 1:

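# categories: list of the column names to compare
rowOgList = []
for row in cat_og_df.rdd.toLocalIterator():
    # copy the category columns of each row into a dict (all rows end up in driver memory)
    rowOgDict = {}
    for cat in categories:
        rowOgDict[cat] = row[cat]
    rowOgList.append(rowOgDict)

#print(rowOgList[0])

# same conversion for the second dataframe
rowPredList = []
for row in prob_df.rdd.toLocalIterator():
    rowPredDict = {}
    for cat in categories:
        rowPredDict[cat] = row[cat]
    rowPredList.append(rowPredDict)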
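The comparison step described above (but not shown) might look like the following minimal sketch. It assumes rowOgList and rowPredList are in the same row order and hold one dict per row keyed by the names in categories; the helper name count_matching_rows and the three sample rows are hypothetical. It still runs entirely on the driver, so it does not fix the memory problem.

```python
from typing import Dict, List


def count_matching_rows(og_rows: List[Dict[str, int]],
                        pred_rows: List[Dict[str, int]],
                        categories: List[str]) -> int:
    """Count rows where at least one category is 1 in both lists.

    Hypothetical helper: assumes both lists are in the same row order.
    """
    count = 0
    for og, pred in zip(og_rows, pred_rows):
        # any() stops at the first column that is 1 in both rows
        if any(og[cat] == 1 and pred[cat] == 1 for cat in categories):
            count += 1
    return count


categories = ["column1", "column2", "column3"]
rowOgList = [
    {"column1": 0, "column2": 0, "column3": 1},
    {"column1": 1, "column2": 0, "column3": 0},
    {"column1": 0, "column2": 0, "column3": 1},
]
rowPredList = [
    {"column1": 0, "column2": 0, "column3": 0},
    {"column1": 1, "column2": 0, "column3": 1},
    {"column1": 0, "column2": 0, "column3": 1},
]
print(count_matching_rows(rowOgList, rowPredList, categories))  # 2
```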

However, rdd.toLocalIterator() gives me a heap space error when I run this on a large dataset. For example, this is the first dataframe:

+-------+-------+-------+-------+
|column1|column2|column3|column4|
+-------+-------+-------+-------+
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      1|      0|      0|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      0|      1|      0|      0|
|      0|      0|      0|      0|
|      0|      0|      1|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      1|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      1|      0|      0|      0|
|      0|      0|      1|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
+-------+-------+-------+-------+

This is the second dataframe:

+-------+-------+-------+-------+
|column1|column2|column3|column4|
+-------+-------+-------+-------+
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      0|      0|      1|      1|
|      0|      0|      1|      1|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      0|      0|      1|      1|
|      1|      0|      1|      0|
|      0|      0|      1|      1|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
+-------+-------+-------+-------+

Here, rows 9, 11, 17 and 18 (counting from 1) each have at least one column where both dataframes contain 1, so the count is 4.
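To make the expected result concrete, here is a small local check of the rule on the two 20-row examples above. It uses NumPy element-wise comparison, a swapped-in local technique rather than a Spark solution, and only works when the data fits in memory.

```python
import numpy as np

# The two example dataframes above, one row per list (column1..column4)
df1_rows = np.array([
    [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [1, 0, 0, 0], [0, 0, 0, 0],
    [0, 0, 0, 0], [0, 1, 0, 0], [0, 0, 0, 0], [0, 0, 1, 0], [0, 0, 0, 0],
    [0, 0, 0, 1], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0],
    [0, 0, 0, 0], [1, 0, 0, 0], [0, 0, 1, 0], [0, 0, 0, 0], [0, 0, 0, 0],
])
df2_rows = np.array([
    [1, 0, 1, 0], [1, 0, 1, 0], [0, 0, 1, 1], [0, 0, 1, 1], [1, 0, 1, 0],
    [1, 0, 1, 0], [1, 0, 1, 0], [1, 0, 1, 0], [0, 0, 1, 1], [1, 0, 1, 0],
    [0, 0, 1, 1], [1, 0, 1, 0], [1, 0, 1, 0], [1, 0, 1, 0], [1, 0, 1, 0],
    [1, 0, 1, 0], [1, 0, 1, 0], [1, 0, 1, 0], [1, 0, 1, 0], [1, 0, 1, 0],
])

# True where a column is 1 in both dataframes for the same row
both_one = (df1_rows == 1) & (df2_rows == 1)
row_has_match = both_one.any(axis=1)

count = int(row_has_match.sum())
matching_rows = (np.nonzero(row_has_match)[0] + 1).tolist()  # 1-based, as in the question
print(count)          # 4
print(matching_rows)  # [9, 11, 17, 18]
```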

Can this be done in a more efficient way? Thanks.

cylim

Note: As pault mentioned, this works better if you have unique row indices linking the two dataframes. Otherwise, row order is not guaranteed after some Spark operations.

(1) Set up the environment and some sample data.

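import numpy as np

from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Reuse the active session (e.g. in the pyspark shell) or create one
spark = SparkSession.builder.getOrCreate()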

df1 = spark.createDataFrame([
    (0, 0, 1),
    (1, 0, 0),
    (0, 0, 1)
], ["column1", "column2", "column3"])

df2 = spark.createDataFrame([
    (0, 0, 0),
    (1, 0, 1),
    (0, 0, 1)
], ["column1", "column2", "column3"])

(2) Combine all columns into a single Spark vector column.

assembler = VectorAssembler(
    inputCols=["column1", "column2", "column3"],
    outputCol="merged_col")

df1_merged = assembler.transform(df1)
df2_merged = assembler.transform(df2)
df1_merged.show()

+-------+-------+-------+-------------+
|column1|column2|column3|   merged_col|
+-------+-------+-------+-------------+
|      0|      0|      1|[0.0,0.0,1.0]|
|      1|      0|      0|[1.0,0.0,0.0]|
|      0|      0|      1|[0.0,0.0,1.0]|
+-------+-------+-------+-------------+

(3) Get the row index and the column indices of the non-zero elements by applying numpy.nonzero() to each vector in the RDD.

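def get_nonzero_index(args):
    # zipWithIndex() yields (row, index) pairs
    (row, index) = args
    # toArray() returns a numpy array for both DenseVector and SparseVector
    np_arr = row.merged_col.toArray()
    return (index, np_arr.nonzero()[0].tolist())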

df1_ind_rdd = df1_merged.rdd.zipWithIndex().map(get_nonzero_index)
df2_ind_rdd = df2_merged.rdd.zipWithIndex().map(get_nonzero_index)
df1_ind_rdd.collect()
[(0, [2]), (1, [0]), (2, [2])]

df2_ind_rdd.collect()
[(0, []), (1, [0, 2]), (2, [2])]
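Without Spark, the same per-row computation can be sketched with plain NumPy arrays standing in for the vectors in merged_col (an assumption made only so the snippet runs locally; nonzero_positions is a hypothetical name). It shows that nonzero()[0] returns the column positions of the 1s, and an empty list for an all-zero row:

```python
import numpy as np

# Local stand-ins for the merged_col vectors of df1 and df2
df1_vectors = [np.array([0.0, 0.0, 1.0]), np.array([1.0, 0.0, 0.0]), np.array([0.0, 0.0, 1.0])]
df2_vectors = [np.array([0.0, 0.0, 0.0]), np.array([1.0, 0.0, 1.0]), np.array([0.0, 0.0, 1.0])]


def nonzero_positions(vectors):
    # Pair each row index (like zipWithIndex) with the column indices holding a non-zero value
    return [(i, v.nonzero()[0].tolist()) for i, v in enumerate(vectors)]


print(nonzero_positions(df1_vectors))  # [(0, [2]), (1, [0]), (2, [2])]
print(nonzero_positions(df2_vectors))  # [(0, []), (1, [0, 2]), (2, [2])]
```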

(4) You can then compare the two collected Python lists directly.
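For example, a minimal sketch of that comparison, assuming both lists have been collected into Python and share the same row indices (the name count_rows_with_common_index is hypothetical). A row counts when its two sets of non-zero column indices intersect:

```python
def count_rows_with_common_index(ind1, ind2):
    # Map row index -> list of non-zero column indices for the second dataframe
    lookup = dict(ind2)
    count = 0
    for row_index, cols in ind1:
        # Non-empty intersection: some column is non-zero (i.e. 1) in both rows
        if set(cols) & set(lookup.get(row_index, [])):
            count += 1
    return count


df1_ind = [(0, [2]), (1, [0]), (2, [2])]    # df1_ind_rdd.collect()
df2_ind = [(0, []), (1, [0, 2]), (2, [2])]  # df2_ind_rdd.collect()
print(count_rows_with_common_index(df1_ind, df2_ind))  # 2
```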

Note that this method is not efficient for a very large number of rows, because collect() brings all the data to the driver. In that case, do all the processing in Spark by joining the two dataframes.

(5) To do the matching purely in Spark, you can join the two dataframes on the row index:

df1_index = spark.createDataFrame(df1_ind_rdd, ["row_index_1", "column_index_1"])
df2_index = spark.createDataFrame(df2_ind_rdd, ["row_index_2", "column_index_2"])

df_joined = df1_index.join(df2_index, df1_index.row_index_1 == df2_index.row_index_2)

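Then explode the lists so that each element ends up on its own row. Exploding both columns produces every combination of a column index from df1 with one from df2 for the same row.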

df_exploded = df_joined.withColumn("column_index_exp_1", F.explode(df_joined.column_index_1))\
                            .withColumn("column_index_exp_2", F.explode(df_joined.column_index_2))

Check for a match between the two exploded columns, then convert the boolean result to an integer for summing.

df_match_bool = df_exploded.withColumn("match_bool", df_exploded.column_index_exp_1 == df_exploded.column_index_exp_2)

df_match_int = df_match_bool.withColumn("match_integer", df_match_bool.match_bool.cast("long"))
df_match_bool.show()
+-----------+--------------+-----------+--------------+------------------+------------------+----------+
|row_index_1|column_index_1|row_index_2|column_index_2|column_index_exp_1|column_index_exp_2|match_bool|
+-----------+--------------+-----------+--------------+------------------+------------------+----------+
|          1|           [0]|          1|        [0, 2]|                 0|                 0|      true|
|          1|           [0]|          1|        [0, 2]|                 0|                 2|     false|
|          2|           [2]|          2|           [2]|                 2|                 2|      true|
+-----------+--------------+-----------+--------------+------------------+------------------+----------+

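# Take the maximum per row first, so a row with several matching columns is counted once
df_match_int.groupBy("row_index_1").agg(F.max("match_integer").alias("row_match")) \
    .agg(F.sum("row_match")).collect()[0][0]
2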
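One caveat with summing match_integer directly: after exploding both lists, each matching (row, column) pair contributes 1, so a row in which several columns match is counted more than once. A pure-Python sketch with a hypothetical third row where two columns match shows the difference between counting matching pairs and counting rows:

```python
from itertools import product

# row index -> (non-zero columns in df1, non-zero columns in df2)
# rows 1 and 2 come from the example; row 3 is hypothetical, with two matching columns
rows = {
    1: ([0], [0, 2]),
    2: ([2], [2]),
    3: ([0, 2], [0, 2]),
}

# Mimics explode + equality check: one entry per (col1, col2) pair in each row
pair_matches = sum(
    c1 == c2
    for cols1, cols2 in rows.values()
    for c1, c2 in product(cols1, cols2)
)
# Counts each row once if any of its pairs match
row_matches = sum(
    any(c1 == c2 for c1, c2 in product(cols1, cols2))
    for cols1, cols2 in rows.values()
)
print(pair_matches)  # 4
print(row_matches)   # 3
```

Taking the per-row maximum of match_integer before summing, or counting the distinct row_index_1 values where match_bool is true, gives the number of rows the question asks for.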


Related

Compare 2 dataframes, find matching rows, then sum a column

Compare two dataframes based on two numeric columns and find rows matching within a numeric threshold of eachother

Spark Compare two dataframes and find the match count

Compare two dataframes and remove rows from a df based on a matching column value

Pandas compare 1 columns values to another dataframe column, find matching rows

Find matching rows between two dataframes by just the first word

Find location in first dataframe of matching rows in two dataframes

Pandas: find matching rows in two dataframes (without using `merge`)

find and compare and count matching of pattern in each column in datarame

Compare rows of two dataframes in pandas

Compare Python Pandas DataFrames for matching rows

How to find and count matching data for an ID column in two files?

Postgres 9.3 count rows matching a column relative to row's timestamp

Compare 'order' of rows within 'groups' of two separate dataframes and find the rows that are 'swapped' in order and extract the original indexes

Is there a way to compare columns in two dataframes with varying column numbers and removing rows that don't match a certain column?

Compare values in GroupBy and count the matching rows

Matching rows of two dataframes based on multiple conditions

multiply two dataframes by matching rows and columns

Find and return matching rows across multiple DataFrames

Compare two dataframes and keep the rows that overlap

Python dataframe - compare rows between two dataframes

How to compare the rows of two dataframes in R

Fastest way to compare rows of two pandas dataframes?

Compare two dataframes and only keep certain rows

Compare two files by two column matching

Compare two integer and count matching numbers

Compare 2 Dataframe and Find the Matching Rows

Compare two dataframes and output new column

r compare column types between two dataframes