Apache Beam中DoFn的线程同步

Kuan Lin

我正在写一个可以在该方法中更改DoFn其实例变量elements(即共享资源)的@ProcessElement方法:

import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.transforms.DoFn;

public class DemoDoFn extends DoFn<String, Void> {
  private final int batchSize;

  private transient List<String> elements;

  public DemoDoFn(int batchSize) {
    this.batchSize = batchSize;
  }

  @StartBundle
  public void startBundle() {
    elements = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(@Element String element, ProcessContext context) {
    elements.add(element); // <-------- mutated

    if (elements.size() >= batchSize) {
      flushBatch();
    }
  }

  @FinishBundle
  public void finishBundle() {
    flushBatch();
  }

  private void flushBatch() {
    // Flush all elements, e.g., send all elements in a single API call to a server

    // Initialize a new array list for next batch
    elements = new ArrayList<>(); // <-------- mutated
  }
}

问题1:是否需要synchronized@ProcessElement方法中添加关键字以避免竞争情况?

根据Apache Beam Thread-compatibility:“除非您显式创建自己的线程,否则一次在工作实例上的单个线程都可以访问一个函数(DoFn)对象的每个实例。但是请注意,Beam SDK并非线程安全。如果您在用户代码中创建自己的线程,则必须提供自己的同步。”

问题2:“功能对象的每个实例一次可以在工作实例上被单个线程访问”是否表示Beam将会同步@ProcessElement或整个DoFn在后台进行?

IBM文指出,和我的报价

  1. “第三,Beam编程指南保证每个用户定义的函数实例一次只能由一个线程执行。这意味着运行程序必须同步整个函数调用,这可能会导致严重的性能瓶颈。”
  2. “ Beam向应用程序保证一次将只有一个线程执行其用户定义的功能。因此,如果下划线引擎产生了多个线程,则运行程序必须同步整个DoFn或GroupByKey调用。”
  3. “由于Beam禁止多个线程进入同一PTransform实例,因此引擎失去了使用运算符并行性的机会。”

该文件似乎表明整个DoFn调用是同步的。

一树

我知道这是一个老问题,但是由于我正在研究同一件事-不,您不需要为您的processElement进行同步,因为如您所言:“您的函数(DoFn)对象的每个实例一次都可以被单个线程访问在一个工人实例上”

这是Beam的官方类的实例,该实例会更改实例变量https://github.com/apache/beam/blob/0c01636fc8610414859d946cb93eabc904123fc8/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io /elasticsearch/ElasticsearchIO.java#L1369

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章