我必须将Spark Structure流中的数据写为JSON Array,我尝试使用以下代码:
df.selectExpr("to_json(struct(*)) AS value").toJSON
它返回我DataSet [String],但无法写为JSON Array。
电流输出:
{"name":"test","id":"id"}
{"name":"test1","id":"id1"}
预期产量:
[{"name":"test","id":"id"},{"name":"test1","id":"id1"}]
您可以为此使用SQL内置函数collect_list。此函数收集并返回一组非唯一元素(相比之下,collect_set
该元素仅返回唯一元素)。
从collect_list的源代码中,您将看到这是一个聚合函数。根据《结构化流输出模式编程指南》中有关输出模式的要求,强调了不带水印的聚合支持输出模式“完整”和“更新”。
根据您的评论,我不希望添加水印和新列。另外,您面临的错误
Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;
提醒您不要使用输出模式“追加”。
在评论中,您提到计划将结果生成为Kafka消息。一个大JSON数组作为一个Kafka值。完整的代码看起来像
val df = spark.readStream
.[...] // in my test I am reading from Kafka source
.load()
.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value", "offset", "partition")
// do not forget to convert you data into a String before writing to Kafka
.selectExpr("CAST(collect_list(to_json(struct(*))) AS STRING) AS value")
df.writeStream
.format("kafka")
.outputMode("complete")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "test")
.option("checkpointLocation", "/path/to/sparkCheckpoint")
.trigger(Trigger.ProcessingTime(10000))
.start()
.awaitTermination()
给定键/值对(k1,v1),(k2,v2)和(k3,v3)作为输入,您将在Kafka主题中获得一个值,该值包含所有选定数据作为JSON数组:
[{"key":"k1","value":"v1","offset":7,"partition":0}, {"key":"k2","value":"v2","offset":8,"partition":0}, {"key":"k3","value":"v3","offset":9,"partition":0}]
已在Spark 3.0.1和Kafka 2.5.0中进行测试。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句