単一列_c0のpysparkデータフレームがあります。
a|b|c|clm4=1|clm5=3
a|b|c|clm4=9|clm6=60|clm7=23
このように選択した列のデータフレームに変換しようとしています
clm1,clm2,clm3,clm4,clm6,clm7,clm8
a, b, c, 1, null,null,null
a, b, c, 9, 60, 23, null
clm5を削除し、clm8を追加したことに注意してください。
私は以下のコードを使用しています:
transform_expr = """
transform(split(_c0, '[|]'), (x, i) ->
struct(
IF(x like '%=%', substring_index(x, '=', 1), concat('_c0', i+1)),
substring_index(x, '=', -1)
)
)
"""
df = df.select("_c0", explode(map_from_entries(expr(transform_expr))).alias("col_name", "col_value")).groupby("_c0").pivot('col_name').agg(first('col_value')).drop("_c0")
問題は、このアクションを実行したい複数の巨大なファイルがあり、各ファイルの結果に同じ列(これも長いリスト)が含まれている必要があり、入力ファイルに存在しない場合はnull値を持つ可能性があることです。上記のコードに条件を追加して、列名のリストに存在する列のみを選択するにはどうすればよいですか?
リストに目的の列を含め、それを使用して変換された配列をフィルタリングできます。
column_list = ["clm1", "clm2", "clm3", "clm4", "clm6", "clm7", "clm8"]
次に、filter
関数を使用して変換ステップの後にこのフィルターを追加します。
column_filter = ','.join(f"'{c}'" for c in column_list)
transform_expr = f"""
filter(transform(split(_c0, '[|]'), (x, i) ->
struct(
IF(x like '%=%', substring_index(x, '=', 1), concat('clm', i+1)) as name,
substring_index(x, '=', -1) as value
)
), x -> x.name in ({column_filter}))
"""
これにより、リストに存在しないすべての列が除外されます。
そして最後に、単純な選択式を使用して、欠落している列をnullとして追加します。
df = df.select("_c0", explode(map_from_entries(expr(transform_expr))).alias("col_name", "col_value")).groupby("_c0").pivot('col_name').agg(first('col_value')).drop("_c0")
## add missing columns as nulls
final_columns = [col(c).alias(c) if c in df.columns else lit(None).alias(c) for c in column_list]
df.select(*final_columns).show()
#+----+----+----+----+----+----+----+
#|clm1|clm2|clm3|clm4|clm6|clm7|clm8|
#+----+----+----+----+----+----+----+
#| a| b| c| 9| 60| 23|null|
#| a| b| c| 1|null|null|null|
#+----+----+----+----+----+----+----+
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加