Calculating the cosine similarity between all the rows of a dataframe in pyspark

Abhinav Choudhury

I have a dataset containing workers with their demographic information like age gender,address etc and their work locations. I created an RDD from the dataset and converted it into a DataFrame.

There are multiple entries for each ID. Hence, I created a DataFrame which contained only the ID of the worker and the various office locations' that he/she had worked.

    |----------|----------------|
    | **ID**    **Office_Loc**  |
    |----------|----------------|
    |   1      |Delhi, Mumbai,  |
    |          | Gandhinagar    |
    |---------------------------|
    |   2      | Delhi, Mandi   | 
    |---------------------------|
    |   3      |Hyderbad, Jaipur|
    -----------------------------

I want to calculate the cosine similarity between each worker with every other worker based on their office locations'.

So, I iterated through the rows of the DataFrame, retrieving a single row from the DataFrame :

myIndex = 1
values = (ID_place_df.rdd.zipWithIndex()
            .filter(lambda ((l, v), i): i == myIndex)
            .map(lambda ((l,v), i): (l, v))
            .collect())

and then using map

    cos_weight = ID_place_df.select("ID","office_location").rdd\
  .map(lambda x: get_cosine(values,x[0],x[1]))

to calculated the cosine similarity between the extracted row and the whole DataFrame.

I do not think my approach is a good one since I am iterating through the rows of the DataFrame, it defeats the whole purpose of using spark. Is there a better way to do it in pyspark? Kindly advise.

MaFF

You can use the mllib package to compute the L2 norm of the TF-IDF of every row. Then multiply the table with itself to get the cosine similarity as the dot product of two by two L2norms:

1. RDD

rdd = sc.parallelize([[1, "Delhi, Mumbai, Gandhinagar"],[2, " Delhi, Mandi"], [3, "Hyderbad, Jaipur"]])
  • Compute TF-IDF:

    documents = rdd.map(lambda l: l[1].replace(" ", "").split(","))
    
    from pyspark.mllib.feature import HashingTF, IDF
    hashingTF = HashingTF()
    tf = hashingTF.transform(documents)
    

You can specify the number of features in HashingTF to make the feature matrix smaller (fewer columns).

    tf.cache()
    idf = IDF().fit(tf)
    tfidf = idf.transform(tf)
  • Compute L2norm:

    from pyspark.mllib.feature import Normalizer
    labels = rdd.map(lambda l: l[0])
    features = tfidf
    
    normalizer = Normalizer()
    data = labels.zip(normalizer.transform(features))
    
  • Compute cosine similarity by multiplying the matrix with itself:

    from pyspark.mllib.linalg.distributed import IndexedRowMatrix
    mat = IndexedRowMatrix(data).toBlockMatrix()
    dot = mat.multiply(mat.transpose())
    dot.toLocalMatrix().toArray()
    
        array([[ 0.        ,  0.        ,  0.        ,  0.        ],
               [ 0.        ,  1.        ,  0.10794634,  0.        ],
               [ 0.        ,  0.10794634,  1.        ,  0.        ],
               [ 0.        ,  0.        ,  0.        ,  1.        ]])
    

    OR: Using a Cartesian product and the function dot on numpy arrays:

    data.cartesian(data)\
        .map(lambda l: ((l[0][0], l[1][0]), l[0][1].dot(l[1][1])))\
        .sortByKey()\
        .collect()
    
        [((1, 1), 1.0),
         ((1, 2), 0.10794633570596117),
         ((1, 3), 0.0),
         ((2, 1), 0.10794633570596117),
         ((2, 2), 1.0),
         ((2, 3), 0.0),
         ((3, 1), 0.0),
         ((3, 2), 0.0),
         ((3, 3), 1.0)]
    

2. DataFrame

Since you seem to be using dataframes, you can use the spark mlpackage instead:

import pyspark.sql.functions as psf
df = rdd.toDF(["ID", "Office_Loc"])\
    .withColumn("Office_Loc", psf.split(psf.regexp_replace("Office_Loc", " ", ""), ','))
  • Compute TF-IDF:

    from pyspark.ml.feature import HashingTF, IDF
    hashingTF = HashingTF(inputCol="Office_Loc", outputCol="tf")
    tf = hashingTF.transform(df)
    
    idf = IDF(inputCol="tf", outputCol="feature").fit(tf)
    tfidf = idf.transform(tf)
    
  • Compute L2 norm:

    from pyspark.ml.feature import Normalizer
    normalizer = Normalizer(inputCol="feature", outputCol="norm")
    data = normalizer.transform(tfidf)
    
  • Compute matrix product:

    from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
    mat = IndexedRowMatrix(
        data.select("ID", "norm")\
            .rdd.map(lambda row: IndexedRow(row.ID, row.norm.toArray()))).toBlockMatrix()
    dot = mat.multiply(mat.transpose())
    dot.toLocalMatrix().toArray()
    

    OR: using a join and a UDF for function dot:

    dot_udf = psf.udf(lambda x,y: float(x.dot(y)), DoubleType())
    data.alias("i").join(data.alias("j"), psf.col("i.ID") < psf.col("j.ID"))\
        .select(
            psf.col("i.ID").alias("i"), 
            psf.col("j.ID").alias("j"), 
            dot_udf("i.norm", "j.norm").alias("dot"))\
        .sort("i", "j")\
        .show()
    
        +---+---+-------------------+
        |  i|  j|                dot|
        +---+---+-------------------+
        |  1|  2|0.10794633570596117|
        |  1|  3|                0.0|
        |  2|  3|                0.0|
        +---+---+-------------------+
    

This tutorial lists different methods to multiply large scale matrices: https://labs.yodas.com/large-scale-matrix-multiplication-with-pyspark-or-how-to-match-two-large-datasets-of-company-1be4b1b2871e

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

Calculate Cosine Similarity Spark Dataframe

Calculate cosine similarity between words

Cosine similarity between each row in a Dataframe in Python

Spark cosine distance between rows using Dataframe

How to compute the cosine_similarity in pytorch for all rows in a matrix with respect to all rows in another matrix

Efficient way to compute cosine similarity between 1D array and all rows in a 2D array

How to calculate the cosine similarity of two vectors in PySpark?

Cosine similarity of rows in pandas DataFrame

Cosine similarity between matching rows in numpy ndarrays

A vector and matrix rows cosine similarity in pytorch

Can I use cosine similarity between rows using only non null values?

Cosine similarity between 0 and 1

Difference between cosine similarity and cosine distance

Calculating pairwise Euclidean distance between all the rows of a dataframe

Cosine_similarity between dataframe and subset dataframe

Cosine Similarity rows in a dataframe of pandas

Calculating Cosine Similarity in Julia for K-Means

Calculating cosine similarity

Optimized approach for calculating cosine similarity in SQL Server

similarity between two strings in a Pyspark Dataframe

Transform rows and column and create a similarity dataframe using pyspark

Find cosine similarity between two columns of type array<double> in pyspark

Calculating the cosine similarity with dictionaries

What is the fastest way of calculate cosine similarity between rows of two same shape matrices

Euclidean distance or cosine similarity between columns with vectors in PySpark

Find cosine similarity between different pandas dataframe

Efficient Cosine Similarity between Dataframe Rows

How to find the cosine similarity between 2 dataframe in pandas?

Calculating cosine similarity in Pyspark Dataframe