我正在写一个可以在该方法中更改DoFn
其实例变量elements
(即共享资源)的@ProcessElement
方法:
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
public class DemoDoFn extends DoFn<String, Void> {
private final int batchSize;
private transient List<String> elements;
public DemoDoFn(int batchSize) {
this.batchSize = batchSize;
}
@StartBundle
public void startBundle() {
elements = new ArrayList<>();
}
@ProcessElement
public void processElement(@Element String element, ProcessContext context) {
elements.add(element); // <-------- mutated
if (elements.size() >= batchSize) {
flushBatch();
}
}
@FinishBundle
public void finishBundle() {
flushBatch();
}
private void flushBatch() {
// Flush all elements, e.g., send all elements in a single API call to a server
// Initialize a new array list for next batch
elements = new ArrayList<>(); // <-------- mutated
}
}
问题1:是否需要synchronized
在@ProcessElement
方法中添加关键字以避免竞争情况?
根据Apache Beam Thread-compatibility:“除非您显式创建自己的线程,否则一次在工作实例上的单个线程都可以访问一个函数(DoFn)对象的每个实例。但是请注意,Beam SDK并非线程安全。如果您在用户代码中创建自己的线程,则必须提供自己的同步。”
问题2:“功能对象的每个实例一次可以在工作实例上被单个线程访问”是否表示Beam将会同步@ProcessElement
或整个DoFn在后台进行?
本IBM文指出,和我的报价
该文件似乎表明整个DoFn调用是同步的。
我知道这是一个老问题,但是由于我正在研究同一件事-不,您不需要为您的processElement进行同步,因为如您所言:“您的函数(DoFn)对象的每个实例一次都可以被单个线程访问在一个工人实例上”
这是Beam的官方类的实例,该实例会更改实例变量https://github.com/apache/beam/blob/0c01636fc8610414859d946cb93eabc904123fc8/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io /elasticsearch/ElasticsearchIO.java#L1369
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句