elk

wuer888

Logstash日志收集(三)

还是得先顺着官网了解一波:https://www.elastic.co/products/logstash

一、跟着官网学习下Logstash的基本概念

 集中,转换和隐藏。您的数据Logstash是一个开源的服务器端数据处理管道,可以同时从多个源中获取数据,并将其转换为您喜欢的“存储”(自然是Elasticsearch)。)

1.1 Logstash 6.0.0新增功能

用多条管道简化处理:

Bash
 Logstash 6.0引入了针对不同用例同时运行多个管道的能力。 管道在同一个实例中一起运行,但具有独立的输入,过滤器和输出,使用户能够隔离每个数据源的处理逻辑。 这使您的管道逻辑集中和简洁。 
 从历史上看,许多Logstash用户将多个用例组合成单个管道,这需要添加复杂的条件逻辑。 有了多个不再需要的管道, 现在,您可以更干净地组织您的配置,并通过使用每个用例的专用管道来更有效地执行您的管道。
 为了增加趣味性,每个管道还有自己的独立设置和生命周期,您可以调整它们以匹配该数据源所需的相应工作负载配置文件。 例如,您可能希望为高容量日志记录管道分配更多的管道工作线程,并为较低强度的本地度量标准管道节流资源。

多管道的官方文档:https://www.elastic.co/guide/en/logstash/6.0/multiple-pipelines.html

多管道的博客地址:https://www.elastic.co/blog/logstash-multiple-pipelines

用Elastic Stack集中管理管道:

Bash
在过去,管理pipeline配置或者是手动任务,或者使用像Puppet或Chef这样的配置管理工具来协助操作自动化。 在Logstash 6.0,集中管道管理功能现在使您能够通过Kibana单一窗格直接使用Elastic Stack管理和自动编排Logstash部署。
此功能为Kibana带来了管道管理UI,您可以使用它来创建,编辑和删除pipeline。 在这个UI下面,我们使用Elasticsearch来存储你的pipelines配置。 通过几个简单的设置,您的Logstash节点可以配置为监视这些管道上的变化,让您可以无缝地推出管道更改,而无需额外的运营基础架构。

#集中管道管理可作为X-Pack功能使用。

#文档连接:https://www.elastic.co/guide/en/logstash/6.0/logstash-centralized-pipeline-management.html

可视化管道逻辑和性能:

Bash
Pipeline Viewer,这是对Logstash Monitoring UI的一个补充。 有了这个新工具,您可以看到您的管道配置,并排除插件级别的性能瓶颈。 管道查看器将Logstash管道显示为DAG(定向非循环图)。 在这个DAG上重叠是个别输入,过滤器和输出的相关性能指标。 突出显示潜在的性能问题,以便快速确定管道的哪些部分可能是瓶颈。
用户应该注意,这是一个测试版功能,可能会有所变化。 一个已知的问题是缺乏清晰渲染非常大的管道的能力。 这是一个积极的工作领域,所以当我们进一步改进并将其转化为全面可用性时,期待对此功能进行重大改进。

从导入节点到Logstash的Smooth path:

Bash
一些用户在开始使用Elastic Stack时享受Elasticsearch摄取节点的便利。 但是,在一定程度的复杂性中,Ingest节点可能不具备解决您的问题所需的所有功能,并且可能需要迁移到Logstash。 为了简化这些转换,我们创建了一个随Logstash 6.0一起提供的Ingest管道转换工具。 转换器将一个接收节点管道作为输入,并将相应的Logstash管道 spits out。

使用新的JRuby 9k:

Bash
我们花了很多时间把Logstash项目转移到JRuby的最新版本:JRuby 9000,它支持现代的Ruby语法和增强的内部结构,更适合优化。 对于插件开发者来说,这意味着你的老插件将继续在除了最罕见的情况之外的所有工作。 此升级还意味着您可以继续使用新的Ruby功能,以及使用仅与JRuby 9k兼容的Ruby库。

1.2 跟着官网了解下logstash的基本流程

INPUTS

提取所有形式、大小、来源的数据:

       数据通常以多种格式散布或分布在许多系统中。 Logstash支持各种输入,可以同时从多个常见的源中获取事件。 轻松从日志,指标,Web应用程序,数据存储和各种AWS服务中进行采集,所有这些都以连续,流式的方式进行。

图片.png

FILTERS

即时分析和转换您的数据:

     当数据从源传输到存储时,Logstash过滤器解析每一个事件,识别命名的字段来构建结构,并将它们转换为一种通用格式,以方便、加速的分析和业务价值。

     Logstash动态转换和准备您的数据,无论其格式或复杂性如何:

Bash
通过grok从非结构化数据派生结构
从IP地址解读地理坐标
Anonymize PII数据,完全排除敏感字段
简化独立于数据源,格式或模式的整体处理

过滤器库:https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

图片.png

OUTPUTS

选择你的Stash,传输你的数据:

      虽然Elasticsearch是我们的首选输出,开创了搜索和分析可能性的世界,但并不是唯一可用的。Logstash有多种输出,可以让您在需要的地方路由数据,从而可以灵活地释放大量的下游用例。

输出插件:https://www.elastic.co/guide/en/logstash/current/output-plugins.html

#大概流程就是流入==》过滤==》流出,下面还有点其他介绍。

可扩展性:

创建和配置自己的Pipeline:

         Logstash有一个可插拔的框架,拥有超过200个插件。 混合,匹配和编排不同的输入,滤波器和输出以实现在管道中和谐工作。

         Logstash plugins创建很容器构建,为插件开发和插件生成器通过了API,可以从自定义应用程序中获取数据(https://www.elastic.co/guide/en/logstash/current/contributing-to-logstash.html)。

耐用性和安全性:

内置数据传输的信任管道:

        如果Logstash节点发生故障,Logstash会保证至少一次为您的进行中的事件传递其持续队列。未成功处理的事件可以被分流到一个死信队列中进行自省和回放。通过吸收吞吐量的能力,Logstash通过吞吐峰值进行扩展,而无需使用外部排队层。

        无论您是运行10或1000个Logstash实例,我们都可以让您充分保护您的采集管道。 来自Beats(https://www.elastic.co/products/beats)的输入数据以及其他输入可以通过网络加密,并且与安全的Elasticsearch集群(https://www.elastic.co/products/x-pack/security)完全集成。

监测:

对部署具有完全可见性:

        Logstash管道经常是多功能的,并且可以变得复杂,使得对管道性能,可用性和瓶颈的深入理解是非常宝贵的。 借助X-Pack中的监视和管道查看器功能(https://www.elastic.co/products/x-pack/monitoring),您可以轻松观察和研究活动Logstash节点或完整部署。

图片.png

管理和编排:

使用单个UI集中管理部署:

      使用Pipeline Management UI来执行Logstash部署,这使得编排和管理管道变得轻而易举。 管理控制还可以与内置的X-Pack安全功能无缝集成,以防止任何意外重新连接。

图片.png

博文来自:www.51niux.com

二、Logstash的安装部署

#这里先用一台做下日志的收集工作。我们就选择192.168.14.65这台机器作为Logstash的机器。

#还是先要贴一下官网部署以及使用文档:https://www.elastic.co/guide/en/logstash/current/index.html

#Logstash和filebeat的几个示例:https://www.elastic.co/guide/en/logstash/current/logstash-config-for-filebeat-modules.html

#filebeat的正则:https://www.elastic.co/guide/en/beats/filebeat/6.0/regexp-support.html

2.1 安装Logstash

安装java

#Logstash需要Java 8。不支持Java 9。

# java -version

Bash
java version "1.8.0_74"
Java(TM) SE Runtime Environment (build 1.8.0_74-b02)
Java HotSpot(TM) 64-Bit Server VM (build 25.74-b02, mixed mode)

#在某些Linux系统上,在尝试安装之前,可能还需要导出JAVA_HOME环境,特别是如果您从tarball安装了Java。 这是因为Logstash在安装过程中使用Java来自动检测您的环境并安装正确的启动方法(SysV init脚本,Upstart或systemd)。 如果Logstash在软件包安装期间无法找到JAVA_HOME环境变量,则可能会收到错误消息,并且Logstash将无法正常启动。

下载安装Logstash

# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.0.1.tar.gz     #线上用的是2.3.4的版本,这里用最新版......

# tar zxf logstash-6.0.1.tar.gz

# useradd elk

# cp -r logstash-6.0.1 /home/elk/
# ln -s /home/elk/logstash-6.0.1 /home/elk/logstash
# chown -R elk:elk /home/elk/

测试Logstash

#首先,我们通过运行最基本的Logstash管道来测试Logstash安装。Logstash管道有两个必需的元素,即输入和输出,以及一个可选元素filter。 输入插件使用来自数据源的数据,过滤器插件根据您的指定修改数据,输出插件将数据写入目标。

图片.png

#要测试Logstash安装,请运行最基本的Logstash管道。 例如:

# su - elk

$ ./logstash -e \'input { stdin { } } output { stdout {} }\'   #定义了一个叫“stdin”的input还有一个“stdout”的output,无论输入什么字符,Logstash都会按照结构化格式将输入输出。

Bash
/home/elk/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:108: warning: already initialized constant DEFAULT_MAX_POOL_SIZE
/home/elk/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:110: warning: already initialized constant DEFAULT_REQUEST_TIMEOUT
/home/elk/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:111: warning: already initialized constant DEFAULT_SOCKET_TIMEOUT
/home/elk/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:112: warning: already initialized constant DEFAULT_CONNECT_TIMEOUT
/home/elk/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:113: warning: already initialized constant DEFAULT_MAX_REDIRECTS
/home/elk/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:114: warning: already initialized constant DEFAULT_EXPECT_CONTINUE
/home/elk/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:115: warning: already initialized constant DEFAULT_STALE_CHECK
/home/elk/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:590: warning: already initialized constant ISO_8859_1
/home/elk/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:641: warning: already initialized constant KEY_EXTRACTION_REGEXP
Sending Logstash\'s logs to /home/elk/logstash/logs which is now configured via log4j2.properties
[2017-12-07T17:00:51,114][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/home/elk/logstash/modules/fb_apache/configuration"}
[2017-12-07T17:00:51,118][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/home/elk/logstash/modules/netflow/configuration"}
[2017-12-07T17:00:51,346][WARN ][logstash.config.source.multilocal] Ignoring the \'pipelines.yml\' file because modules or command line options are specified
[2017-12-07T17:00:51,561][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-12-07T17:00:51,924][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000, :thread=>"#<Thread:0x7c079096@/home/elk/logstash/logstash-core/lib/logstash/pipeline.rb:290 run>"}
[2017-12-07T17:00:51,957][INFO ][logstash.pipeline        ] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2017-12-07T17:00:51,971][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}
hello world    #手工输入一个hello world
2017-12-07T09:03:32.065Z localhost.localdomain hello world   #可以返回一个hello world,测试成功了。
[2017-12-07T17:07:14,815][INFO ][logstash.pipeline        ] Pipeline terminated {"pipeline.id"=>"main"}   #CTRL+D退出

#Logstash将时间戳和IP地址信息添加到消息中。 通过在运行Logstash的shell中发出CTRL-D命令来退出Logstash。

博文来自:www.51niux.com

三、解析日志与Logstash

3.1 配置Filebeat发送日志到Logstash

 先跟着官网介绍一下:       

        在创建Logstash管道之前,将配置Filebeat以将日志行发送到Logstash。Filebeat客户端是一个轻量级的,资源友好的工具,它从服务器上的文件中收集日志,并将这些日志转发给Logstash实例进行处理。 Filebeat专为可靠性和低延迟而设计。 Filebeat在主机上占用的资源较少,Beats输入插件最大限度地减少了Logstash实例的资源需求。        

       默认的Logstash安装包括Beats输入插件。 Beats输入插件允许Logstash从Elastic Beats框架接收事件,这意味着任何Beat框架编写的Beat框架(如Packetbeat和Metricbeat)都可以将事件数据发送到Logstash。

客户端安装Filebeat:

       #正常生产场景下,哪台机器需要收集日志如web日志等会在其机器上安装Filebeat客户端。这里搞一个192.168.14.67来做此次web日志测试的客户机。

# wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.0.1-linux-x86_64.tar.gz   #还是下载最新版,线上用的是5.1.1版的

# useradd elk 
# tar zxf filebeat-6.0.1-linux-x86_64.tar.gz 
# cp -r filebeat-6.0.1-linux-x86_64 /home/elk/filebeat-6.0.1
# ln -s /home/elk/filebeat-6.0.1 /home/elk/filebeat
# chown -R elk:elk /home/elk/filebeat-6.0.1

配置Filebeat:

#su - elk

$ vim /home/elk/filebeat/filebeat.yml   #只粘贴修改部分

Bash
#=========================== Filebeat prospectors =============================
- type: log
  paths:
    #- /var/log/*.log    #目前按照Go语言的glob函数处理。没有对配置目录做递归处理,比如配置的如果是:/var/log/* /*.log则只会去/var/log目录的所有子目录中寻找以”.log”结尾的文件,而不会寻找/var/log目录下以”.log”结尾的文件。
    #- c:\programdata\elasticsearch\logs\*
    - /usr/local/nginx/logs/access.log
  tags: ["nginx-access","test.51niux.com"]
#================================ General =====================================
name: "192.168.14.67"
#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
#  hosts: ["localhost:9200"]
#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["192.168.14.65:5066"]
  loadbalance: true
  compression_level: 6

$ cat /home/elk/filebeat/filebeat.yml    #把样例配置都贴出来

Bash
#=========================== Filebeat prospectors =============================
filebeat.prospectors:
#每个 - 是一个prospectors。大多数选项可以在prospectors级别设置,因此您可以使用不同的prospectors进行各种配置。 以下是prospectors特定的配置。
- type: log
  enabled: false  ##更改为true以启用此prospectors配置。
  paths:     ##应该被抓取和抓取的路径。 基于Glob的路径。
    - /var/log/*.log
    #- c:\programdata\elasticsearch\logs\*
  #exclude_lines: [\'^DBG\']    #排除行。 要匹配的正则表达式列表。 它删除与列表中任何正则表达式匹配的行。
  #include_lines: [\'^ERR\', \'^WARN\']   #包括行。 要匹配的正则表达式列表。 它从列表中导出与任何正则表达式匹配的行。
  #exclude_files: [\'.gz$\']   #排除文件。 要匹配的正则表达式列表。 Filebeat会从列表中删除与任何正则表达式匹配的文件。 默认情况下,没有文件被丢弃。
  #fields:   #可选的附加字段。 可以自由选择这些字段,以将其他信息添加到搜寻的日志文件中进行过滤
  #  level: debug
  #  review: 1
  
  ###多行选项 
  #Multiline可以用于跨越多行的日志消息。 这在Java Stack Traces或C-Line Continuation中很常见   
  #multiline.pattern: ^\[    #必须匹配的正则表达式模式。 示例模式匹配所有以[
  #multiline.negate: false   #定义模式下的模式是否应该被否定。 默认是false。
  #multiline.match: after    ##匹配可以设置为“after”或“before”。 它用于定义行是否应该附加到之前或之后(不匹配)的模式,或者只要模式不匹配,则基于否定。 注意:相当于之前和之前相当于Logstash中的下一个
  
#============================= Filebeat modules ===============================
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml   #配置加载的全局模式
  reload.enabled: false    #设置为true以启用配置重新加载
  #reload.period: 10s      #应检查路径下文件的更改时间间隔
#====================  Elasticsearch template setting ==========================  
setup.template.settings:
  index.number_of_shards: 3
  #index.codec: best_compression
  #_source.enabled: false
#================================ General =====================================
#name: #发布网络数据的发送人的名称。 它可以用于将单个发货人在Web界面中发送的所有传输进行分组。   
#tags: ["service-X", "web-tier"]  #发送人的标签包含在他们自己的字段中,每次传输都会发布。
#fields:    ##可选字段,可以指定将其他信息添加到输出。
#  env: staging

#============================== Dashboards =====================================     
#setup.dashboards.enabled: false   #这些设置控制将示例仪表板加载到Kibana索引。 加载仪表板默认是禁用的,可以通过在这里设置选项来启用,或者使用`-setup` CLI标记或`setup`命令启用。
#setup.dashboards.url:   #从哪里下载仪表板归档的URL。 默认情况下,这个URL的值是根据节拍名称和版本计算的。 对于发布的版本,此URL指向artifacts.elastic.co网站上的仪表板存档。

#============================== Kibana =====================================
#从Beats版本6.0.0开始,仪表板通过Kibana API加载。这需要一个Kibana端点配置。
setup.kibana:
   #host: "localhost:5601"
#============================= Elastic Cloud ==================================
#这些设置简化了使用Elastic Cloud(https://cloud.elastic.co/)的filebeat。 
#cloud.id:    #cloud.id设置将覆盖`output.elasticsearch.hosts`和`setup.kibana.host`选项。 可以在Elastic Cloud Web UI中找到
#cloud.auth:  #cloud.auth设置将覆盖`output.elasticsearch.username`和`output.elasticsearch.password`设置。 格式是<user>:<pass>`。
#================================ Outputs =====================================
#配置发送beat收集的数据时使用的输出。
#-------------------------- Elasticsearch output ------------------------------
output.elasticsearch: 
  hosts: ["localhost:9200"]   #要连接的主机数组。 
  #protocol: "https"    ##可选协议和基本认证凭证。
  #username: "elastic"
  #password: "changeme"
#----------------------------- Logstash output --------------------------------
#output.logstash:
  #hosts: ["localhost:5044"]  #Logstash主机
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]   #可选的SSL。默认是关闭的。HTTPS服务器验证的根证书列表
  #ssl.certificate: "/etc/pki/client/cert.pem"   #SSL客户端认证证书
  #ssl.key: "/etc/pki/client/cert.key"   #客户证书密钥
#================================ Logging =====================================
#logging.level: debug   ##设置日志级别。 默认的日志级别是info。可用的日志级别是:critical, error, warning, info, debug
#logging.selectors: ["*"]  ##在调试级别,您可以选择性地启用仅针对某些组件的日志记录。 要启用所有选择器,请使用[“*”]。 其他选择器的例子是“beat”,“publish”,“service”。

 启动Filebeat:

$ ./filebeat -e -c filebeat.yml -d "publish"

Bash
2017/12/07 11:12:04.686924 beat.go:426: INFO Home path: [/home/elk/filebeat] Config path: [/home/elk/filebeat] Data path: [/home/elk/filebeat/data] Logs path: [/home/elk/filebeat/logs]
2017/12/07 11:12:04.687210 metrics.go:23: INFO Metrics logging every 30s
2017/12/07 11:12:04.687390 beat.go:433: INFO Beat UUID: d85b9ebe-c3c4-4ae3-91ed-0826a7bb8382
2017/12/07 11:12:04.687448 beat.go:192: INFO Setup Beat: filebeat; Version: 6.0.1
2017/12/07 11:12:04.689131 logger.go:18: DBG start pipeline event consumer
2017/12/07 11:12:04.689185 module.go:80: INFO Beat name: 192.168.14.67
2017/12/07 11:12:04.709937 beat.go:260: INFO filebeat start running.
2017/12/07 11:12:04.710061 registrar.go:71: INFO No registry file found under: /home/elk/filebeat/data/registry. Creating a new registry file.
2017/12/07 11:12:04.733832 registrar.go:108: INFO Loading registrar data from /home/elk/filebeat/data/registry
2017/12/07 11:12:04.733922 registrar.go:119: INFO States Loaded from registrar: 0
2017/12/07 11:12:04.735009 filebeat.go:260: WARN Filebeat is unable to load the Ingest Node pipelines for the configured modules because the Elasticsearch output is not configured/enabled. If you have already loaded the Ingest Node pipelines or are using Logstash pipelines, you can ignore this warning.
2017/12/07 11:12:04.735072 crawler.go:44: INFO Loading Prospectors: 1
2017/12/07 11:12:04.736376 registrar.go:150: INFO Starting Registrar
2017/12/07 11:12:04.737264 prospector.go:103: INFO Starting prospector of type: log; id: 16430255320212846879 
2017/12/07 11:12:04.737538 crawler.go:78: INFO Loading and starting Prospectors completed. Enabled prospectors: 1
2017/12/07 11:12:04.737692 reload.go:128: INFO Config reloader started
2017/12/07 11:12:04.750387 reload.go:220: INFO Loading of config files completed.
2017/12/07 11:12:04.752227 harvester.go:207: INFO Harvester started for file: /usr/local/nginx/logs/access.log
2017/12/07 11:12:34.689285 metrics.go:39: INFO Non-zero metrics in the last 30s: beat.memstats.gc_next=4473924 beat.memstats.memory_alloc=3062112 beat.memstats.memory_total=3062112 filebeat.events.added=1 filebeat.events.done=1 filebeat.harvester.open_files=1 filebeat.harvester.running=1 filebeat.harvester.started=1 libbeat.config.module.running=0 libbeat.config.reloads=1 libbeat.output.type=logstash libbeat.pipeline.clients=1 libbeat.pipeline.events.active=0 libbeat.pipeline.events.filtered=1 libbeat.pipeline.events.total=1 registrar.states.current=1 registrar.states.update=1 registrar.writes=2
2017/12/07 11:13:04.688099 metrics.go:39: INFO Non-zero metrics in the last 30s: beat.memstats.gc_next=4473924 beat.memstats.memory_alloc=3088000 beat.memstats.memory_total=3088000 filebeat.harvester.open_files=1 filebeat.harvester.running=1 libbeat.config.module.running=0 libbeat.pipeline.clients=1 libbeat.pipeline.events.active=0 registrar.states.current=1
2017/12/07 11:13:34.687919 metrics.go:39: INFO Non-zero metrics in the last 30s: beat.memstats.gc_next=4473924 beat.memstats.memory_alloc=3107840 beat.memstats.memory_total=3107840 filebeat.harvester.open_files=1 filebeat.harvester.running=1 libbeat.config.module.running=0 libbeat.pipeline.clients=1 libbeat.pipeline.events.active=0 registrar.states.current=1

3.2Logstash端配置

$ mkdir conf

$ cd conf

$mkdir /home/elk/logstash/patterns   #我们要转换的格式文件就存放在此目录下

$ cat /home/elk/logstash/patterns/51niux 

Bash
NGUSERNAME [a-zA-Z\.\@\-\+_%]+    #这些现在还用不到,以后就会用到先粘贴在这里了
NGUSER %{NGUSERNAME}
NOTQUOTE [^\"]*
SPACE \s*
NOTDOT [^,]*

TEST_HTTP_ACCESS %{HTTPDATE:timestamp}\|%{IP:remote_addr}\|%{IPORHOST:http_host}\|(?:%{DATA:http_x_forwarded_for}|-)\|%{DATA:request_method}\|%{DATA:request_uri}\|%{DATA:server_protocol}\|%{NUMBER:status}\|(?:%{NUMBER:body_bytes_sent}|-)\|(?:%{DATA:http_referer}|-)\|%{DATA:http_user_agent}\|(?:%{DATA:request_time}|-)\|"
#主要是这里这句话,定义了一个TEST_HTTP_ACCESS,这就相当于模板,后面是通过log_format来匹配对应的正则。message是每段读进来的日志,IPORHOST、USERNAME、HTTPDATE等都是patterns/grok-patterns中定义好的正则格式名称,对照日志进行编写。
#grok pattren的语法为:%{SYNTAX:semantic},":" 前面是grok-pattrens中定义的变量,后面可以自定义变量的名称。(?:%{SYNTAX:semantic}|-)这种形式是条件判断。如果有双引号""或者中括号[],需要加 \ 进行转义。

$ cat /home/elk/logstash/conf/logstash-nginx.conf  #定义被logstash引用的配置文件

Bash
input {   #定义客户端发送到哪个IP的哪个端口
    beats {
        host => "192.168.14.65"    
        port => 5066
    }
}
filter {
    if "nginx-access" in [tags]{    #如果tags里面是nginx-access匹配到了就进行下面的判断,if判断是用在收集多业务nginx日志的时候,这里先把框架写在这里,然后如果不是的话就可以在下面再定义else if
        if "test.51niux.com" in [tags]{    #如果tags匹配到是test.51niux.com的话
            grok {   #就进行我们指定的TEST_HTTP_ACCESS这个模板里面的格式进行grok的格式转换。
                 patterns_dir => "/home/elk/logstash/patterns"   
                 match => {
                          "message" => "%{TEST_HTTP_ACCESS}"
                 }
            }
            mutate {   #类型转换,可以转换类型包括:"integer","float" 和 "string"。示例如下:
                convert => { "nginx_request_time" => "float"}
                convert => { "nginx_body_bytes_sent" => "integer"}
            }
   }
    
  }
}
output {
    if "nginx-access" in [tags]{   #如果是带着nginx-access的tags的消息
        if "test.51niux.com" in [tags]{   #如果是带着tags是test.51niux.com的消息
            elasticsearch {   #写入到elasticsearch中
                hosts => ["192.168.14.60","192.168.14.61","192.168.14.62","192.168.14.63","192.168.14.64"]   #这里是elasticsearch集群
                index => "logstash-nginx-access-test-51niux-com-%{+YYYY.MM}"   #这里是创建的索引的格式以logstash开头然后以年月结尾。
            }         
    }
 }
}

$ /home/elk/logstash/bin/logstash -t -f /home/elk/logstash/conf/logstash-nginx.conf   #上面文件if判断了{}有点多,可以用-t来验证一下配置文件而不启动,结尾出现下面就是可以了

Bash
Configuration OK
[2017-12-07T19:38:07,082][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

$ /home/elk/logstash/bin/logstash  -f /home/elk/logstash/conf/logstash-nginx.conf   #指定配置文件前台启动,我们先看看输出。

Bash
[2017-12-07T19:52:04,304][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://192.168.14.60:9200/, http://192.168.14.61:9200/, http://192.168.14.62:9200/, http://192.168.14.63:9200/, http://192.168.14.64:9200/]}}
[2017-12-07T19:52:04,307][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://192.168.14.60:9200/, :path=>"/"}
[2017-12-07T19:52:04,401][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://192.168.14.60:9200/"}
[2017-12-07T19:52:04,438][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>nil}
[2017-12-07T19:52:04,438][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won\'t be used to determine the document _type {:es_version=>6}
[2017-12-07T19:52:04,439][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://192.168.14.61:9200/, :path=>"/"}
[2017-12-07T19:52:04,449][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://192.168.14.61:9200/"}
[2017-12-07T19:52:04,455][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://192.168.14.62:9200/, :path=>"/"}
[2017-12-07T19:52:04,464][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://192.168.14.62:9200/"}
[2017-12-07T19:52:04,475][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://192.168.14.63:9200/, :path=>"/"}
[2017-12-07T19:52:04,486][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://192.168.14.63:9200/"}
[2017-12-07T19:52:04,494][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://192.168.14.64:9200/, :path=>"/"}
[2017-12-07T19:52:04,502][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://192.168.14.64:9200/"}
[2017-12-07T19:52:04,514][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-12-07T19:52:04,518][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=

分类:

技术点:

相关文章: