【Question title】: Python parallel_bulk, info says no errors, but nothing in index
【Posted】: 2021-10-26 22:11:45
【Question】:

This is part of my Python code. Because I didn't know what input structure the function accepts, it took me a while to get this running without any errors. Anyway, here it is:

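import json
from elasticsearch.helpers import bulk, parallel_bulk

# `es` is an Elasticsearch client instance created elsewhere.
docs = [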
           {
            "Created":  "2021-05-30T18:35:16.587Z",
            "Action":  "AccessRequestCancelled",
            "Type":  "ACCESS_REQUESTT",
            "Actor":  "mateparac",
            "Target":  "mateparac",
            "Stack":  "wps",
            "TrackingNumber":  "2c9180867a5ca31f017a5e332e763f9",
            "Objects":  "ACCESS",
            "Operation":  "REQUEST",
            "Status":  "CANCELLED",
            "TechnicalName":  "ACCESS_REQUEST_CANCELLEDs",
            "Name":  "Request Access Cancelled!"
        },
        {
            "Created":  "2021-04-30T18:35:16.587Z",
            "Action":  "AccessRequestCancelled",
            "Type":  "ACCESS_REQUESTT",
            "Actor":  "mateparac",
            "Target":  "mateparac",
            "Stack":  "wps",
            "TrackingNumber":  "2c9180867a5ca31f017a5e332e763f9",
            "Objects":  "ACCESS",
            "Operation":  "REQUEST",
            "Status":  "CANCELLED",
            "TechnicalName":  "ACCESS_REQUEST_CANCELLEDs",
            "Name":  "Request Access Cancelled!"
        },
        {
            "Created":  "2021-04-30T18:35:16.587Z",
            "Action":  "AccessRequestCancelled",
            "Type":  "ACCESS_REQUESTT",
            "Actor":  "mateparac",
            "Target":  "mateparac",
            "Stack":  "wps",
            "TrackingNumber":  "2c9180867a5ca31f017a5e332e763f9",
            "Objects":  "ACCESS",
            "Operation":  "REQUEST",
            "Status":  "CANCELLED",
            "TechnicalName":  "ACCESS_REQUEST_CANCELLEDs",
            "Name":  "Request Access Cancelled!"
        }
]


def insert_data(documents, indexname):
    for document in documents:
        json_line = {}
        for key in document:
            json_line[key] = document[key]
        yield {
            '_op_type': 'index',
            '_index': indexname,
            '_type': 'doc',
            '_source': json.dumps(json_line)
        }


for success, info in parallel_bulk(es, insert_data(docs, "sailpont-dev-logs"), chunk_size=5000, thread_count=4, queue_size=4):
    if not success:
        print('A document failed:', info)

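While debugging, info reports that everything is fine, with no errors, but I can't find the documents in the index. Initially I tried elasticsearch.helpers.bulk with the code below, but got this error: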

elasticsearch.exceptions.RequestError: RequestError(400, 'action_request_validation_exception', 'Validation Failed: 1: index is missing;2: index is missing;3: index is missing;4: index is missing;5: index is missing;6: index is missing;7: index is missing;8: index is missing;9: index is missing;10: index is missing;11: index is missing;12: index is missing;13: index is missing;14: index is missing;15: index is missing;16: index is missing;17: index is missing;18: index is missing;19: index is missing;20: index is missing;21: index is missing;22: index is missing;23: index is missing;24: index is missing;25: index is missing;26: index is missing;27: index is missing;28: index is missing;29: index is missing;30: index is missing;31: index is missing;32: index is missing;33: index is missing;34: index is missing;35: index is missing;36: index is missing;37: index is missing;38: index is missing;39: index is 

I assume something is wrong with the input data, but it is structured according to https://www.elastic.co/guide/en/elasticsearch/reference/7.x/docs-bulk.html#docs-bulk-api-request

test_docs = '''
{ "create" : { "_index" : "sailpont-dev-logs", "_type" : "doc"} }
{ "Created":  "2021-04-30T18:35:16.587Z", "Action":  "AccessRequestCancelled","Type":  "ACCESS_REQUESTT","Actor":  "matepric","Target":  "brianschaffner","Stack":  "wps","TrackingNumber":  "2c9180867a5ca31f017a5e332e763f9", "Objects":  "ACCESS", "Operation":  "REQUEST", "Status":  "CANCELLED","TechnicalName":  "ACCESS_REQUEST_CANCELLEDs","Name":  "Request Access Cancelled!"}
{ "create" : { "_index" : "sailpont-dev-logs", "_type" : "doc"} }
{ "Created":  "2021-04-29T18:35:16.587Z", "Action":  "AccessRequestCancelled","Type":  "ACCESS_REQUESTT","Actor":  "matepric","Target":  "brianschaffner","Stack":  "wps","TrackingNumber":  "2c9180867a5ca31f017a5e332e763f9", "Objects":  "ACCESS", "Operation":  "REQUEST", "Status":  "CANCELLED","TechnicalName":  "ACCESS_REQUEST_CANCELLEDs","Name":  "Request Access Cancelled!"}
'''
bulk(es,test_docs, "sailpont-dev-logs")

I'm not sure what the problem is in either case.

【Question comments】:

    Tags: python elasticsearch


    【Solution 1】:

    You just have a typo.

    Use _doc instead of doc in '_type': 'doc'.

    Also, the source should be your dict directly; the helpers will do the serialization for you.

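    def insert_data(documents, indexname):
        # Yield one bulk action per document; the helpers serialize _source themselves.
        for document in documents:
            yield {
                '_op_type': 'index',
                '_index': indexname,
                '_type': '_doc',
                '_source': document  # pass the dict itself, not json.dumps(...)
            }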
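
Putting the two points of this answer together, here is a minimal sketch of the whole indexing step. The names `build_actions` and `index_documents` are hypothetical, `es` is assumed to be an already-configured client, and `_type` is left out on the assumption that the cluster is 7.x or newer, where mapping types are deprecated. Passing `raise_on_error=False` makes failed items come back through the `(success, info)` pairs instead of raising, so they can be collected and inspected.

```python
def build_actions(documents, indexname):
    """Yield one bulk action per document, passing each dict through as _source."""
    for document in documents:
        yield {
            "_op_type": "index",
            "_index": indexname,
            "_source": dict(document),  # a dict, not json.dumps(...)
        }


def index_documents(es, documents, indexname):
    """Index documents with parallel_bulk and return (ok_count, failures).

    `es` is assumed to be an already-configured Elasticsearch client.
    """
    # Imported here so build_actions can be tried without the client installed.
    from elasticsearch.helpers import parallel_bulk

    ok_count, failures = 0, []
    results = parallel_bulk(
        es,
        build_actions(documents, indexname),
        chunk_size=5000,
        thread_count=4,
        queue_size=4,
        raise_on_error=False,  # report failed items instead of raising
    )
    for success, info in results:
        if success:
            ok_count += 1
        else:
            failures.append(info)
    return ok_count, failures
```

Calling `index_documents(es, docs, indexname)` and comparing the returned count with `len(docs)` gives a quick check that every action was acknowledged. It does not prove the documents went to the index you meant, only that the cluster accepted them under the name you passed.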
    

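    【Comments】:

    • Thanks for pointing out the typo. For the source, I replaced json.dumps(json_line) with json_line, but there are still no documents in ES. The confusing part is why I'm not getting any errors :(
    • First, try indexing just one document and see whether there is a problem, then switch to bulk. The JSON wasn't the only issue; the doc_type was wrong too. I would also suggest using named parameters, because the Elasticsearch Python library has changed the parameter order.
    • You're right, I can't even index a single document. The funny thing is that if I copy/paste the same data (a single document) into the Kibana console (Dev Tools), the document is created and I can find it. res = es.create(index="sailpont-dev-logs", body=doc, id=1111) says created, but there is nothing in ES... I will try downgrading the client to
    • Found the problem: I had a typo in my index name!!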
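
The last comment explains why there was never an error: assuming the cluster auto-creates indices on first write (the default, as far as I know), a misspelled index name is accepted silently and the documents land in a new index under that name. A minimal sketch of a sanity check for this, using only the standard library for the comparison; `similar_index_names` and `existing_index_names` are hypothetical helper names, and the `es.indices.get_alias(index="*")` call is assumed to return a mapping keyed by index name.

```python
import difflib


def similar_index_names(requested, existing, cutoff=0.8):
    """Return existing index names that are close to, but not equal to, `requested`.

    A non-empty result hints that one of the two names may be a typo.
    """
    candidates = [name for name in existing if name != requested]
    return difflib.get_close_matches(requested, candidates, n=3, cutoff=cutoff)


def existing_index_names(es):
    """List index names via the client (assumes `es` is a connected client)."""
    # Assumption: the response behaves like a dict keyed by index name.
    return sorted(es.indices.get_alias(index="*"))
```

For example, `similar_index_names("sailpont-dev-logs", existing_index_names(es))` would flag `sailpoint-dev-logs` if both indices exist, which is the situation described in this thread.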
    【Solution 2】:

    Solution for the bulk part:

    t = test_docs.splitlines()
    n = list(map(json.loads, filter(lambda x: x, t)))
    bulk(es, n, "sailpoint-dev-logs")
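
One caveat with handing the parsed lines straight to the helper: as far as I understand elasticsearch.helpers, each item passed to `bulk` is treated as one complete action, and the third positional argument is `stats_only` rather than an index name. If that is right, the action lines and document lines need to be paired up first. A minimal sketch under that assumption, handling only `index` and `create` operations; `ndjson_to_actions` is a hypothetical helper name.

```python
import json


def ndjson_to_actions(ndjson_text):
    """Turn bulk-API NDJSON (action line + document line) into helper-style actions."""
    parsed = (json.loads(line) for line in ndjson_text.splitlines() if line.strip())
    for header in parsed:
        # Each action line has exactly one key: the operation name.
        (op_type, meta), = header.items()
        if op_type not in ("index", "create"):
            raise ValueError(f"unsupported bulk operation: {op_type}")
        source = next(parsed, None)  # the document line that follows the action line
        if source is None:
            raise ValueError("action line without a following document line")
        yield {"_op_type": op_type, **meta, "_source": source}
```

With this, the call becomes `bulk(es, ndjson_to_actions(test_docs))`, and the target index comes from the `_index` field of each action line.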
    
    

    【Comments】:
