How to convert bytes to string while processing records from Kafka?

Anurag

I am using Spark with Kafka. I am running a streaming query that reads from a Kafka topic. My code is:

package org.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.util.concurrent.TimeoutException;

public class Kafka {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("Spark-Kafka-Integration")
                .config("spark.master", "local")
                .getOrCreate();

        Dataset<Row> df = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "quickstart-events")
                .load();
        df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

        StreamingQuery query = null;
        try {
            query = df.writeStream()
                    .outputMode("append")
                    .format("console")
                    .start();
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }

        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            throw new RuntimeException(e);
        }

    }
}

I am producing new events to the topic from the command line with:

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

However, the output on my console looks like this:

[Screenshot of console output]

I have cast the key and value to STRING, but they are still not shown as strings. How can I convert them to strings?

Alex Ott

You are missing an assignment in this expression:

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

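selectExpr does not modify df in place; it returns a new Dataset, so the result has to be assigned. It should be: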

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
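To see what the cast does to each record, here is a minimal plain-Java sketch. It assumes UTF-8 payloads. Kafka delivers key and value as byte arrays (Spark's binary type), and CAST(... AS STRING) decodes those bytes as UTF-8 text. The class name KafkaBytesDemo and the helper castToString are hypothetical, used only for illustration; they are not part of the Spark or Kafka API.

```java
import java.nio.charset.StandardCharsets;

public class KafkaBytesDemo {

    // Hypothetical helper: decodes a Kafka key/value byte array as UTF-8,
    // roughly what CAST(... AS STRING) does to Spark's binary columns.
    // A null array (e.g. a record produced without a key) stays null.
    static String castToString(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Raw bytes as they arrive in the binary "value" column.
        byte[] value = "hello kafka".getBytes(StandardCharsets.UTF_8);
        // kafka-console-producer sends records without a key by default.
        byte[] key = null;

        System.out.println(castToString(key));   // prints null
        System.out.println(castToString(value)); // prints hello kafka
    }
}
```

This also explains what to expect once the assignment is fixed: the value column shows the text you typed into the console producer, while the key column shows null, because the producer was not told to send keys.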
