如何在一个微批处理中设置最大行数?

利奇

我正在foreachBatch通过以下代码使用火花结构化流从redis读取批处理记录(尝试通过设置batchSize stream.read.batch.size

val data = spark.readStream.format("redis")
  .option("stream.read.batch.size").load()

val query = data.writeStream.foreachBatch { 
  (batchDF: DataFrame, batchId: Long) => ...
  // we count size of batchDF here, we want to limit its size
  // some operation
}

目前,我们将其设置stream.read.batch.size为128,但似乎不起作用。batchSize似乎是随机的,有时超过1000甚至10000。

但是,我不想等待那么长时间(10000条记录),因为我// some operation需要尽快进行一些操作(在代码注释中),以便我希望控制最大批处理大小,以便当记录达到此限制时可以立即处理,该怎么办?

fe2s

我是spark-redis的维护者。当前不支持此功能。stream.read.batch.size参数控制单个Redis API调用(countXREADGROUP调用的参数)读取的项目数它不会影响每个触发器的项目数(batchDF大小)。我已经在github上为该功能请求打开了一张票。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

如何在Cat命令中设置最大行数

如何在 flexbox 容器中设置最大行数并隐藏额外元素?

如何在另一个批处理文件中调用一个批处理文件?

如何在openpyxl库中获取最大行数?

如何设置Text或RichText的最大行数?

设置图例中的最大行数

从另一个批处理文件调用一个批处理,并从外部批处理中设置内部批处理文件参数

如何在PyCharm中设置最大行长?

如何在vscode中为python设置最大行长?

如何在批处理文件中自动运行下一个命令?

echo off 和 on 如何在同一个批处理脚本中执行?

如何从另一个批处理脚本中杀死正在执行的批处理脚本?

CSV Python中可以处理的最大行数?

如何从txt文件中抓取一个随机单词并将其设置为批处理中的变量

如何在一个函数中设置两个消息处理程序

如何在SQL中获取按ID分组的最大行数

如何在不使用 SQL 代码中的 groupby 的情况下选择最大行数?

如何在spring批处理中的另一个流中定义并行子流?

如何在批处理脚本中在文件名的最后 6 位之前附加一个符号?

如何在批处理文件中传递来自另一个 perl 脚本的输入

如何在一个命令中打开cmd并在其中运行批处理文件

批处理:在变量中设置一个目录向上路径

如何在VB.NET中应用并行处理以执行保存在一个文件夹中的130个批处理文件?

如何控制MYSQL中每个记录的最大行数?

如何通过 SQL 语法获取分区中的最大行数?

Cakephp 3:如何从表中获取最大行数

如何在 nodejs 中实现批处理功能,其中第一个函数的输出将输入到下一个函数

熊猫:设置编号。最大行数

如何从另一个批处理文件在批处理文件中写入管道(|)?