使用 Apache NiFi 进行数据操作的 JSON 转换

nilesh1212

我想对下面的示例 JSON 进行一些基本转换,我想将 timeStamp 标记的值更改为日期格式,并想使用 NiFi添加一个created_ts值为 的新标记current_timestamp到我预期的 JSON 输出。

示例 JSON

{"name": "SAMPLE_NAME","timeStamp": "1477307252000","value": "-0.06279052","quality": "1090"}

预期的 JSON:

{"name": "SAMPLE_NAME","timeStamp": "2016-11-08 14:46:13.674","value": "-0.06279052","quality": "1090","created_ts":"2016-11-08 14:46:13.674"}

您能否帮助了解在 Apache NiFi/HDF 中要遵循的详细步骤。

达吉特

未实施数据转换。

查看官方文档:

https://github.com/bazaarvoice/jolt#stock-transforms

股票转换

股票转换是:

shift       : copy data from the input tree and put it the output tree
default     : apply default values to the tree
remove      : remove data from the tree
sort        : sort the Map key values alphabetically ( for debugging and human readability )
cardinality : "fix" the cardinality of input data.  Eg, the "urls" element is usually a List, 
                    but if there is only one, then it is a String

目前,所有 Stock 转换只影响数据的“结构”。

要进行数据操作,您需要编写 Java 代码。

如果您编写 Java“数据操作”代码来实现 Transform 接口,那么您可以将代码插入到转换链中。


因此,为了完成您的任务,我看到了两个主要变体:


V1:

使用以下处理器的序列:

EvaluateJsonPath -> UpdateAttributes -> AttributesToJSON

EvaluateJsonPath定义每个字段属性的表达式中,例如$.name, $.timeStamp, ...

UpdateAttributes转换的格式timeStamp和定义新的属性:

attribute  |   value/expression
-----------------------------------------------------------
timeStamp  |   timeStamp:format('yyyy-MM-dd HH:mm:ss.SSS')
created_ts |   now():format('yyyy-MM-dd HH:mm:ss.SSS')

AttributesToJSON定义Attributes List作为json对象存储到文件内容中


V2:使用ExecuteScript带有以下代码的处理器:

import groovy.json.JsonSlurper
import groovy.json.JsonBuilder

def ff = session.get()
if(!ff)return
ff = session.write(ff, {rawIn, rawOut->
    // transform streams into reader and writer
    rawIn.withReader("UTF-8"){reader->
        rawOut.withWriter("UTF-8"){writer->
            //parse reader into Map
            def json = new JsonSlurper().parse(reader)
            //change/set values
            json.timeStamp = new Date(json.timeStamp as Long).format('yyyy-MM-dd HH:mm:ss.SSS')
            json.created_ts = new Date().format('yyyy-MM-dd HH:mm:ss.SSS')
            //write changed object to writer
            new JsonBuilder(json).writeTo(writer)
        }
    }
} as StreamCallback)
session.transfer(ff, REL_SUCCESS)

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章