使用Spark结构化流(pyspark)从Kafka Connect JSONConverter消息中提取“有效负载”(模式和有效负载)

我要完成的正是这个问题(在这里)。就我而言,我使用的是Python / Pyspark Not Scala。

我正在尝试提取Kafka连接消息的“有效负载”部分,其中也包括架构。

样本消息:

{"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}}

步骤1-为“有效载荷”部分定义模式:

payload_schema = StructType([
StructField("emp_id", StringType(), False),
StructField("emp_name", StringType(), True),
StructField("city", StringType(), True),
StructField("emp_sal", StringType(), True),
StructField("manager_name", StringType(), True)])

第2步-从卡夫卡读:

df =spark.readStream.format("kafka")

第3步-从Kafka消息获取消息值:

kafka_df = df.selectExpr("CAST(value AS STRING)")

第4步-仅提取“有效载荷”(我被困在这里):

    import pyspark.sql.functions as psf

    emp_df = kafka_df\
    .select(psf.from_json(psf.col('value'), payload_schema).alias("DF"))\
    .select("DF.*")

我被困在这部分中,因为我无法弄清楚如何在将有效载荷传递给from_json()函数之前从JSON字符串中提取有效载荷。

注意:我知道我需要先定义整个消息的完整模式,然后才能在from_json()中使用它。我正在尝试仅获取“ payload” json字符串部分。

麦克风

您可以使用SQL函数get_json_object

import pyspark.sql.functions as psf

kafka_df
  .select(psf.get_json_object(kafka_df['value'],"$.payload").alias('payload'))
  .select(psf.from_json(psf.col('payload'), payload_schema).alias("DF"))
  .select("DF.*")

或者,您需要先为整个消息定义完整的架构,然后才能在中使用它from_json

这意味着您的架构应如下所示:

full_schema = StructType([
  StructField("schema", StructType([
    StructField("type", StringType(), False),
    StructField("name", StringType(), False),
    StructField("fields", StructType([
      StructField("field", StringType(), False),
      StructField("type", StringType(), False)
    ]),
  StructField("payload", StructType([
    StructField("emp_id", StringType(), False),
    StructField("emp_name", StringType(), True),
    StructField("city", StringType(), True),
    StructField("emp_sal", StringType(), True),
    StructField("manager_name", StringType(), True)
  ])
])

请仔细检查此架构定义,因为我不太确定如何在Python架构中定义数组,但我希望这个想法很明确。

完成后,您可以通过以下方式选择有效负载字段:

import pyspark.sql.functions as psf

    emp_df = kafka_df\
    .select(psf.from_json(psf.col('value'), full_schema).alias("DF"))\
    .select("DF.payload.*")

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章