我正在foreachBatch
通过以下代码使用火花结构化流从redis读取批处理记录(尝试通过设置batchSize stream.read.batch.size
)
val data = spark.readStream.format("redis")
.option("stream.read.batch.size").load()
val query = data.writeStream.foreachBatch {
(batchDF: DataFrame, batchId: Long) => ...
// we count size of batchDF here, we want to limit its size
// some operation
}
目前,我们将其设置stream.read.batch.size
为128,但似乎不起作用。batchSize似乎是随机的,有时超过1000甚至10000。
但是,我不想等待那么长时间(10000条记录),因为我// some operation
需要尽快进行一些操作(在代码注释中),以便我希望控制最大批处理大小,以便当记录达到此限制时可以立即处理,该怎么办?
我是spark-redis的维护者。当前不支持此功能。该stream.read.batch.size
参数控制单个Redis API调用(count
XREADGROUP调用的参数)读取的项目数。它不会影响每个触发器的项目数(batchDF大小)。我已经在github上为该功能请求打开了一张票。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句