如何使用Java从Spark中的kafka读取流嵌套的JSON

Gokulraj

我正在尝试使用Java从Spark中读取来自kafka的复杂嵌套JSON数据,并且在形成数据集时遇到问题

发送到kafka的实际JSON文件

{"sample_title": {"txn_date": "2019-01-10","timestamp": "2019-02-01T08:57:18.100Z","txn_type": "TBD","txn_rcvd_time": "01/04/2019 03:32:32.135","txn_ref": "Test","txn_status": "TEST"}}
{"sample_title2": {"txn_date": "2019-01-10","timestamp": "2019-02-01T08:57:18.100Z","txn_type": "TBD","txn_rcvd_time": "01/04/2019 03:32:32.135","txn_ref": "Test","txn_status": "TEST"}}
{"sample_title3": {"txn_date": "2019-01-10","timestamp": "2019-02-01T08:57:18.100Z","txn_type": "TBD","txn_rcvd_time": "01/04/2019 03:32:32.135","txn_ref": "Test","txn_status": "TEST"}}
Dataset<Row> df = spark.readStream().format("kafka")
                    .option("spark.local.dir", config.getString(PropertyKeys.SPARK_APPLICATION_TEMP_LOCATION.getCode()))
                    .option("kafka.bootstrap.servers",
                            config.getString(PropertyKeys.KAFKA_BOORTSTRAP_SERVERS.getCode()))
                    .option("subscribe", config.getString(PropertyKeys.KAFKA_TOPIC_IPE_STP.getCode()))
                    .option("startingOffsets", "earliest")
                    .option("spark.default.parallelism",
                            config.getInt(PropertyKeys.SPARK_APPLICATION_DEFAULT_PARALLELISM_VALUE.getCode()))
                    .option("spark.sql.shuffle.partitions",
                            config.getInt(PropertyKeys.SPARK_APPLICATION_SHUFFLE_PARTITIONS_COUNT.getCode()))
                    .option("kafka.security.protocol", config.getString(PropertyKeys.SECURITY_PROTOCOL.getCode()))
                    .option("kafka.ssl.truststore.location",
                            config.getString(PropertyKeys.SSL_TRUSTSTORE_LOCATION.getCode()))
                    .option("kafka.ssl.truststore.password",
                            config.getString(PropertyKeys.SSL_TRUSTSTORE_PASSWORD.getCode()))
                    .option("kafka.ssl.keystore.location",
                            config.getString(PropertyKeys.SSL_KEYSTORE_LOCATION.getCode()))
                    .option("kafka.ssl.keystore.password",
                            config.getString(PropertyKeys.SSL_KEYSTORE_PASSWORD.getCode()))
                    .option("kafka.ssl.key.password", config.getString(PropertyKeys.SSL_KEY_PASSWORD.getCode())).load()
                    .selectExpr("CAST(key AS STRING)",
                            "CAST(value AS STRING)",
                            "topic as topic",
                            "partition as partition","offset as offset",
                            "timestamp as timestamp",
                            "timestampType as timestampType");

val output =  df.selectExpr("CAST(value AS STRING)").as(Encoders.STRING()).filter(x -> x.contains("sample_title"));

由于我可以在输入中包含多个架构,因此代码应该能够处理该架构并根据标题进行过滤,并映射到Title类型的Dataset

public class Title implements Serializable {
    String txn_date;
    Timestamp timestamp;
    String txn_type;
    String txn_rcvd_time;
    String txn_ref;
    String txn_status;
}
部分

首先使class Title为Java bean类,即编写getter和setter。

    public class Title implements Serializable {
        String txn_date;
        Timestamp timestamp;
        String txn_type;
        String txn_rcvd_time;
        String txn_ref;
        String txn_status;
        public Title(String data){... //set values for fields with the data}
        // add all getters and setters for fields
    }

    Dataset<Title> resultdf = df.selectExpr("CAST(value AS STRING)").map(value -> new Title(value), Encoders.bean(Title.class))
resultdf.filter(title -> // apply any predicate on title)

如果您要先过滤数据然后再应用编码,

    df.selectExpr("CAST(value AS STRING)")
.filter(get_json_object(col("value"), "$.sample_title").isNotNull)
// for simple filter use, .filter(t-> t.contains("sample_title"))
.map(value -> new Title(value), Encoders.bean(Title.class))

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

通过 Spark 数据帧读取/写入 Kafka 时如何在嵌套 JSON 中添加字段

如何使用结构化流从Kafka中读取JSON格式的记录?

如何在Spark Scala中读取多行嵌套json

如何从Spark结构化流中的特定Kafka分区读取

使用Spark Streaming从Kafka读取流并为其分配模式

如何使用 Java 流计算列表中的嵌套集合

如何在Spark中读取嵌套集合

如何在Java中读取嵌套的JSON列表?

如何在 Java 中读取嵌套的非数组 JSON?

如何读取大嵌套的 Json 并使用 Java 获取重复的键值

使用Java(Jackson)读取JSON中的嵌套键的值

我如何使用AESON读取嵌套JSON中的数组

如何使用Java Spark结构化流从Kafka主题正确使用

如何在Kafka connect 0.10和Spark结构化流中使用from_json?

如何使用Perl读取https JSON流?

如何从Spring云流中读取Kafka消息密钥?

如何使用Scala在Spark中读取json文件?

使用Java 8读取嵌套的JSON元素

如何使用Apex从嵌套JSON读取值?

如何在Spark 3.0结构化流中使用kafka.group.id和检查点以继续从重启后停止的Kafka读取?

如何使用Java更新JSON文件中的嵌套JSON对象?

如何在流查询(Java)中将Kafka记录作为JSON数组使用?

使用Spark结构化流从Kafka读取数据,总是会发生超时问题

使用Spark结构化流技术读取带有模式的Kafka Connect JSONConverter消息

Spark Structed Streaming从kafka读取嵌套的json并将其展平

如何读取嵌套的 avro 字段以创建流?

如何从Apache Nifi的kafka主题中生成Avro消息,然后使用kafka流读取它?

如何从Kafka读取XML格式的流数据?

如何使用 Java 流來創建在嵌套集合的所有元素中迭代的數組?