【问题标题】:Scripting in logstash在 logstash 中编写脚本
【发布时间】:2016-12-02 15:37:09
【问题描述】:

是否可以在logstash 中执行类似python 的脚本?我可以使用 logstash 将 csv 数据导入 elasticsearch。但我需要使用更新 API 而不是简单地索引所有行。

这是我的示例 csv 文件...

vi /tmp/head.txt
"Home","Home-66497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1VSZj","919359000000","HMSHOP","916265100000","2016-05-18 08:41:49"
"Home","Home-26497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1V1","919359000001","HMSHOP","916265100000","2016-05-18 18:41:49"
"Home","Home-36497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/SZj1","919359000001","HMSHOP","916265100000","2016-05-18 12:41:49"
"Home","Home-46497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1","919359000000","HMSHOP","916265100000","2016-05-18 14:41:49"
"Home","Home-56497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1VSZj1xc","919359000000","HMSHOP","916265100000","2016-05-18 16:41:49"

这里是logstash配置文件...

vi logstash.conf
input {
    file {
        path => "/tmp/head.txt"
        type => "csv"
        start_position => beginning
    }
}
filter {
    csv {
        columns => ["user", "messageid", "message", "destination", "code", "mobile", "mytimestamp"]
        separator => ","
    }
}

output {
    elasticsearch {
        action => "index"
        hosts => ["172.17.0.1"]
        index => "logstash-%{+YYYY.MM.dd}"
        workers => 1
    }
}

我已确认上述配置按预期工作,所有 5 条记录都存储为 5 个单独的文档。

这是我的 docker 命令...

docker run -d -v "/tmp/logstash.conf":/usr/local/logstash/config/logstash.conf -v /tmp/:/tmp/ logstash -f /usr/local/logstash/config/logstash.conf

问题是我需要根据目的地号码合并文档。目的地应该是文档的 ID。有一些行具有相同的目的地。例如_id: 919359000001 这个文档应该有以下两个记录作为嵌套对象。

"user": "Home", "messageid": "Home-26497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 18:41:49"
"user": "Home", "messageid" "Home-36497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/SZj1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp": "2016-05-18 12:41:49"

Elasticsearch 正在正确地将 csv 数据转换为 json,如上所示。我需要的是重新格式化语句以利用使用更新 API 的脚本 以下代码运行正常。

POST /test_index/doc/_bulk
{ "update" : { "_id" : "919359000001"} }
{ "script" : { "inline": "ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]", "lang" : "groovy", "params" : {"user": "Home", "messageid": "Home-26497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 18:41:49"}}, "upsert": {"parent" : [{"user": "Home", "messageid": "Home-26497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 18:41:49"}] }}
{ "update" : { "_id" : "919359000001"} }
{ "script" : { "inline": "ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]", "lang" : "groovy", "params" : {"user": "Home", "messageid": "Home-36497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V13343", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 12:41:49"}}, "upsert": {"parent" : [{"user": "Home", "messageid": "Home-36497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V13343", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 12:41:49"}] }}

如何在 logstash 中编写代码以将我的 csv 数据转换为类似于上面的样子?


更新

我有按预期工作的 python 代码。我想知道如何修改此代码以适应根据答案建议的“输出”参数。 在以下示例中,df_json 是一个 python 对象,它只不过是扁平化为 json 的 python 数据帧。

import copy
with open('myfile.txt', 'w') as f:
    for doc1 in df_json:
        import json
        doc = mydict(doc1)
        docnew = copy.deepcopy(doc)
        del docnew['destination']
        action = '{ "update": {"_id": %s }}\n' % doc['destination'] 
        f.write(action)
        entry = '{ "script" : { "inline": "ctx._source.parent += [\'user\': user, \'messageid\': messageid, \'message\': message, \'code\': code, \'mobile\': mobile, \'mytimestamp\': mytimestamp]", "lang" : "groovy", "params" : %s}, "upsert": {"parent" : [%s ] }}\n' %   (doc, docnew)
        f.write(entry)

! curl -s -XPOST XXX.xx.xx.x:9200/test_index222/doc/_bulk --data-binary @myfile.txt; echo

更新 2

我尝试了以下配置,它正在替换(不是根据脚本更新)文档。

output {
    elasticsearch {
        action => "index"
        hosts => ["172.17.0.1"]
        document_id => "%{destination}"
        index => "logstash3-%{+YYYY.MM.dd}"
        workers => 1
        script => "ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]"
        script_type => "inline"
        script_lang =>  "groovy"
        scripted_upsert => "true"
    }
}

当我将操作更改为“更新”时,我收到以下错误...

:response=>{"update"=>{"_index"=>"logstash4-2016.07.29", "_type"=>"csv", "_id"=>"919359000000", 
"status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to execute script", 
"caused_by"=>{"type"=>"script_exception", "reason"=>"failed to run in line script 
[ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]] 
using lang [groovy]", "caused_by"=>{"type"=>"missing_property_exception", "reason"=>"No such property: user for class: fe1b423dc4966b0f0b511b732474637705bf3bb1"}}}}}, :level=>:warn}

更新 3

根据 Val 的回答,我添加了事件并收到此错误...

:response=>{"update"=>{"_index"=>"logstash4-2016.08.06", "_type"=>"csv", "_id"=>"%{destination}", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to execute script", "caused_by"=>{"type"=>"script_exception", "reason"=>"failed to run inline script [ctx._source.parent += ['user': event.user, 'messageid': event.messageid, 'message': event.message, 'code': event.code, 'mobile': event.mobile, 'mytimestamp': event.mytimestamp]] using lang [groovy]", "caused_by"=>{"type"=>"null_pointer_exception", "reason"=>"Cannot execute null+{user=null, messageid=null, message=, code=null, mobile=null, mytimestamp=null}"}}}}}

更新 4

根据 Val 的更新答案,我尝试了这个...

script => "ctx._source.parent = (ctx._source.parent ?: []) + ['user': event.user, 'messageid': event.messageid, 'message': event.message, 'code': event.code, 'mobile': event.mobile, 'mytimestamp': event.mytimestamp]"

得到了这个错误:

{:timestamp=>"2016-08-12T09:40:48.869000+0000", :message=>"Pipeline main started"}
{:timestamp=>"2016-08-12T09:40:49.517000+0000", :message=>"Error parsing csv", :field=>"message", :source=>"", :exception=>#<NoMethodError: undefined method `each_index' for nil:NilClass>, :level=>:warn}

数据库中只添加了 2 条记录。

【问题讨论】:

  • 更改操作 => “更新”而不是“索引”。

标签: python elasticsearch groovy logstash


【解决方案1】:

elasticsearch输出插件支持脚本参数:

output {
    elasticsearch {
        action => "update"
        hosts => ["172.17.0.1"]
        index => "logstash-%{+YYYY.MM.dd}"
        workers => 1
        script => "<your script here>"
        script_type => "inline"
        # Set the language of the used script
        # script_lang => 
        # if enabled, script is in charge of creating non-existent document (scripted update)
        # scripted_upsert => (default is false)
    }
}

【讨论】:

  • 更改更新操作没有帮助。查看更新后的问题。
【解决方案2】:

使用event 变量名将事件传递到输出中的脚本(默认情况下,您可以使用script_var_name 设置更改它)。

所以输出中的脚本需要考虑它。

    script => "ctx._source.parent = (ctx._source.parent ?: []) + ['user': event.user, 'messageid': event.messageid, 'message': event.message, 'code': event.code, 'mobile': event.mobile, 'mytimestamp': event.mytimestamp]"

【讨论】:

  • 这个运气好吗?
  • 使用这个脚本,我只能添加 2 条记录。它没有像 python 那样更新记录。 (问题已更新)
  • 它正在替换当前的 id 而不是更新它。
  • 错误显示Error parsing csv 在我看来与脚本无关。它发生在事件到达elasticsearch 输出之前。请提供更多信息好吗?
  • @shantanuo 您能否进一步说明您的最新评论?正如我所说,错误发生在脚本有机会执行之前。
【解决方案3】:

既然你有工作 python 脚本,也许这会有用? https://www.elastic.co/guide/en/elasticsearch/plugins/current/lang-python.html

关于更新 nr 2 - 我认为可以通过首先检查文档是否具有给定字段(在这种情况下是用户)来修复错误。

【讨论】:

    猜你喜欢
    • 2018-03-07
    • 2020-10-03
    • 1970-01-01
    • 2018-03-25
    • 1970-01-01
    • 2023-03-15
    • 1970-01-01
    • 1970-01-01
    • 2013-09-29
    相关资源
    最近更新 更多