I am using Spark with Kafka. I am running a Structured Streaming query that reads from a Kafka topic. My code is:
package org.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import java.util.concurrent.TimeoutException;

public class Kafka {
    public static void main(String args[]){
        SparkSession spark = SparkSession
                .builder()
                .appName("Spark-Kafka-Integration")
                .config("spark.master", "local")
                .getOrCreate();
        Dataset<Row> df = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "quickstart-events")
                .load();
        df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
        StreamingQuery query = null;
        try {
            query = df.writeStream()
                    .outputMode("append")
                    .format("console")
                    .start();
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            throw new RuntimeException(e);
        }
    }
}
I am publishing new events to the topic from the command line with this command:
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
However, the output I am getting on my console looks like this -
I have cast the output to string, but the key and value are still not displayed as strings. How can I convert them to strings?
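You are not assigning the result of the expression. A Dataset is immutable, so selectExpr returns a new Dataset and leaves df unchanged. Your query is therefore still written from the original df, whose key and value columns are binary:
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
It should be:
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");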
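To illustrate why the uncast columns do not look like text, here is a minimal plain-Java sketch with no Spark dependency. It assumes the message bytes are UTF-8 (the console producer sends typed lines as bytes) and shows the same payload first as hex bytes, roughly how a binary column tends to be printed, and then decoded to a string, which is conceptually what CAST(value AS STRING) does. The class and method names (BinaryColumnDemo, toHex, decode) are hypothetical and exist only for this illustration.

```java
import java.nio.charset.StandardCharsets;

public class BinaryColumnDemo {

    // Render raw bytes as space-separated uppercase hex pairs,
    // similar in spirit to how a binary column is displayed.
    public static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < bytes.length; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02X", bytes[i]));
        }
        return sb.toString();
    }

    // Decode raw bytes as UTF-8 text (assumption: the producer wrote UTF-8).
    public static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for the bytes of one Kafka message value.
        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println("raw bytes: " + toHex(value));
        System.out.println("as string: " + decode(value));
    }
}
```

In the Spark job itself no manual decoding is needed: once the Dataset returned by selectExpr is the one passed to writeStream, the console sink receives string columns.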