从Spark结构化流以JSON数组形式写入数据

库沙格拉·米塔尔

我必须将Spark Structure流中的数据写为JSON Array,我尝试使用以下代码:

df.selectExpr("to_json(struct(*)) AS value").toJSON

它返回我DataSet [String],但无法写为JSON Array。

电流输出:

{"name":"test","id":"id"}
{"name":"test1","id":"id1"}

预期产量:

[{"name":"test","id":"id"},{"name":"test1","id":"id1"}]
麦克风

您可以为此使用SQL内置函数collect_list此函数收集并返回一组非唯一元素(相比之下,collect_set该元素仅返回唯一元素)。

collect_list的源代码中,您将看到这是一个聚合函数。根据《结构化流输出模式编程指南》中有关输出模式的要求,强调了不带水印的聚合支持输出模式“完整”和“更新”。

在此处输入图片说明

根据您的评论,我不希望添加水印和新列。另外,您面临的错误

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark; 

提醒您不要使用输出模式“追加”。

在评论中,您提到计划将结果生成为Kafka消息。一个大JSON数组作为一个Kafka值。完整的代码看起来像

val df = spark.readStream
  .[...] // in my test I am reading from Kafka source
  .load()
  .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value", "offset", "partition")
  // do not forget to convert you data into a String before writing to Kafka
  .selectExpr("CAST(collect_list(to_json(struct(*))) AS STRING) AS value")

df.writeStream
  .format("kafka")
  .outputMode("complete")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "test")
  .option("checkpointLocation", "/path/to/sparkCheckpoint")
  .trigger(Trigger.ProcessingTime(10000))
  .start()
  .awaitTermination()

给定键/值对(k1,v1),(k2,v2)和(k3,v3)作为输入,您将在Kafka主题中获得一个值,该值包含所有选定数据作为JSON数组:

[{"key":"k1","value":"v1","offset":7,"partition":0}, {"key":"k2","value":"v2","offset":8,"partition":0}, {"key":"k3","value":"v3","offset":9,"partition":0}]

已在Spark 3.0.1和Kafka 2.5.0中进行测试。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章