I have a storage container where the data sits in multiple folders with the date appended to the folder name (shown below).
"dbfs:/mnt/input/raw/extract/pro_2023-01-01/parquet files here"
"dbfs:/mnt/input/raw/extract/pro_2023-01-02/parquet files here"
"dbfs:/mnt/input/raw/extract/pro_2023-01-03/parquet files here"
"dbfs:/mnt/input/raw/extract/pro_2023-01-04/parquet files here"
"dbfs:/mnt/input/raw/extract/pro_2023-01-05/parquet files here"
It works fine with good performance if I read the data from a single folder. Example:
df = spark.read.parquet("dbfs:/mnt/input/raw/extract/pro_2023-01-05/")
But I have a situation where I need to load the data for multiple days into a dataframe (mostly on a weekly basis). For that I pull all the data and then use a temp view to filter on folderLoadDate, which is one of the columns in the parquet files. That works, but it takes forever to run because, I think, it scans every folder and then does the transformations. Example:
df = spark.read.parquet("dbfs:/mnt/input/raw/extract/pro_2023-*/")
# a few transformations and new columns are added to df before creating the temp view below
df.createOrReplaceTempView("Alldata")
Then I run Spark SQL:
%sql select * from Alldata where cast(FolderDate as date) BETWEEN '2023-01-01' AND '2023-01-07'
Is there a way I can pull only the data for the needed dates into df at the very first step? Something like:
df = spark.read.parquet("dbfs:/mnt/input/raw/extract/BETWEEN(pro_2023-2023-01-01 and pro_2023-01-07")
Any help is appreciated.
Try using a glob pattern in the path for this case.
To read the dates 01 through 07, use 0[1-7] and Spark will read the 01, 02, 03, 04, 05, 06, 07 folders into the dataframe.
Example:
spark.read.parquet("dbfs:/mnt/input/raw/extract/pro_2023-01-0[1-7]/")
UPDATE:
Define an empty dataframe and unionAll the data from each folder onto it in every iteration.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# define an empty dataframe with the schema of the parquet files
# (replace these example fields with the actual columns)
schema = StructType([
    StructField("k", StringType(), True),
    StructField("v", IntegerType(), False)
])
df = spark.createDataFrame([], schema)

# folder dates to load (path prefix taken from the question)
list_directories = ['2023-01-01', '2023-01-02', '2023-01-03']
for i in list_directories:
    df1 = spark.read.parquet(f'dbfs:/mnt/input/raw/extract/pro_{i}/')
    df = df.unionAll(df1)
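For a weekly window, one option (a sketch; the path prefix comes from the question, and unionAll is simply an older alias for union, so every folder must share the same schema) is to generate the date list with the standard library and either keep the loop or read all paths in a single call:

from datetime import date, timedelta

start, end = date(2023, 1, 1), date(2023, 1, 7)
# one entry per day in the window, formatted as yyyy-MM-dd
list_directories = [str(start + timedelta(days=n)) for n in range((end - start).days + 1)]

# alternative to the loop: spark.read.parquet accepts multiple paths at once
paths = [f'dbfs:/mnt/input/raw/extract/pro_{d}/' for d in list_directories]
df_week = spark.read.parquet(*paths)

Note that the single multi-path read fails if the folder for any of those dates is missing, so the loop version is easier to make tolerant of gaps.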