数据分析python技能之es数据提取
https://www.jianshu.com/p/3c17561691a5
Elasticsearch在NoSQL和时间序列的数据存储中占的比重越来越大。
Elasticsearch 公司的产品栈非常全面,打通数据采集,传递,存储,展示,而且部署简单快速,半天时间就可以搭建一套完整的POC出来。

目前大数据当道,数据的结构变化越来越快,越来越多的公司把原始数据存储在ES中,数据经过二次处理后在存储的mysql等结构化的数据库中。
作为数据分析师,平时和ES打交道的时间越来越多,除了对ES的查询语法熟悉之外,还需要会使用python从ES中提取自己想要的数据。
这里记录的便是基于es的python客户端来从es中提取超过10000条记录的方法。
默认ES 查询返回的记录数为10000,当然这个数字可以通过修改ES的配置来变大或者变小。但是作为数据分析师,一般不会有ES修改配置的权限。
import json
from elasticsearch import Elasticsearch
hosts = []
es = Elasticsearch(hosts=hosts)
indices = [\'indice0\', \'indice1\']
# Initialize the scroll
page = es.search(
index=\',\'.join(indices),
doc_type=\'demo\',
scroll=\'2m\',
search_type=\'scan\',
size=1000,
q=\'user_id:123 AND type:user\' # 填写 Kibana 搜索栏里的 Lucene 查询语法字符串
)
sid = page[\'_scroll_id\']
scroll_size = page[\'hits\'][\'total\']
print \'total scroll_size: \', scroll_size
l = []
# Start scrolling
while scroll_size > 0:
print "Scrolling..."
page = es.scroll(scroll_id=sid, scroll=\'2m\')
# Update the scroll ID
sid = page[\'_scroll_id\']
# Get the number of results that we returned in the last scroll
scroll_size = len(page[\'hits\'][\'hits\'])
print "scroll size: " + str(scroll_size)
# Do something with the obtained page
docs = page[\'hits\'][\'hits\']
l += [x[\'_source\'] for x in docs]
print \'total docs: \', len(l)
file_path = \'demo.json\'
with open(file_path, \'wb\') as f:
json.dump(l, f, indent=2)
可以对比打印出来的doc数量与scroll size便可以检查是否全部记录都提取出来了。最后将数据存储到json文件中。
基于ES提供的python 客户端的方式可以提取的数量不要超过100万行,否则很容易超时失败。应该跟底层的http库有关系。
要从一个Index中提取超过千万行的数据,最佳实践是基于Java的客户端或者ES提供的Hadoop库,或者使用Python自己构造http请求,处理错误信息。
本系列文章均为实际工作中遇到的场景,以此记录下来,共同进步,更愉悦的工作。
作者:接地气的大仙儿
链接:https://www.jianshu.com/p/3c17561691a5
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。