【问题标题】:How do I setup a 1:N workflow with filters in Logstash?如何在 Logstash 中使用过滤器设置 1:N 工作流程?
【发布时间】:2013-06-27 14:57:12
【问题描述】:

我正在尝试设置一个 logstash 工作程序,它从一个 amqp/rabbitmq 队列中获取所有消息,过滤一些消息以发送到 statsD,但还将所有消息发送到弹性搜索。以下实现仅不向 ElasticSearch 发送任何消息。

input {
  rabbitmq {
    host => "amqp-host"
    queue => "elasticsearch"
    key => "elasticsearch"
    exchange => "elasticsearch"
    type => "all"
    durable => true
    auto_delete => false
    exclusive => false
    format => "json_event"
    debug => false
  }
}

filter {
    grep {
      add_tag => "grepped"
      match => ["@message", "Execution of .*? took .* sec"]
    }

    grok {
        tags => ["grepped"]
        add_tag => "grokked"
        pattern => "Execution of %{DATA:command_name} took %{DATA:response_time} sec"
    }

    mutate {
        tags => ["grepped", "grokked"]
        lowercase => [ "command_name" ]
        add_tag => ["mutated"]
    }
}

output {
  elasticsearch_river {
    type => "all"
    rabbitmq_host => "amqp-host"
    debug => false
    durable => true
    persistent => true
    es_host => "es-host"
    exchange => "logstash-elasticsearch"
    exchange_type => "direct"
    index => "logs-%{+YYYY.MM.dd}"
    index_type => "%{@type}"
    queue => "logstash-elasticsearch" 
  }

 statsd {
    type => "command-filter"
    tags => ["grepped", "grokked", "mutated"]
    host => "some.domain.local"
    port => 1234
    sender => ""
    namespace => ""
    timing => ["prefix.%{command_name}.suffix", "%{response_time}"]
    increment => ["prefix.%{command_name}.suffix"]
  }
}

有什么包罗万象的过滤器吗?或者一种安排标签的方法,以便过滤一些消息但全部转发到 ES?

【问题讨论】:

    标签: elasticsearch amqp logstash statsd


    【解决方案1】:

    clone 过滤器派上用场了。以下是我生成的配置文件。

    input {
      rabbitmq {
        host => "amqp-host"
        queue => "elasticsearch"
        key => "elasticsearch"
        exchange => "elasticsearch"
        type => "all"
        durable => true
        auto_delete => false
        exclusive => false
        format => "json_event"
        debug => false
      }
    }
    
    filter {
        clone {
            exclude_tags => ["cloned"]
            clones => ["statsd", "elastic-search"]
            add_tag => ["cloned"]
        }
    
        grep {
          type => "statsd"
          add_tag => "grepped"
          match => ["@message", "Execution of .*Command took .* sec"]
        }
    
        grok {
            type => "statsd"
            tags => ["grepped"]
            add_tag => "grokked"
            pattern => "Execution of %{DATA:command_name}Command took %{DATA:response_time} sec"
        }
    
        mutate {
            type => "statsd"
            tags => ["grepped", "grokked"]
            lowercase => [ "command_name" ]
            add_tag => ["mutated"]
        }
    }
    
    output {
      elasticsearch_river {
        type => "all"
        rabbitmq_host => "amqp-host"
        debug => false
        durable => true
        persistent => true
        es_host => "es-host"
        exchange => "logstash-elasticsearch"
        exchange_type => "direct"
        index => "logs-%{+YYYY.MM.dd}"
        index_type => "%{@type}"
        queue => "logstash-elasticsearch" 
      }
    
      statsd {
        type => "statsd"
        tags => ["grepped", "grokked", "mutated"]
        host => "some.host.local"
        port => 1234
        sender => ""
        namespace => ""
        timing => ["commands.%{command_name}.responsetime", "%{response_time}"]
        increment => ["commands.%{command_name}.requests"]
      }
    }
    

    【讨论】:

    • 您能否详细介绍一下克隆在这里的作用?过滤器中项目的顺序重要吗?
    • @FuzzyAmi 已经有一段时间了,我快速浏览了文档。 “克隆”过滤器创建新的日志事件。但是,每个“克隆”事件都会再次通过整个管道。所以......我克隆每条消息,并在add_tag => ['cloned'] 中添加一个“克隆”标签,然后当它再次出现时,exclude_tags=>['cloned'] 确保我不会以克隆事件的无限循环结束。 clones => ['statsd', 'elastic-search'] 键/值创建 2 个克隆事件,每个事件都有一个标签。可以将其视为下游处理的分支机制。
    【解决方案2】:

    clone 过滤器在您的情况下实际上是不必要的。

    我推荐几件事:

    • 请简化您的配置,直到您的“基础”正常工作。暂时不要设置可选参数
    • 确保type 匹配
    • 使用标签,它们是救命稻草
    • “grepping”时,您应该保持一致并始终使用grok 或始终使用grep。我的偏好是grok

    其他一些提示... 根据邮件设置的标签,选择您想对邮件执行的操作。这是我的一个示例配置 sn-p:

    grok{
       type => "company"
       pattern => ["((status:new))"]
       add_tag => ["company-vprod-status-new", "company-vprod-status-new-%{company}", "company-vprod-status-new-%{candidate_username}"]
       tags=> "company-vprod-status-change"
    }
    output {
     elasticsearch {
        host => "127.0.0.1"
        type => "company"
        index => "logstash-syslog-%{+YYYY.MM.dd}"
     }
     statsd {
        host => "graphite.test.company.com"
        increment => ["vprod.statuses.new.all", "vprod.statuses.new.%{company}.all"]
        tags => ["company-vprod-status-new"]   
     }
    }
    

    另外,要密切注意你的类型。如果您将type 属性设置为不存在的值,则该块将永远不会被触发。这就是为什么我更喜欢使用标签的原因,除非有明确的理由使用类型。

    【讨论】:

    • 所有很棒的提示,但答案并没有为我的问题提供实际答案。问题最终是所有消息都通过了 grep 过滤器,而那些没有通过它的消息被简单地丢弃了。
    • 看起来除了你刚才提到的问题之外还有另一个问题。您的 ES 块被配置为从“全部”类型中获取所有消息,而不是基于标签进行过滤。因此,您所拥有的一切都是正确的,而您提到的似乎并不是罪魁祸首。如果您的解决方案有效,那真的很棒,但根据您提供的信息,它似乎不是一个值得考虑的解决方案。也许您可以编辑您的答案以解释您的解决方案为何有效以及解决了什么问题?
    【解决方案3】:

    您还可以添加:

    drop => false
    

    在 grep 节的末尾(如果您仍在使用 grep)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多