【问题标题】:How to Bulk index in Elastic Search using the Python API如何使用 Python API 在 Elastic Search 中批量索引
【发布时间】:2015-04-05 23:21:46
【问题描述】:

我正在尝试使用 Python API 将大量文档批量插入到弹性搜索中。

import elasticsearch
from pymongo import MongoClient

es = elasticsearch.Elasticsearch()

def index_collection(db, collection, fields, host='localhost', port=27017):
    conn = MongoClient(host, port)
    coll = conn[db][collection]
    cursor = coll.find({}, fields=fields, timeout=False)
    print "Starting Bulk index of {} documents".format(cursor.count())

    def action_gen():
        """
        Generator to use for bulk inserts
        """
        for n, doc in enumerate(cursor):

            op_dict = {
                '_index': db.lower(),
                '_type': collection,
                '_id': int('0x' + str(doc['_id']), 16),
            }
            doc.pop('_id')
            op_dict['_source'] = doc
            yield op_dict

    res = bulk(es, action_gen(), stats_only=True)
    print res

文档来自 Mongodb 集合,我根据文档中解释的方式使用上面的函数进行批量索引。

批量索引继续使用数千个空文档填充弹性搜索。谁能告诉我我做错了什么?

【问题讨论】:

  • 您的索引是否已经存在于 ES 中?如果是这样,是否为它定义了任何映射(是否有可能来自 Mongo 的文档不适合映射)?
  • 您的代码对我有用。也许您的错误是特定于数据的。愿意举一个简单的例子吗?

标签: python mongodb elasticsearch


【解决方案1】:

我从未见过以这种方式汇总大量数据,尤其是您使用 "_source" 所做的事情。可能有一种方法可以让它发挥作用,我不知道,但是当我尝试它时,我得到了奇怪的结果。

如果您查看bulk api,ES 期待一个元数据文档,那么该文档将被索引。因此,每个文档的批量数据列表中都需要两个条目。所以可能是这样的:

import elasticsearch
from pymongo import MongoClient

es = elasticsearch.Elasticsearch()

def index_collection(db, collection, fields, host='localhost', port=27017):
    conn = MongoClient(host, port)
    coll = conn[db][collection]
    cursor = coll.find({}, fields=fields, timeout=False)
    print "Starting Bulk index of {} documents".format(cursor.count())

    bulk_data = []

    for n, doc in enumerate(cursor):

        bulk_data.append({
            '_index': db.lower(),
            '_type': collection,
            '_id': int('0x' + str(doc['_id']), 16),
        })
        bulk_data.append(doc)

    es.bulk(index=index_name,body=bulk_data,refresh=True)

不过,我没有尝试运行该代码。这是一个我知道有效的脚本,如果有帮助,您可以使用它:

from elasticsearch import Elasticsearch

es_client = Elasticsearch(hosts = [{ "host" : "localhost", "port" : 9200 }])

index_name = "test_index"

if es_client.indices.exists(index_name):
    print("deleting '%s' index..." % (index_name))
    print(es_client.indices.delete(index = index_name, ignore=[400, 404]))

print("creating '%s' index..." % (index_name))
print(es_client.indices.create(index = index_name))

bulk_data = []

for i in range(4):
    bulk_data.append({
        "index": {
            "_index": index_name, 
            "_type": 'doc', 
            "_id": i
        }
    })
    bulk_data.append({ "idx": i })

print("bulk indexing...")
res = es_client.bulk(index=index_name,body=bulk_data,refresh=True)
print(res)

print("results:")
for doc in es_client.search(index=index_name)['hits']['hits']:
    print(doc)

【讨论】:

  • 它可能有效,但对于我的用例,我不想在索引之前将所有数据加载到内存中的列表中,因此生成器函数。文档引导我找到我发布的代码:elasticsearch-py.readthedocs.org/en/master/helpers.html#helpers
  • @SloanAhrens OP 正在使用 bulk 的辅助版本,它与标准的 Bulk API 有点不同。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-12-13
  • 1970-01-01
  • 2012-02-15
  • 1970-01-01
相关资源
最近更新 更多