(logstash)仅对来自kafka输入的Elasticsearch中的特定数据编制索引

赛龙

我正在使用kafka作为输入并将其放入elasticsearch(输出)

input {
    kafka {
        topics =>["maxwell"]
        codec => json
    }
}
filter {
}
output {
    stdout { codec => rubydebug }
    elasticsearch {
        index => 'test_kafka'
        document_type => "%{table}"
        hosts => 'localhost:9200'
    }
}

当它运行时,它输出以下json

{
  "database": "my_db",
  "xid": 88935,
  "@timestamp": "2016-11-14T12:00:13.763Z",
  "data": {
    "contact_country_code": null,
    "contact_type_id": 1,
    "created": "2014-10-03 12:24:36",
    "modified_by": null,
    "modified": "2014-10-03 12:24:36",
    "contact_id": 1,
    "is_default": 0,
    "created_by": null,
    "contact_number": "1241222232"
  },
  "old": {
    "contact_number": "1241222"
  },
  "commit": true,
  "@version": "1",
  "type": "update",
  "table": "contact",
  "ts": 1479124813
}

我的问题是,如何只在Elasticsearch中提取具有动态document_type的数据密钥才能实现这一目标

{
  "_index": "test_kafka",
  "_type": "contact",
  "_id": "AVhitY804rvpX8qdVt9d",
  "_score": 1,
  "_source": {
    "contact_country_code": null,
    "contact_type_id": 1,
    "created": "2014-10-03 12:24:36",
    "modified_by": null,
    "modified": "2014-10-03 12:24:36",
    "contact_id": 1,
    "is_default": 0,
    "created_by": null,
    "contact_number": "1241222232"
  }
}

您可以添加一个ruby过滤器来按摩您的活动,如下所示。首先,将table字段保存在字段中,@metadata以便可以在elasticsearch输出中引用它然后,它将删除除data一个字段以外的所有字段然后,它data在根级别复制该字段内的所有字段,最后删除该data字段。

input {
    kafka {
        topics =>["maxwell"]
        codec => json
    }
}
filter {
  mutate {
     add_field => { "[@metadata][type]" => "%{table}" }
  }
  ruby {
     code => "
        # Ruby code for Logstash 2.x
        event.to_hash.delete_if {|k, v| k != 'data'}
        event.to_hash.update(event['data'].to_hash)
        event.to_hash.delete_if {|k, v| k == 'data'}

        # Ruby code for Logstash 5.x
        event.to_hash.delete_if {|k, v| k != 'data'}            
        event.to_hash.update(event.get('data').to_hash)
        event.to_hash.delete_if {|k, v| k == 'data'}
     "
  }        
}
output {
    stdout { codec => rubydebug }
    elasticsearch {
        hosts => 'localhost:9200'
        index => 'test_kafka'
        document_type => "%{[@metadata][type]}"
    }
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

从mongodb仅对ElasticSearch中的某些字段编制索引

Elasticsearch中的索引编制

Logstash编制索引时如何在Elasticsearch中修复重复的文档?

尝试使用Kafka Connect在Elasticsearch中为Kafka主题编制索引

来自Elasticsearch错误的Logstash输入

在Google数据存储区中仅对属性数组中的单个值编制索引(而不是对这些值的每个组合编制索引)

如何将数据存储在elasticsearch _source中而不对其编制索引?

用数据框中的值编制索引

如何在Swift中显示来自JSON的特定数据?

如何仅对Microsoft SQL Server中的特定数据库启用CLR集成?

ng-repeat中的自定义顺序仅对特定数据进行排序

从TreeModel中的选定索引中检索特定数据

如何在Elasticsearch中停止重新索引编制?

如何通过Java API在ElasticSearch中重新编制索引

禁用仅对Elasticsearch上的特定索引创建动态映射?

Logstash浮动字段未编制索引

从yii中的输入表单中选择特定数据

无法在熊猫数据框中按时间戳编制索引

正确的索引编制以在Pandas中创建新的数据框

将来自用户输入的特定数量的字符存储在变量#java中

如何从Logstash索引到Elasticsearch中时对文档进行重复数据删除

如何使ActiveAdmin加载索引的特定数据

Elasticsearch Rails在生产模式下为特定模型重新编制索引

在Elasticsearch中克隆索引和为其重新编制索引有什么区别?

在jquery mobile中动态创建页面,仅包含来自websql数据库的特定数据

使用Logstash将CSV地理数据作为geo_point类型输入到Elasticsearch中

Elasticsearch-什么是索引编制过程?

Python Elasticsearch 7.05索引编制失败

检查Elasticsearch是否已完成索引编制

TOP 榜单

热门标签

归档