【发布时间】:2020-10-19 17:42:04
【问题描述】:
我必须使用 Python 在 ElasticSearch 上运行查询以提取大量数据。我在 Python 3 中使用 elasticsearch 7.9.1。
问题是,我的脚本从一次运行到另一次运行返回的行数不同。 有时我得到大约 300,000 个结果,有时更多(100 万),有时为零。这似乎正在发生,特别是如果 2 次运行彼此接近。我不会更改查询或搜索的时间范围。
我注意到更改es.search 方法中的scroll 参数似乎会改变脚本的行为。
例如,如果scroll = '1s',page['hits']['hits'] 为零,这对我来说真的没有意义。
这是我在脚本中使用的类:
class ElasticsearchFinder():
def __init__(self, cfg):
logger.info('ElasticsearchFinder.__init__ : initiate parameters')
try:
######### ES configuration #######
self.port = cfg.get('Elasticsearch', 'port')
self.hostnames = cfg.get('Elasticsearch', 'hostnames')
self.username = cfg.get('Elasticsearch', 'username')
self.password = cfg.get('Elasticsearch', 'password')
self.index = cfg.get('Elasticsearch', 'index')
self.delay = cfg.get('Elasticsearch', 'delay')
self.folder = cfg.get('Elasticsearch', 'root_csv_folder')
self.filename = cfg.get('Elasticsearch', 'csv_filename')
self.ssl_es_context = create_ssl_context()
self.ssl_es_context.check_hostname = False
self.ssl_es_context.verify_mode = ssl.CERT_NONE
try:
self.start_time_conf = cfg.get('Elasticsearch', 'start_time')
except:
self.start_time_conf = None
try:
self.end_time_conf = cfg.get('Elasticsearch', 'end_time')
except:
self.end_time_conf = None
except Exception as e:
logger.error('ElasticsearchFinder.__init__ : initiate parameters failed, please verify fetch_qradar.conf : %s', str(e))
now = datetime.datetime.now()
if self.delay != "0":
start_date = now - datetime.timedelta(hours=int(self.delay))
else:
start_date = now - datetime.timedelta(weeks=52)
self.end_time = now.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
self.start_time = start_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
if self.start_time_conf:
self.start_time = datetime.datetime.strptime(self.start_time_conf, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%dT%H:%M:%S.%fZ")
if self.end_time_conf:
self.end_time = datetime.datetime.strptime(self.end_time_conf, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%dT%H:%M:%S.%fZ")
try:
self.es = Elasticsearch(self.hostnames, port = self.port, scheme="https",
http_auth=(self.username, self.password), ssl_context=self.ssl_es_context)
except Exception as e:
logger.error('ElasticsearchFinder.__init__ : Connect to remote Elasticsearch %s:%s failed : %s', self.hostname, self.port, str(e))
def search_elastic(self, start_time=None, end_time=None):
try:
# field to store all data
data = []
# parameters end_time and start_time as ARGS
if start_time:
self.start_time = datetime.datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%dT%H:%M:%S.%fZ")
if end_time:
self.end_time = datetime.datetime.strptime(end_time, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%dT%H:%M:%S.%fZ")
logger.info('ElasticsearchFinder.search_elastic : QUERY = field1: "value1" and (field2: "value2" or "value3")')
query = {
"query": {
"bool": {
"must": [
{"match" : {"field1" : "value1"}},
{"range": {"timestamp": {"lt": self.end_time, "gte": self.start_time}}},
{
"bool": {
"should": [
{"match" : {"field2" : "value2"}},
{"match" : {"field2" : "value3"}}
]
}
}
]
}
}
}
# Initialize the scroll
page = self.es.search(
index=self.index,
scroll='30m',
size=1000,
body=query)
sid = page['_scroll_id']
scroll_size = page['hits']['total']['value']
logger.info('ElasticsearchFinder.search_elastic : search for qradar from {} to {}. Total hits {}'.format(self.start_time, self.end_time, scroll_size))
# fetch data
for i in page['hits']['hits']:
data.append({'some_field' : i[_source]['some_field']
})
# Start scrolling
while (scroll_size > 0):
# Get the number of results that we returned in the last scroll
page = self.es.scroll(scroll_id=sid, scroll='30m')
scroll_size = len(page['hits']['hits'])
for i in page['hits']['hits']:
data.append({'some_field' : i[_source]['some_field']
})
# Update the scroll ID (to move to next page)
sid = page['_scroll_id']
logger.info('ElasticsearchFinder.search_elastic : Total stored data {}'.format(len(data)))
# write CSV file
self.writeToCSV(self.folder+self.filename, data)
该查询已直接在 Kibana 上进行检查并且是正确的。 (我的脚本也没有返回与 Kibana 相同数量的结果)
我必须设置较大的超时时间,否则我会收到超时错误。
知道为什么每次都没有返回相同数量的结果吗?
我想scroll_id 可能被保存在某个地方,所以 Elastic 没有返回它在上一次运行中已经返回的结果,但 scroll_id 从一次运行更改为下一次运行,所以这似乎不太可能。
【问题讨论】:
-
您好,请问您的 elasticsearch 集群如何?你有单个节点还是弹性搜索集群?如果你有集群,我可以知道你机器的配置吗?
-
@saeednasehi 这是一个集群,但我不知道配置,我只知道主机名
-
在我看来,这个问题是因为你的集群。我认为您的集群的一个或多个节点存在一些问题,例如网络延迟或硬盘慢。我不确定这一点,但您的集群的这种行为显示了围绕这个问题的一些东西。
-
@saeednasehi 集群已被同事检查,似乎没有任何问题,我将尝试在脚本中使用超时参数
标签: python elasticsearch