我目前正在处理NiFi流程,该流程需要实现自定义处理器,才能在csv记录上应用转换。
我在执行的一些基准测试中注意到了这种现象:如果仅将一个线程分配给每个自定义处理器,则一切运行良好。由于java.lang.NullPointerException,为自定义处理器分配更多线程会导致处理会话失败。
由于无法用单个线程重现该错误,因此我在考虑处理流文件时遇到的一些问题,或者我不知道的NiFi某些方面。
通过访问流文件属性来执行处理。流文件的内容从不读取,添加一些属性后将返回输出流文件。以下是处理器代码相关部分的摘要:
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// Will hold all the processed attributes
Map<String, String> processedAttributes = new HashMap<>();
FlowFile flowfile = session.get();
...
// Adds the attributes to the flowfile
flowfile = session.putAllAttributes(flowfile, processedAttributes);
session.transfer(flowfile, PROCESSED);
}
我在m4.4xlarge的Amazon ec2实例上运行NiFi 0.7。由于我正在寻求高性能(没有),因此我正在寻找一种增加线程数的安全方法。任何建议都非常感谢。
先感谢您。
session.get()方法可能对某些线程返回null(尤其是在有多个线程/并发任务的情况下)。如果由于有可用的流文件而调度了两个或多个线程,则会发生这种情况,然后一个线程将获取流文件(通过session.get()),而下一个线程将为空。
就您而言,您可能不是在读取或写入流文件,而是通过其他方法(例如session.putAllAttributes())调用流文件上的方法。许多处理器添加了检查以查看flowfile == null是否会返回(因为它希望对流文件进行操作),也许这也可以解决您的问题。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句