How to pivot a DataFrame in PySpark on multiple columns?

Stefan Falk

I have a data set in the following format:

import numpy as np
import pandas as pd

# Create the data set

np.random.seed(42)

records = list()
for i in range(2):
    for j in range(2):        
        for k in range(500):
            t = np.random.randint(pd.Timestamp('2000-01-01').value, pd.Timestamp('2018-01-01').value)
            if np.random.rand() > .95: continue                
            ts = pd.Timestamp(t).strftime('%Y-%m-%d %H:%M:%S.%f')
            records.append( (i, j, np.random.rand(), ts) )

df = pd.DataFrame.from_records(records)
df.columns =['a_id', 'b_id', 'value', 'time']

Which looks like this:

      a_id  b_id     value                        time
0        0     0  0.156019  2007-09-28 15:12:24.260596
1        0     0  0.601115  2015-09-08 01:59:18.043399
2        0     0  0.969910  2012-01-10 07:51:29.662492
3        0     0  0.181825  2011-08-28 19:58:33.281289
4        0     0  0.524756  2015-11-15 14:18:17.398715
5        0     0  0.611853  2015-01-07 23:44:37.034322
6        0     0  0.366362  2008-06-21 11:56:10.529679
7        0     0  0.199674  2010-11-08 18:24:18.794838
8        0     0  0.046450  2008-04-27 02:36:46.026876

Here a_id and b_id together form the key for a sensor. This means the data frame has to be transformed like this:

df_ = pd.pivot_table(df, index='time', columns=['a_id', 'b_id'], values='value')
df_.index = [pd.to_datetime(v) for v in df_.index]
df_ = df_.resample('1W').mean().ffill().bfill()

After resampling and filling the gaps, the data is in the desired format:

a_id               0                   1          
b_id               0         1         0         1
2000-01-09  0.565028  0.560434  0.920740  0.458825
2000-01-16  0.565028  0.146963  0.920740  0.217588
2000-01-23  0.565028  0.840872  0.920740  0.209690
2000-01-30  0.565028  0.046852  0.920740  0.209690
2000-02-06  0.565028  0.046852  0.704871  0.209690

Each column now contains the data of one sensor.

The problem is, I do not know how to do this in PySpark.

from pyspark.sql import functions as F

df_test = spark.createDataFrame(df) \
    .withColumn('time', F.to_timestamp('time'))
df_test.printSchema()

Having

root
 |-- a_id: long (nullable = true)
 |-- b_id: long (nullable = true)
 |-- value: double (nullable = true)
 |-- time: timestamp (nullable = true)

How can I transform df_test such that it has the same form as df_?

eliasah

As mentioned in the comments, here is a solution to pivot your data:

You should concatenate your columns a_id and b_id into a new column c_id, group by date, then pivot on c_id and aggregate the values as you see fit.
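A minimal sketch of that idea, assuming the df_test DataFrame from the question and that a simple string concatenation of the two ids is acceptable as the sensor key (the column names c_id and date, and the use of a mean aggregate, are my own choices):

from pyspark.sql import functions as F

# Build a combined sensor key and a date column, then pivot on the key.
df_pivoted = df_test \
    .withColumn('c_id', F.concat_ws('_', 'a_id', 'b_id')) \
    .withColumn('date', F.to_date('time')) \
    .groupBy('date') \
    .pivot('c_id') \
    .agg(F.mean('value'))

This produces one column per (a_id, b_id) pair, with one row per day.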

As for resampling, I'd point you to the solution provided by @zero323 here.
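The linked approach amounts to bucketing the epoch seconds into fixed-width intervals before grouping. A rough sketch along those lines, combined with the pivot above (the one-week bucket width and the column names are my assumptions, and the ffill/bfill gap-filling from the pandas version is not covered here):

from pyspark.sql import functions as F

seconds_per_week = 7 * 24 * 60 * 60

# Truncate each timestamp down to the start of its one-week bucket,
# then pivot on the combined sensor key and average within each bucket.
df_weekly = df_test \
    .withColumn('c_id', F.concat_ws('_', 'a_id', 'b_id')) \
    .withColumn('week',
                (F.floor(F.unix_timestamp('time') / seconds_per_week)
                 * seconds_per_week).cast('timestamp')) \
    .groupBy('week') \
    .pivot('c_id') \
    .agg(F.mean('value')) \
    .orderBy('week')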
