【Question title】:Load data from MongoDb to Elasticsearch through python
【Posted】:2017-10-24 15:34:44
【Question】:

I have some JSON data loaded in MongoDb, for example doc1 = {"id": 1,"name": "x1"}, doc2 = {"id": 2,"name": "x2"}, doc3 = {"id": 3,"name": "x3"}. Now I want to import this data from MongoDb into Elasticsearch. This is the code I wrote.

mgclient = MongoClient()
db = mgclient['light-test']
col = db['test']

es1 = Elasticsearch()
print ("Connected", es1.info())

es1.indices.create(index='light-test', ignore=400)

# Pull from mongo and dump into ES using bulk API
actions = []
for data in tqdm(col.find(), total=col.count()):
    data.pop('_id')
    action = {
        "_index": 'light-test',
        "_type": 'test',
        "_source": data
    }
    actions.append(action)
    print("complete")
    
# Dump x number of objects at a time   
    if len(actions) >= 100:
        deque(parallel_bulk(es1, actions), maxlen=0)
        actions = []

print("done")

a = es1.search(index='light-test', body={
  'query': {
    'match_all': {
     }
  }
})
print(a)

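The problem is in the result returned by the query. The hits come back empty, whereas they should contain the JSON documents.

Please help me import the data from MongoDb into Elasticsearch.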
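
One thing that stands out in the snippet above: with only three documents, `len(actions) >= 100` is never true, so `parallel_bulk` is never called and nothing is sent to Elasticsearch. Any batch left over after the loop needs its own flush. Below is a minimal sketch of that pattern. It does not talk to a real cluster; `batched`, `index_all` and the `send` callable are hypothetical names, and `send` stands in for whatever actually ships a batch (for example a bulk helper call).

```python
from typing import Callable, Iterable, Iterator, List


def batched(items: Iterable, size: int) -> Iterator[List]:
    """Yield lists of at most `size` items, including the final partial one."""
    batch: List = []
    for item in items:
        batch.append(item)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:  # flush the remainder that never reached `size`
        yield batch


def index_all(docs: Iterable[dict], send: Callable[[List[dict]], None],
              size: int = 100) -> int:
    """Send every document in batches and return how many were sent."""
    sent = 0
    for batch in batched(docs, size):
        send(batch)  # stand-in for the real bulk call (assumption)
        sent += len(batch)
    return sent


docs = [{"id": 1, "name": "x1"}, {"id": 2, "name": "x2"}, {"id": 3, "name": "x3"}]
received: List[List[dict]] = []  # collects batches instead of calling Elasticsearch
total = index_all(docs, received.append, size=100)
print(total, len(received))
```

Here all three documents go out in a single final batch even though the threshold of 100 is never reached. Note also that newly indexed documents only become searchable after an index refresh, so a search issued immediately after indexing can still come back empty.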

【Comments】:

    Tags: python json mongodb elasticsearch


    【Solution 1】:

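    import ssl

    from elasticsearch import Elasticsearch
    from flask import Flask, jsonify
    from pymongo import MongoClient
    from tqdm import tqdm

    app = Flask(__name__)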
    
    MONGO_URL = '...'
    mgclient = MongoClient(MONGO_URL, ssl=True, ssl_cert_reqs=ssl.CERT_NONE)
    db = mgclient['light']
    col = db['task']
    
    doc1 = {...}
    doc2 = {...}
    doc3 = {...}
    post_id = col.insert_many([doc1, doc2, doc3])
    
    print(col.count())
    
    es1 = Elasticsearch(...)
    ESinfo=(es1.info())
    
    # Pull from mongo and dump into ES using bulk API
    actions = []
    for data in tqdm(col.find(), total=col.count()):
        data.pop('_id')
        action = {
                "index": {
                        "_index": 'light',
                        "_type": 'task',
                        }
        }
        actions.append(action)
        actions.append(data)
    
    #delete = es1.indices.delete(index = 'light')
    request_body = {
        "settings" : {
            "number_of_shards": 1,
            "number_of_replicas": 0
        }
    }
    es1.indices.create(index='light', body = request_body, ignore=400)
    res = es1.bulk(index = 'light', body = actions, refresh = True)
    
    # Collect the names stored in MongoDB and print them once as a sanity check
    names = []
    for obj in col.find():
        name = obj['name']
        names.append(name)
    print(names)
    
    @app.route('/query')
    def Query():
        a = es1.search(index='light', body={
          'query': {
            'match': {
              'name': '...',
             }
          }
        })
        return jsonify(query=a)
        
    if __name__ == "__main__":
        app.run(host='0.0.0.0', port=1024)
      

    This helped. Thank you :)
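
The code above passes `es1.bulk` a flat list in which each document is preceded by an `index` action entry, and it uses `refresh = True` so the documents are searchable right away. A minimal sketch of building that list on its own is shown below. `build_bulk_body` is a hypothetical helper, not part of any library, and it leaves out `_type` on the assumption that a newer Elasticsearch version is in use, where mapping types are no longer supported.

```python
from typing import Iterable, List


def build_bulk_body(docs: Iterable[dict], index: str) -> List[dict]:
    """Return alternating action/source entries for a bulk index request."""
    body: List[dict] = []
    for doc in docs:
        # Copy the document without MongoDB's `_id`, leaving the input untouched.
        source = {key: value for key, value in doc.items() if key != "_id"}
        body.append({"index": {"_index": index}})  # action metadata entry
        body.append(source)                        # the document itself
    return body


docs = [
    {"_id": "a", "id": 1, "name": "x1"},
    {"_id": "b", "id": 2, "name": "x2"},
]
body = build_bulk_body(docs, index="light")
for entry in body:
    print(entry)
```

The resulting list has two entries per document and can be handed to the bulk call in place of `actions`. Building the source with a copy rather than `data.pop('_id')` keeps the original MongoDB documents unchanged.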

    【Discussion】:
