Apache NiFi-NullPointerException在自定义处理器上设置多个线程

里卡米尼

我目前正在处理NiFi流程,该流程需要实现自定义处理器,才能在csv记录上应用转换。

我在执行的一些基准测试中注意到了这种现象:如果仅将一个线程分配给每个自定义处理器,则一切运行良好。由于java.lang.NullPointerException,为自定义处理器分配更多线程会导致处理会话失败。

由于无法用单个线程重现该错误,因此我在考虑处理流文件时遇到的一些问题,或者我不知道的NiFi某些方面。

通过访问流文件属性来执行处理。流文件的内容从不读取,添加一些属性后将返回输出流文件。以下是处理器代码相关部分的摘要:

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

    //  Will hold all the processed attributes
    Map<String, String> processedAttributes = new HashMap<>();
    FlowFile flowfile = session.get();
    ...
    //  Adds the attributes to the flowfile
    flowfile = session.putAllAttributes(flowfile, processedAttributes);
    session.transfer(flowfile, PROCESSED);
    }

我在m4.4xlarge的Amazon ec2实例上运行NiFi 0.7。由于我正在寻求高性能(没有),因此我正在寻找一种增加线程数的安全方法。任何建议都非常感谢。

先感谢您。

马蒂卜

session.get()方法可能对某些线程返回null(尤其是在有多个线程/并发任务的情况下)。如果由于有可用的流文件而调度了两个或多个线程,则会发生这种情况,然后一个线程将获取流文件(通过session.get()),而下一个线程将为空。

就您而言,您可能不是在读取或写入流文件,而是通过其他方法(例如session.putAllAttributes())调用流文件上的方法。许多处理器添加了检查以查看flowfile == null是否会返回(因为它希望对流文件​​进行操作),也许这也可以解决您的问题。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

创建自定义处理器 apache nifi

SNMP陷阱接收器作为Apache Nifi中的自定义处理器

是否可以调试Apache Nifi自定义处理器?

Apache NiFi:使用ExecuteScript处理器处理多个csv

TailFile 处理器 - Apache Nifi

Apache NIFI 自定义处理器给出错误“找不到合适的驱动程序”

GetFile处理器在Apache NiFi中持续运行

来自1个表Apache NIFI的Gethbase处理器

如何在 Apache Nifi 中实现 ExtractCCDAAttributes 处理器?

NiFi自定义处理器表达语言

onTrigger 的工作 - nifi 自定义处理器

Apache NIFi MergeContent处理器-将分界器设置为新行

Apache Nifi:从Java应用程序执行/触发Nifi处理器

Apache NiFi-使用多个FlowFile作为处理器的输入

Apache NiFi - 自定义目录上的 PutGCSObject

用于 Apache NiFi 的自定义 docker 容器

在 apache nifi 中批量处理流文件

动态设置内容类型的Apache NiFi

如何在https上配置Apache nifi

在Apache NiFi中定义Apache Avro Schema全名

我们能否以及如何将apache nifi流文件及其属性保存到磁盘,以便我们可以使用/重新读取它们以用于自定义处理器单元测试用例

使用 Apache NiFi PutDistributedMapCache 在 Redis 上设置 TTL

Apache NiFi中的处理器属性和Flowfile属性之间的差异

Apache NiFi-OutOfMemory错误:SplitText处理器超出了GC开销限制

Apache Nifi:对 ExecuteStreamCommand 处理器产生的输出是否有限制

使用groovy在apache-nifi中使用executescript处理器更新csv值失败

Apache Nifi 1.7替换文本处理器进行时间戳操作

如何使用 Apache NiFi 标准处理器进行数据转换?

Apache NiFi EvaluationJSONPath处理器:用于连接2个属性的JSONPath表达式