我正在使用pyspark从Kafka获取数据并将其插入cassandra。我快到了,我只需要最后一步。
def Spark_Kafka_Receiver():
# STEP 1 OK!
dc = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "000.00.0.240:9092") \
.option("subscribe", "MyTopic") \
.load()
dc.selectExpr("CAST(key as STRING)", "CAST(value AS STRING) as msg")
# STEP 2 OK!
dc.writeStream \
.outputMode("append") \
.foreachBatch(foreach_batch_function) \
.start() \
.awaitTermination()
# STEP 3 NEED HELP
def foreach_batch_function(df, epoch_id):
Value = df.select(df.value)
???????
# WRITE DATA FRAME ON CASSANDRA
df.write \
.format("org.apache.spark.sql.cassandra") \
.mode('append') \
.options(table=table_name, keyspace=keyspace) \
.save()
所以我有这样的格式我的价值:
DataFrame [值:二进制]
我需要插入一些东西来打开“我的价值”,将二进制文件放入其中,并创建一个具有正确格式的不错的数据框,以处理数据库并执行代码的最后一部分。
您不再需要使用foreachBatch
。您只需要升级到本机支持Spark结构化流的Spark Cassandra Connector 2.5,因此您可以编写:
dc.writeStream \
.format("org.apache.spark.sql.cassandra") \
.mode('append') \
.options(table=table_name, keyspace=keyspace)
.start() \
.awaitTermination()
关于问题的第二部分-如果要将值转换为多列,则需要使用from_json
函数,将模式传递给它。这是Scala中的示例,但是Python代码应该非常相似:
val schemaStr = "id:int, value:string"
val schema = StructType.fromDDL(schemaStr)
val data = dc.selectExpr("CAST(value AS STRING)")
.select(from_json($"value", schema).as("data"))
.select("data.*").drop("data")
然后您可以通过 writeStream
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句