【问题标题】:Logstash conditionnal in message消息中的 Logstash 条件
【发布时间】:2019-01-30 08:10:17
【问题描述】:

我正在使用 ELK docker stack 来聚合和分析来自不同来源的日志,但我的 logstash 配置有问题。

Filebeat 将流重定向到 logstash,而我在 Elasticsearch 上什么也没有,所以我认为 logstash 配置存在问题。

我的 docker 日志中有两种不同类型的日志:

  1. HTTP 请求日志

    2019-01-29T18:35:15.423Z HTTP INFO "POST /myroute/?param1=test" 201 41 - 44.014 毫秒

  2. APP日志

    2019-01-29T18:48:19.657Z APP 错误:{"code":201,"message":"ok"}

我想检查日志并在检测到它是“APP”还是“HTTP”时对其进行变异。所以,这里是我的logstash配置

input {
  beats {
    port => 5044
    codec => "json"
  }
}
filter {
    if "HTTP" in [message] {
      grok {
          mapping => { "message" => %{TIMESTAMP_ISO8601:timestamp} %{WORD:type} %{LOGLEVEL:level} "%{WORD:method} %{URIPATHPARAM:url}" %{INT:code} %{INT:bytes} - %{GREEDYDATA:response_time}
      }
    }
    else if "APP" in [message] {
      grok {
          mapping => { "message" => %{TIMESTAMP_ISO8601:timestamp} %{WORD:type} %{LOGLEVEL:level} %{GREEDYDATA:jsonstring}  }
      }
      json {
            source => "jsonstring"
            target => "doc"
      }
      mutate {
        add_field => {
          "code" => "%{[doc][code]}"
          "message" => "%{[doc][message]}"
        }
      }
    }
  }
}
output { 
    elasticsearch { 
        hosts => ["localhost"] 
    } 
}

我认为当我尝试检查邮件内容时出现问题,但我不知道如何解决。任何的想法 ?

非常感谢!

编辑:

我在我的配置中修复了一些问题,但它仍然不起作用

input {
  beats {
    port => 5044
    codec => "json"
  }
}
filter {
    if [message] =~ /HTTP/  {
      grok {
          mapping => { "message" => %{TIMESTAMP_ISO8601:timestamp} %{WORD:type} %{LOGLEVEL:level} "%{WORD:method} %{URIPATHPARAM:url}" %{INT:code} %{INT:bytes} - %{GREEDYDATA:response_time}
      }
    }
    else if [message] =~ /APP/ {
      grok {
          mapping => { "message" => %{TIMESTAMP_ISO8601:timestamp} %{WORD:type} %{LOGLEVEL:level} %{GREEDYDATA:jsonstring}  }
      }
      json {
            source => "jsonstring"
            target => "doc"
      }
      mutate {
        add_field => {
          "code" => "%{[doc][code]}"
          "message" => "%{[doc][message]}"
        }
      }
    }
  }
}
output { 
    elasticsearch { 
        hosts => ["localhost"] 
    } 
}

编辑 2:

logstash.stdout 日志

 Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, \", ', -, [, { at line 10, column 37 (byte 149) after filter {\n    if \"HTTP\" in [message] {\n      grok {\n          mapping => { \"message\" => ", :backtrace=>["/opt/logstash/logstash-core/lib/logstash/compiler.rb:41:in `compile_imperative'", "/opt/logstash/logstash-core/lib/logstash/compiler.rb:49:in `compile_graph'", "/opt/logstash/logstash-core/lib/logstash/compiler.rb:11:in `block in compile_sources'", "org/jruby/RubyArray.java:2486:in `map'", "/opt/logstash/logstash-core/lib/logstash/compiler.rb:10:in `compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:149:in `initialize'", "/opt/logstash/logstash-core/lib/logstash/pipeline.rb:22:in `initialize'", "/opt/logstash/logstash-core/lib/logstash/pipeline.rb:90:in `initialize'", "/opt/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:42:in `block in execute'", "/opt/logstash/logstash-core/lib/logstash/agent.rb:92:in `block in exclusive'", "org/jruby/ext/thread/Mutex.java:148:in `synchronize'", "/opt/logstash/logstash-core/lib/logstash/agent.rb:92:in `exclusive'", "/opt/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:38:in `execute'", "/opt/logstash/logstash-core/lib/logstash/agent.rb:317:in `block in converge_state'"

没有人? :(

【问题讨论】:

  • 您在运行 logstash 时是否有任何类型的输出/日志?此外,在对问题进行故障排除时,输出文件/标准输出以查看 logstash 正在处理的内容会很有用。
  • 是的,我用日志更新了我的第一篇文章

标签: logstash logstash-grok logstash-configuration


【解决方案1】:

您的配置语法错误,如消息Expected one of 所示。首先你错过了一个},然后grok模式必须声明为一个字符串,在"之间,字符串中的"转义(如\"),在grok过滤器中,选项是匹配的,而不是映射。

所以纠正所有这些,从第一个 conf 开始:

input {
  beats {
    port => 5044
    codec => "json"
  }
}
filter {
    if "HTTP" in [message] {
        grok {
            match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{WORD:type} %{LOGLEVEL:level} \"%{WORD:method} %{URIPATHPARAM:url}\" %{INT:code} %{INT:bytes} - %{GREEDYDATA:response_time}" }
        }
    } else if "APP" in [message] {
        grok {
            match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{WORD:type} %{LOGLEVEL:level} %{GREEDYDATA:jsonstring}"  }
        }
        json {
            source => "jsonstring"
            target => "doc"
        }
        mutate {
            add_field => {
                "code" => "%{[doc][code]}"
                "message" => "%{[doc][message]}"
            }
        }
    }
}

output { 
    elasticsearch { 
        hosts => ["localhost"] 
    } 
}

【讨论】:

    猜你喜欢
    • 2022-01-07
    • 1970-01-01
    • 1970-01-01
    • 2015-07-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多