Logstash配置包括input,output,filter。
输入input可以指定四种方式,file,syslog,beat,redis。前面使用filebeat发送event事件,所以配置了beat方式。
输出output包括elasticsearch,file,graphite,statsd四种,最常用的是elasticsearch方式。
过滤filter可以对数据进行预处理,grok,drop,clone,geoip等。使用grok可以把日志解析成字段,使用geoip可以处理地理位置和IP。
https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
grok配置
https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
match => { "message" => "%{COMBINEDAPACHELOG}"}
其实匹配了日志中一行记录,放在message字段里,格式就是%{COMBINEDAPACHELOG}。
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
这种匹配了55.3.244.1 GET /index.html 15824 0.043这样的日志。
解析后:
client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043
比如:
Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<[email protected]>
这行日志可以配置匹配方式:
match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
解析成的格式:
timestamp: Jan 1 06:25:43
logsource: mailserver14
program: postfix/cleanup
pid: 21403
queue_id: BEF25A72965
syslog_message: message-id=<[email protected]>
通过解析过滤后,原来日志文件中的一行行文本解析成一行行的字段记录,在Kibana中就可以直观地看到字段的值,同时在ElasticSearch中也能按照字段进行查找。
使用Dev Tool请求:
GET /logstash-2015.05.18/_search?q=geo.src:FR
{
"query": {
"match_all": {}
}
}
至此,日志可以通过ELK收集和查询了。