我想知道是否可以将一系列PySpark命令打包到一个函数中,以使该函数获取一个数据帧并将其应用于数据帧。我们在Python中所做的事情。
例如,我有以下数据框:
sevents_df.show(5)
+-------+--------+-------------+----------------+------------+-----+
|Counter|Duration|StartTime |TypeEnumeration |Floor_Number|Value|
+-------+--------+-------------+----------------+------------+-----+
| 1.0| 5460|1503067077370|UC_001 | NaN| NaN|
| 1.0| 322|1503067090480|UC_008 | NaN| NaN|
| 1.0| 990|1503067099300|UC_001 | NaN| NaN|
| 1.0| 5040|1503067396060|UC_001 | NaN| NaN|
| 1.0| 6090|1503067402150|UC_001 | NaN| NaN|
+-------+--------+-------------+----------------+------------+-----+
步骤1.我要做的第一件事是过滤出类型。我只是保持UC_001
。
sevents_filter = sevents_df.filter(sevents_df['TypeEnumeration'].isin(['UC_001']) == True)
步骤2.删除一些列:
columns_to_drop = ['Comments', 'Floor_Number', 'Value']
sevents_clean = sevents_filter.drop(*columns_to_drop)
步骤3.转换StartTime
为日期
def convert_to_seconds(x):
return x/1000
udf_myFunction = udf(convert_to_seconds, IntegerType())
sevents2 = sevents2.withColumn("StartTime", udf_myFunction("StartTime"))
sevents4 = sevents2.withColumn('epoch',
f.date_format(sevents2.StartTime.cast(dataType=t.TimestampType()),"yyyy-MM-dd"))
我想将这三个步骤放在类似的函数中:
some udf pySpark_function(dataframe):
step 1
step 2
step 3
我要这样做的原因是,如果我有N
数据帧,我将无法想象编写这些步骤的N
次数。
一种解决方案是将这些N
帧连接为一帧,然后将此宏帧一次通过这些步骤。一次传递一帧是否有其他选择?
AnUDF
用于处理数据框列中的值,而不能用于处理整个数据框。而是,创建一个采用数据框并返回处理后的数据框的普通方法。
def process_df(df):
df = df.filter(df['TypeEnumeration'] == 'UC_001')
columns_to_drop = ['Comments', 'Floor_Number', 'Value']
df = df.drop(*columns_to_drop)
df = df.withColumn('epoch', f.date_format((df.StartTime / 1000).cast(t.TimestampType()), "yyyy-MM-dd"))
return df
然后,只需遍历所有数据帧并使用上述方法即可。
注意:我对代码做了一些简化。不需要,isin
因为您只过滤单个值,UDF
而不必除以1000。在可能的情况下,最好使用内置的Spark函数而不是自定义a UDF
,这样会更快。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句