如何在Kafka connect 0.10和Spark结构化流中使用from_json?

卡洛斯·罗德里格斯

我试图从[Databricks] [1]重现该示例,并将其应用到Kafka的新连接器并进行火花结构化流式传输,但是我无法使用Spark中的现成方法正确解析JSON ...

注意:该主题以JSON格式写入Kafka。

val ds1 = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", IP + ":9092")
          .option("zookeeper.connect", IP + ":2181")
          .option("subscribe", TOPIC)
          .option("startingOffsets", "earliest")
          .option("max.poll.records", 10)
          .option("failOnDataLoss", false)
          .load()

以下代码将不起作用,我认为这是因为列json是字符串并且与from_json签名方法不匹配...

    val df = ds1.select($"value" cast "string" as "json")
                .select(from_json("json") as "data")
                .select("data.*")

有小费吗?

[更新]示例工作:https : //github.com/katsou55/kafka-spark-structured-streaming-example/blob/master/src/main/scala-2.11/Main.scala

阿巴格尔

首先,您需要为JSON消息定义架构。例如

val schema = new StructType()
  .add($"id".string)
  .add($"name".string)

现在,您可以在from_json以下方法中使用此架构

val df = ds1.select($"value" cast "string" as "json")
            .select(from_json($"json", schema) as "data")
            .select("data.*")

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

使用Spark结构化流技术读取带有模式的Kafka Connect JSONConverter消息

如何在Kafka Direct Stream中使用Spark结构化流?

使用Spark结构化流(pyspark)从Kafka Connect JSONConverter消息中提取“有效负载”(模式和有效负载)

如何在Spark 3.0结构化流中使用kafka.group.id和检查点以继续从重启后停止的Kafka读取?

如何在Spark结构化流中使用流数据帧更新静态数据帧

Kafka Connect和流

如何在pyspark结构化流中使用maxOffsetsPerTrigger?

如何在 Spark 中使用 from_json() 数据框?

如何实例化Kafka Connect架构数组

如何通过 OpenID-Connect 使用 Kafka?

使用结构化流处理来自 kafka 的 json 数据

如何使用Java Spark结构化流从Kafka主题正确使用

如何在Kafka Connect Sink中指定Kafka主题的分区

结构化流如何动态解析kafka的json数据

使用Spark结构化流时如何更新Kafka Consumer max.request.size配置

Spark 3 结构化流在 Kafka 源中使用 maxOffsetsPerTrigger 和 Trigger.Once

如何使用结构化流从Kafka中读取JSON格式的记录?

如何在连接到 kafka 集群时在结构化流中禁用“spark.security.credentials.${service}.enabled”

如何在connect中使用expressjs?

如何在(Py)Spark结构化流中捕获不正确的(损坏的)JSON记录?

如何在Maven pom文件中获取Spark / Kafka org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0依赖关系?

如何在流查询中使用from_json标准功能(在选)?

结构化流Kafka 2.1-> Zeppelin 0.8-> Spark 2.4:spark不使用jar

如何使用(Py)Spark结构化流为带有时间戳(来自Kafka)的JSON记录定义架构?-显示空值

在kafka流中使用kafka connect json api消耗JSON值:JAVA

在Spark结构化流中从kafka / json数据源写入损坏的数据

无法在Spark结构化流中转换Kafka Json数据

如何在YARN中运行Kafka Connect Worker?

如何在 Kafka connect 中升级我的 Debezium 插件版本?