【问题标题】:Extract from ElasticSearch, into Kafka, continuously any new ES updates using logstash从 ElasticSearch 中提取到 Kafka,使用 logstash 持续更新任何新的 ES
【发布时间】:2016-03-09 20:36:34
【问题描述】:

我有一个具有多个索引的 ES 集群,它们都以随机时间间隔接收更新。我有一个从 ES 中提取数据并将其传递给 Kafka 的 logstash 实例。

每分钟运行一次并在 ES 中获取任何更新的好方法是什么?

会议:

 input {
   elasticsearch {
     hosts => [ "hostname1.com:5432", "hostname2.com" ]
     index => "myindex-*"
     query => "*"
     size => 10000
     scroll => "5m"
   }
 }
 output {
   kafka {
     bootstrap-servers => "abc-kafka.com:1234"
     topic_id => "my.topic.test"
   }
 }

我想在查询中使用文档@timestamp 并将其保存在临时文件中,然后重新运行查询(带有计划)并获取最新的更新/插入(类似于logstash 的jdbc-input plugin 支持的内容)

有什么想法吗?

提前谢谢你

【问题讨论】:

    标签: elasticsearch logstash logstash-configuration


    【解决方案1】:

    几个月前有人问过same thing,但那个问题并没有获得太多流量。也许你可以 +1。

    同时,您可以将elasticsearch 输入中的query 修改为如下所示:

    query => '{"query":{"range":{"timestamp":{"gt": "now-1m"}}}}'
    

    即您查询了过去一分钟内所有timestamp 字段(任意名称,更改以匹配您的名称)的文档

    然后你需要设置一个 cron 来每分钟启动你的 logstash 进程。现在由于触发 cron 的那一刻、logstash 开始运行的那一刻和查询到达 ES 服务器端的那一刻之间的延迟,只要知道1m 可能还不够,您可能会丢失一些文档。您需要对此进行测试并找出哪个最好。

    根据这个recent blog post,另一种方法是记录Logstash上次在环境变量LAST_RUN中运行的时间,并在查询中使用该变量:

    query => '{"query":{"range":{"timestamp":{"gt": "${LAST_RUN}"}}}}'
    

    在这种情况下,您将创建一个由 cron 运行的 shell 脚本,它基本上执行以下操作:

    1. 运行logstash -f your_config_file.conf
    2. 完成后,设置LAST_RUN=$(date +"%FT%T")

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-05-24
      • 1970-01-01
      • 2023-03-30
      • 2015-11-10
      • 1970-01-01
      • 2016-03-09
      • 2017-03-15
      相关资源
      最近更新 更多