我在NIFI中有以下流程,JSON中有(1000+)个对象。
invokeHTTP->SPLIT JSON->putMongo
流工作正常,直到我在json中收到带有“。”的键为止。在名字里。例如“ spark.databricks.acl.dfAclsEnabled”。
我当前的解决方案不是最佳解决方案,我记下了错误的键,并使用多个替换文本处理器替换了“”。与“ _”。我不使用REGEX,而是使用字符串文字查找/替换。因此,每当我在putMongo处理器中遇到故障时,我都会插入新的replaceText处理器。
这是无法维持的。我想知道是否可以使用JOLT?关于输入JSON的一些信息。
1)没有固定的结构,只有被确认的东西。一切都会在事件数组中。但是事件对象本身是自由形式。
2)最大列表大小= 1000。
3)第三方JSON,所以我不能要求更改格式。
另外,带有“。”的键可以出现在任何地方。因此,我正在寻找可以在所有级别清除然后重命名的JOLT规范。
{
"events": [
{
"cluster_id": "0717-035521-puny598",
"timestamp": 1531896847915,
"type": "EDITED",
"details": {
"previous_attributes": {
"cluster_name": "Kylo",
"spark_version": "4.1.x-scala2.11",
"spark_conf": {
"spark.databricks.acl.dfAclsEnabled": "true",
"spark.databricks.repl.allowedLanguages": "python,sql"
},
"node_type_id": "Standard_DS3_v2",
"driver_node_type_id": "Standard_DS3_v2",
"autotermination_minutes": 10,
"enable_elastic_disk": true,
"cluster_source": "UI"
},
"attributes": {
"cluster_name": "Kylo",
"spark_version": "4.1.x-scala2.11",
"node_type_id": "Standard_DS3_v2",
"driver_node_type_id": "Standard_DS3_v2",
"autotermination_minutes": 10,
"enable_elastic_disk": true,
"cluster_source": "UI"
},
"previous_cluster_size": {
"autoscale": {
"min_workers": 1,
"max_workers": 8
}
},
"cluster_size": {
"autoscale": {
"min_workers": 1,
"max_workers": 8
}
},
"user": ""
}
},
{
"cluster_id": "0717-035521-puny598",
"timestamp": 1535540053785,
"type": "TERMINATING",
"details": {
"reason": {
"code": "INACTIVITY",
"parameters": {
"inactivity_duration_min": "15"
}
}
}
},
{
"cluster_id": "0717-035521-puny598",
"timestamp": 1535537117300,
"type": "EXPANDED_DISK",
"details": {
"previous_disk_size": 29454626816,
"disk_size": 136828809216,
"free_space": 17151311872,
"instance_id": "6cea5c332af94d7f85aff23e5d8cea37"
}
}
]
}
我创建一个模板使用ReplaceText
,并RouteOnContent
执行此任务。循环是必需的,因为正则表达式仅.
在每次传递时替换JSON密钥中的第一个。您也许可以优化此方法以在一次通过中执行所有替换,但是在将正则表达式与前瞻性组和后视组模糊化几分钟之后,重新路由会更快。我验证了它可以与您提供的JSON一起使用,也可以与JSON和不同行:
上的键和值一起使用(在任一行上):
...
"spark_conf": {
"spark.databricks.acl.dfAclsEnabled":
"true",
"spark.databricks.repl.allowedLanguages"
: "python,sql"
},
...
您还可以使用ExecuteScript
带有Groovy的处理器来提取JSON,快速过滤包含的所有JSON密钥.
,执行collect
操作以进行替换,以及如果您希望单个处理器在JSON数据中重新插入密钥,单通。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句