ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。下面介绍了利用Python API接口进行数据查询,方便其他系统的调用。
安装API
pip3 install elasticsearch
建立es连接
无用户名密码状态
from elasticsearch import Elasticsearch es = Elasticsearch([{'host':'10.10.13.12','port':9200}])
默认的超时时间是10秒,如果数据量很大,时间设置更长一些。如果端口是9200,直接写IP即可。代码如下:
es = Elasticsearch(['10.10.13.12'], timeout=3600)
用户名密码状态
如果Elasticsearch开启了验证,需要用户名和密码
es = Elasticsearch(['10.10.13.12'], http_auth=('xiao', '123456'), timeout=3600)
数据检索功能
es.search(index='logstash-2015.08.20', q='http_status_code:5* AND server_name:"web1"', from_='124119')
常用参数
- index - 索引名
- q - 查询指定匹配 使用Lucene查询语法
- from_ - 查询起始点 默认0
- doc_type - 文档类型
- size - 指定查询条数 默认10
- field - 指定字段 逗号分隔
- sort - 排序 字段:asc/desc
- body - 使用Query DSL
- scroll - 滚动查询
统计查询功能
语法同search大致一样,但只输出统计值
es.count(index='logstash-2015.08.21', q='http_status_code:500')
输出:
{'_shards':{'failed':0, 'successful':5, 'total':5}, 'count':17042}
17042 就是统计值!
知识扩展
滚动demo
# Initialize the scroll page = es.search( index ='yourIndex', doc_type ='yourType', scroll ='2m', search_type ='scan', size =1000, body ={ # Your query's body }) sid = page['_scroll_id'] scroll_size = page['hits']['total'] # Start scrolling while(scroll_size >0): print "Scrolling..." page = es.scroll(scroll_id = sid, scroll ='2m') # Update the scroll ID sid = page['_scroll_id'] # Get the number of results that we returned in the last scroll scroll_size = len(page['hits']['hits']) print "scroll size: "+ str(scroll_size) # Do something with the obtained page