加速pyspark解析大型嵌套json文件

僅有的

您好,我嵌套了大小為 400 兆字節的 json 文件和 20 萬條記錄。我創建了一個使用 pyspark 解析文件並存儲在自定義數據幀中的解決方案,但執行此操作大約需要 5-7 分鐘,這非常慢。

這是一個 json 文件的示例(小文件但與大文件具有相同的結構):

{"status":"success",

 "data":{"resultType":"matrix","result":

[{"metric":{"data0":"T" ,"data1":"O"},"values":[[90,"0"],[80, "0"]]},

{"metric":{"data0":"K" ,"data1":"S"},"values":[[70,"0"],[60, "0"]]},

{"metric":{"data2":"J" ,"data3":"O"},"values":[[50,"0"],[40, "0"]]}]}}

這是我想要的輸出數據幀的結構:

time | value |data0 | data1 | data2  | data3 
90   |   "0" |   "T"|    "O"|   nan  | nan
80   |   "0" |   "T"|    "O"|   nan  | nan
70   |   "0" |   "K"|    "S"|   nan  | nan
60   |   "0" |   "K"|    "S"|   nan  | nan
50   |   "0" |   nan|    nan|   "J"  | "O"
40   |   "0" |   nan|    nan|   "J"  | "O"

這是我用來在大文件上生成上面列出的數據幀結構的 pyspark 代碼:

from datetime import datetime
import json
import rapidjson
import pyspark.sql.functions as F
from pyspark.sql.types import StructType
from util import schema ,meta_date

new_schema = StructType.fromJson(json.loads(schema))

with open("largefile.json", "r") as json_file:
    result_count = len(rapidjson.load(json_file)["data"]["result"])

spark = SparkSession.builder.master("spark://IP").getOrCreate()


conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '5g'),
                                        ('spark.executor.cores', '4'),
                                        ('spark.driver.memory', '4g'),
                                         ])

spark.sparkContext.stop()
spark = SparkSession.builder.config(conf=conf).getOrCreate()


df = spark.read.json("largefile.json")

for data_name in meta_date:
    df = df.withColumn(
        data_name, F.expr(f"transform(data.result, x -> x.metric.{data_name})")
    )

df = (
    df.withColumn("values", F.expr("transform(data.result, x -> x.values)"))
        .withColumn("items", F.array(*[F.lit(x) for x in range(0, result_count)]))
        .withColumn("items", F.explode(F.col("items")))
)

for data_name in meta_date:
    df = df.withColumn(data_name, F.col(data_name).getItem(F.col("items")))

df = (df.withColumn("values", F.col("values").getItem(F.col("items")))
      .withColumn("values", F.explode("values"))
      .withColumn("time", F.col("values").getItem(0))
      .withColumn("value", F.col("values").getItem(1))
      .drop("data", "status", "items", "values")).show()

我的機器有 4 個內核(8 個邏輯內核)和內存 16 GB 。我正在使用主節點和 2 個工作節點集群的獨立模式。

關於如何通過編輯集群配置或重構代碼中的轉換來加快此過程的任何幫助?

拉曼努斯

那這個呢?閱讀 json,選擇帶有爆炸的列,它看起來與您想要的結果匹配。

df.select(f.explode('data.result').alias('result')) \
  .select('result.metric.*', f.explode('result.values').alias('values')) \
  .withColumn('time', f.col('values')[0]) \
  .withColumn('value', f.col('values')[1]) \
  .drop('values') \
  .show(truncate=False)

+-----+-----+-----+-----+----+-----+
|data0|data1|data2|data3|time|value|
+-----+-----+-----+-----+----+-----+
|T    |O    |null |null |90  |0    |
|T    |O    |null |null |80  |0    |
|K    |S    |null |null |70  |0    |
|K    |S    |null |null |60  |0    |
|null |null |J    |O    |50  |0    |
|null |null |J    |O    |40  |0    |
+-----+-----+-----+-----+----+-----+

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章