如何处理从Kafka到Cassandra的pySpark结构化流

SimAzz

我正在使用pyspark从Kafka获取数据并将其插入cassandra。我快到了,我只需要最后一步。

def Spark_Kafka_Receiver():

# STEP 1 OK!

    dc = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "000.00.0.240:9092") \
        .option("subscribe", "MyTopic") \
    .load()
    dc.selectExpr("CAST(key as STRING)", "CAST(value AS STRING) as msg")

# STEP 2 OK!

    dc.writeStream \
        .outputMode("append") \
        .foreachBatch(foreach_batch_function) \
        .start() \
        .awaitTermination()

# STEP 3 NEED HELP

def foreach_batch_function(df, epoch_id):
    Value = df.select(df.value)

    ???????

    # WRITE DATA FRAME ON CASSANDRA
    df.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode('append') \
        .options(table=table_name, keyspace=keyspace) \
        .save()

所以我有这样的格式我的价值:

DataFrame [值:二进制]

我需要插入一些东西来打开“我的价值”,将二进制文件放入其中,并创建一个具有正确格式的不错的数据框,以处理数据库并执行代码的最后一部分。

亚历克斯·奥特

您不再需要使用foreachBatch您只需要升级到本机支持Spark结构化流的Spark Cassandra Connector 2.5,因此您可以编写:

dc.writeStream \
        .format("org.apache.spark.sql.cassandra") \
        .mode('append') \
        .options(table=table_name, keyspace=keyspace)
        .start() \
        .awaitTermination()

关于问题的第二部分-如果要将值转换为多列,则需要使用from_json函数,将模式传递给它。这是Scala中的示例,但是Python代码应该非常相似:

val schemaStr = "id:int, value:string"
val schema = StructType.fromDDL(schemaStr)
val data = dc.selectExpr("CAST(value AS STRING)")
  .select(from_json($"value", schema).as("data"))
  .select("data.*").drop("data")

然后您可以通过 writeStream

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

Spark结构化流如何处理背压?

Spark结构化流作业如何处理流-静态DataFrame连接?

如何处理Spark结构化流中的小文件问题?

状态数据不断增长时,Spark结构化流如何处理内存中状态?

从 Kafka 读取时 Pyspark 结构化流中的异常

使用结构化流处理来自 kafka 的 json 数据

Spark结构化流-Kafka偏移处理

如何在pyspark结构化流中使用maxOffsetsPerTrigger?

如何使用PySpark转换结构化流?

结构化流如何动态解析kafka的json数据

如何在pyspark中的结构化流中从Kafka读取并打印出记录以进行控制台?

多行处理 Spark 结构化流

带有 kafka 的 Spark 结构化流只导致一批(Pyspark)

来自Kafka的pySpark结构化流不会输出到控制台进行调试

PySpark结构化流:将查询的输出传递到API端点

即使从不同的文件格式加载,Vertica如何处理半结构化数据

如何处理Google结构化数据测试工具的无评论

C#中如何处理半结构化的JSON数据

如何使用Java Spark结构化流从Kafka主题正确使用

如何使用结构化流从Kafka中读取JSON格式的记录?

如何使用结构化火花流将镶木地板批量发送到kafka?

如何在Kafka Direct Stream中使用Spark结构化流?

如何在Kafka connect 0.10和Spark结构化流中使用from_json?

使用Spark结构化流时如何更新Kafka Consumer max.request.size配置

如何从Spark结构化流中的特定Kafka分区读取

将Spark结构化的流输出写入Kafka主题

带有结构化流协议的 Apache Kafka

Spark结构化流Kafka集成偏移管理

Spark结构化流+ Kafka集成:MicroBatchExecution PartitionOffsets错误