【问题标题】:Logstash shutdown stalling when starting from bash script从 bash 脚本启动时 Logstash 关闭停止
【发布时间】:2018-03-22 08:34:05
【问题描述】:

我编写了一个 bash 脚本,该脚本在指定文件夹中查找 CSV 文件,并使用正确的配置文件将它们通过管道传输到 logstash。但是,在运行此脚本时,我遇到以下错误,说关闭过程已停止,导致无限循环,直到我使用 ctrl+c 手动停止它:

[2018-03-22T08:59:53,833][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.2.3"}
[2018-03-22T08:59:54,211][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-03-22T08:59:57,970][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-03-22T08:59:58,116][INFO ][logstash.pipeline        ] Pipeline started succesfully {:pipeline_id=>"main", :thread=>"#<Thread:0xf6851b3 run>"}
[2018-03-22T08:59:58,246][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}
[2018-03-22T08:59:58,976][INFO ][logstash.outputs.file    ] Opening file {:path=>"/home/kevin/otrs_customer_user"}
[2018-03-22T09:00:06,471][WARN ][logstash.shutdownwatcher ] {"inflight_count"=>0, "stalling_thread_info"=>{["LogStash::Filters::CSV", {"separator"=>";", "columns"=>["IOT", "OID", "SUM", "XID", "change_by", "change_time", "city", "company", "company2", "create_by", "create_time", "customer_id", "email", "fax", "first_name", "id", "inst_city", "inst_first_name", "inst_last_name", "inst_street", "inst_zip", "last_name", "login", "mobile", "phone", "phone2", "street", "title", "valid_id", "varioCustomerId", "zip"], "id"=>"f1c74146d6672ca71f489aac1b4c2a332ae515996657981e1ef44b441a7420c8"}]=>[{"thread_id"=>23, "name"=>nil, "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:90:in `read_batch'"}]}}
[2018-03-22T09:00:06,484][ERROR][logstash.shutdownwatcher ] The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.
[2018-03-22T09:00:11,438][WARN ][logstash.shutdownwatcher ] {"inflight_count"=>0, "stalling_thread_info"=>{["LogStash::Filters::CSV", {"separator"=>";", "columns"=>["IOT", "OID", "SUM", "XID", "change_by", "change_time", "city", "company", "company2", "create_by", "create_time", "customer_id", "email", "fax", "first_name", "id", "inst_city", "inst_first_name", "inst_last_name", "inst_street", "inst_zip", "last_name", "login", "mobile", "phone", "phone2", "street", "title", "valid_id", "varioCustomerId", "zip"], "id"=>"f1c74146d6672ca71f489aac1b4c2a332ae515996657981e1ef44b441a7420c8"}]=>[{"thread_id"=>23, "name"=>nil, "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:90:in `read_batch'"}]}}

当我使用bash logstash -f xyz.config &lt; myfile.config 手动运行相同的文件和相同的配置时,它会按预期工作并且进程会正确终止。在 bash 脚本中,我基本上使用的是确切的命令,但遇到了上面的错误。

我还注意到问题似乎是随机的,而且并非每次都出现在同一个文件和配置上。

我的配置包含一个标准输入,一个 csv 过滤器,用于测试 json 格式的输出到一个文件(也删除了stdout{})。

有人知道为什么我的进程在脚本执行期间会停止吗?或者如果没有,是否有办法告诉logstash在停止时关闭?

示例配置:

input {
  stdin {
    id => "${LS_FILE}"
  }

}
filter {
    mutate {
        add_field => { "foo_type" => "${FOO_TYPE}" }
        add_field => { "[@metadata][LS_FILE]" => "${LS_FILE}"}
    }
    if [@metadata][LS_FILE] == "contacts.csv" {
      csv {
          separator => ";"
          columns =>
          [
            "IOT",
            "OID",
            "SUM",
            "XID",
            "kundenid"              
          ]
      }
      if [kundenid]{
        mutate {
            update => { "kundenid" => "n-%{kundenid}" }
        }
        }
    }
}
output {   
       if [@metadata][LS_FILE] == "contacts.csv" {
           file{
                path => "~/contacts_file"
                codec => json_lines
            }
       }
}

示例脚本:

LOGSTASH="/customer/app/logstash-6.2.3/bin/logstash"

    for file in $(find $TARGETPATH -name *.csv) # Loop each file in given path
    do
        if [[ $file = *"foo"* ]]; then
            echo "Importing $file"
            export LS_FILE=$(basename $file)
            bash $LOGSTASH -f $CFG_FILE < $file  # Starting logstash
            echo "file $file imported."
        fi
    done

我在 bash 脚本中导出环境变量,并将它们设置为 logstash 配置中的元数据,以针对不同的输入文件执行一些条件。文件中的 JSON 输出仅用于测试目的。

【问题讨论】:

  • 能否将实际脚本和配置添加到问题中?日志引用“忙或被阻止的插件” - 您正在加载哪些插件?
  • 刚刚添加了我的脚本和配置的摘要。奇怪的是,这个问题似乎很少发生在不同的 csv 文件上。我无法通过手动调用配置来重现问题。
  • LOGSTASH 是如何设置的?
  • 想法:(1) 如果您的任何文件名有空格,那么引用不足会咬到您 - 使用 export LS_FILE="$(basename "$file")"bash "$LOGSTASH" -f "$CFG_FILE" &lt; "$file"(2) 类似 - 读取带有IFS=... &lt; &lt;(find...) 的文件名,例如here(3) findbash ... &lt; $file 之间存在竞争条件,这可能会给您带来不一致的行为。
  • 数组听起来是个好方法。不幸的是,竞争仍然存在 - 从技术上讲,文件可以在 findbash &lt; "$file" 之间移动或删除。恐怕我对logstash的经验不够丰富,无法提出任何深刻的建议。我确实注意到警告消息列出的列比配置文件中的 csv{} 块多得多。另外,您的logstash 是否足够新以包含this?祝你好运!

标签: linux bash elasticsearch logstash


【解决方案1】:

当您尝试关闭时,Logstash 会尝试执行各种步骤,例如,

  • 它会停止所有输入、过滤和输出插件
  • 处理所有进行中的事件
  • 终止 Logstash 进程

并且有多种因素导致关闭过程非常不可预测,例如,

  • 一个输入插件以慢速接收数据。
  • 慢速过滤器,例如执行 sleep(10000) 的 Ruby 过滤器或执行非常繁重的查询的 Elasticsearch 过滤器。
  • 一个断开连接的输出插件正在等待重新连接以刷新进行中的事件。

来自Logstash documentation

Logstash 有一个停顿检测机制,可以分析 关闭期间的管道和插件。这种机制产生 关于内部飞行事件计数的周期性信息 队列和繁忙的工作线程列表。

您可以在启动logstash 时使用--pipeline.unsafe_shutdown 标志来强制终止进程以防停机。当--pipeline.unsafe_shutdown 未启用时,Logstash 会继续运行并定期生成这些报告,这就是为什么在您的情况下问题似乎是随机的。

请记住,不安全的关闭、强制终止 Logstash 进程或崩溃 Logstash 进程因任何其他原因可能导致数据丢失 (除非您已启用 Logstash 以使用 persistent queues)。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-06-22
    • 1970-01-01
    相关资源
    最近更新 更多