我正在编写一个自定义处理器,在其中转换FlowFile的内容。为了简单起见,此问题只写相同的内容。
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) return;
session.write(
flowFile,
(input, output) -> input.transferTo(output) // Do transfomration here
);
session.transfer(REL_SUCCESS, flowFile);
}
如果我是对的,我在那里所做的就是用-在这种情况下,标识内容替换原始FlowFile的内容。
现在有这个方法
session.create(flowFile);
这将创建一个新的FlowFile。
这让我想,无论我做错了什么,都写原始的FlowFile。这样做非常简单,所以感觉还可以。但是,也许最好创建一个新的FlowFile?
两种方式都有什么含义,何时选择一种或另一种是正确的?还是可观察的行为是相同的?
这将是一些代码,它将创建一个新的FlowFile,并写入原始文件的内容:
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) return;
FlowFile newFile = session.create(flowFile);
session.write(
newFile,
output) -> session.read(flowFile).transferTo(output) // Do transfomration here
);
session.transfer(REL_SUCCESS, newFile);
}
这取决于您的要求,但通常应在原始FlowFile中编写。
如果使用创建新的流文件session.create()
,则新的UUID将与该流文件相关联,这将使您的数据源变得一团糟。
在以下情况下制作新的flowFile是值得的:
-您要同时保留原始和新的flowfile-
从外部源(kafka,hdfs,FS ...)读取
-1个输入N输出(例如splitRecords处理器)
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句