聚合日志将第一个时间戳设置为@timestamp

亚历克斯·泽尔

我正在为“ 389 Directory Server”日志创建过滤器,这些日志显示了服务器上用户操作,连接,搜索,添加,修改等的历史记录。我正在使用聚合过滤器来合并所有此日志行变成一个事件。

但是,我希望事件的最终@timestamp(在用户断开连接后)成为第一个事件的@timestamp(首次建立连接时),我尝试使用日期过滤器,尽管它确实改变了每个事件的@timestamp(每条日志行),聚合筛选器生成的最终映射仍然使用处理日志的时间。

我可以将第一个@timestamp保存到地图中的另一个文件中,但是如何用该字段替换@timestamp?

由于过滤器很长,我将仅包括开始和结束:

filter {
  grok {
    match => { "message" => [
      "^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} fd=%{NUMBER:file_descriptor} slot=%{NUMBER} %{WORD:connection_method} connection from %{IP:source} to %{IP:destination}$",
      "^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} %{NOTSPACE:ssl_version} (?<encryption_method>%{NOTSPACE} %{NOTSPACE})$",
      "^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation} dn=%{QUOTEDSTRING:user_dn} method=%{NOTSPACE:bind_method} version=%{NUMBER:ldap_version}($)?(mech=%{NOTSPACE:auth_mechanism}$)?",
      "^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation} err=%{NUMBER:error_code} tag=%{NUMBER:tag_number} nentries=%{NUMBER:number_of_entries} etime=%{NUMBER:operation_time}($)?(dn=%{QUOTEDSTRING}$)?",
      "^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation} base=%{QUOTEDSTRING:search_base} scope=%{NUMBER:search_scope} filter=%{QUOTEDSTRING:search_filter} attrs=%{QUOTEDSTRING:search_attributes}$",
      "^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation}$",
      "^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} fd=%{NUMBER:file_descriptor} %{WORD:connection_result} - %{WORD:connection_code}$"
      ]
    }
  }
  if "" in [connection_method] {
    aggregate {
      task_id => "%{connection_id}"
      code => "
        map['timestamp'] = event['@timestamp']
        map['tags'] ||= ['aggregated']
        map['source'] = event['source']
        map['destination'] = event['destination']
        map['file_descriptor'] = event['file_descriptor']
        map['connection_method'] = event['connection_method']
      "
      map_action => "create"
    }
  }
  else if "" in [connection_code] {
    mutate {
      add_tag => [ "map_finished" ]
    }
    aggregate {
      task_id => "%{connection_id}"
      code => "
        map['operations'][event['op_number']]['connection_code'] = event['connection_code']
        map['operations'][event['op_number']]['connection_result'] = event['connection_result']
      "
      map_action => "update"
    }
  }
  else {
    aggregate {
      task_id => "%{connection_id}"
      code => "
        map['@timestamp'] = map['timestamp']
      "
      timeout => 0
      push_map_as_event_on_timeout => true
    }
  }
}
亚历克斯·泽尔

弄清楚了,我没有意识到/理解的是,当我到达最后一个事件并推出地图时,logstash将把它(地图)作为新事件处理(例如文件中的新日志行)并且logstash将尝试将该事件与过滤器之一进行匹配,但由于最终地图不包含与任何过滤器都匹配的按摩字段,因此Logsash将失败。

创建一个新的过滤器可以解决该问题。

filter {
  if "aggregated" in [tags] {
    date {
      match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
      target => "@timestamp"
      add_tag => [ "tmatch" ]
    }
  }
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

按某一列将数据帧压缩为包含第一个和最后一个时间戳以及值均值的行

Spark结构化流分组窗口-我希望第一个时间间隔从第一个时间戳开始

从表中删除重复的行,除了每天的第一个时间戳

正则表达式仅捕获每行的第一个时间戳

按顺序返回每个ID的第一个和最后一个时间戳,并可能包含重复值和缺失值

根据第一个时间戳记录保留列表中的值

使用foreach为每组记录指定一个时间戳

为每个字段选择最后一个时间戳值的字段

选择每个日期的最后一个时间戳

自行加入下一个时间戳

如何获得第一个和最后一个时间顺序之间的差异?

如何将先前的时间戳预测作为下一个时间戳的附加输入?

当新的时间戳值等于前一个时间戳值时,DataStax / Cassandra的USING TIMESTAMP行为是不可预测的

在pyspark中的两个时间戳之间创建一个时间戳数组

将时间范围列表中的所有项目计数为一个时间范围

如何使用列和聚合函数的映射为 agg 中的第一个函数设置 ignoreNulls 标志?

在另一个时间戳之前计算时间戳数量的更快方法

将groupby的第一个值设置为Nan

Pandoc:将文档标题设置为第一个标题

JavaScript无法将焦点设置为ul中的第一个li元素

将第一个值设置为Javascript数组中的键

Angular 5-Observable-将Observable的第一个值设置为表单组

无法将CurrentCell焦点设置为DataGridView中的第一个可编辑列

Rails选择选项-将第一个选项设置为禁用,选中

如何将DOM元素设置为第一个孩子?

将Recyclerview的仅第一个位置设置为标题布局

golang反射将结构的第一个值设置为其零值

将样式设置为仅使用* ngFor创建的第一个元素

将控制器属性设置为第一个模型

TOP 榜单

热门标签

归档