【问题标题】:Python Elasticsearch not returning the same number of results on each runPython Elasticsearch 在每次运行时没有返回相同数量的结果
【发布时间】:2020-10-19 17:42:04
【问题描述】:

我必须使用 Python 在 ElasticSearch 上运行查询以提取大量数据。我在 Python 3 中使用 elasticsearch 7.9.1。

问题是,我的脚本从一次运行到另一次运行返回的行数不同。 有时我得到大约 300,000 个结果,有时更多(100 万),有时为零。这似乎正在发生,特别是如果 2 次运行彼此接近。我不会更改查询或搜索的时间范围。

我注意到更改es.search 方法中的scroll 参数似乎会改变脚本的行为。 例如,如果scroll = '1s',page['hits']['hits'] 为零,这对我来说真的没有意义。

这是我在脚本中使用的类:

class ElasticsearchFinder():

    def __init__(self, cfg):
        logger.info('ElasticsearchFinder.__init__ : initiate parameters')
        try:
            ######### ES configuration #######
            self.port = cfg.get('Elasticsearch', 'port')
            self.hostnames = cfg.get('Elasticsearch', 'hostnames')
            self.username = cfg.get('Elasticsearch', 'username')
            self.password = cfg.get('Elasticsearch', 'password')
            self.index = cfg.get('Elasticsearch', 'index')
            self.delay = cfg.get('Elasticsearch', 'delay')
            self.folder = cfg.get('Elasticsearch', 'root_csv_folder')
            self.filename = cfg.get('Elasticsearch', 'csv_filename')

            self.ssl_es_context = create_ssl_context()
            self.ssl_es_context.check_hostname = False
            self.ssl_es_context.verify_mode = ssl.CERT_NONE
        

            try:
                self.start_time_conf = cfg.get('Elasticsearch', 'start_time')
            except:
                self.start_time_conf = None
            try:
                self.end_time_conf = cfg.get('Elasticsearch', 'end_time')
            except:
                self.end_time_conf = None

        except Exception as e:
            logger.error('ElasticsearchFinder.__init__ : initiate parameters failed, please verify fetch_qradar.conf : %s', str(e))

        now = datetime.datetime.now()

        if self.delay != "0":
            start_date = now - datetime.timedelta(hours=int(self.delay))
        else:
            start_date = now - datetime.timedelta(weeks=52)

        self.end_time = now.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        self.start_time = start_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

        if self.start_time_conf:
            self.start_time = datetime.datetime.strptime(self.start_time_conf, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%dT%H:%M:%S.%fZ")

        if self.end_time_conf:
            self.end_time = datetime.datetime.strptime(self.end_time_conf, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%dT%H:%M:%S.%fZ")

        try:
            self.es = Elasticsearch(self.hostnames, port = self.port, scheme="https",
                                    http_auth=(self.username, self.password), ssl_context=self.ssl_es_context)

        except Exception as e:
            logger.error('ElasticsearchFinder.__init__ : Connect to remote Elasticsearch %s:%s failed : %s', self.hostname, self.port, str(e))


    def search_elastic(self, start_time=None, end_time=None):
        try:
            # field to store all data
            data = []

            # parameters end_time and start_time as ARGS
            if start_time:
                self.start_time = datetime.datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%dT%H:%M:%S.%fZ")

            if end_time:
                self.end_time = datetime.datetime.strptime(end_time, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%dT%H:%M:%S.%fZ")

            logger.info('ElasticsearchFinder.search_elastic : QUERY = field1: "value1" and (field2: "value2" or "value3")')
            query = {
                        "query": {
                            "bool": {
                                "must": [
                                    {"match" : {"field1" : "value1"}},
                                    {"range": {"timestamp": {"lt": self.end_time, "gte": self.start_time}}},
                                    {
                                        "bool": {
                                            "should": [
                                                {"match" : {"field2" : "value2"}},
                                                {"match" : {"field2" : "value3"}}
                                            ]
                                        }
                                    }
                                ]
                            }
                        }
                    }
            
            # Initialize the scroll
            page = self.es.search(
                index=self.index,
                scroll='30m',
                size=1000,
                body=query)

            sid = page['_scroll_id']
            scroll_size = page['hits']['total']['value']
            logger.info('ElasticsearchFinder.search_elastic : search for qradar from {} to {}. Total hits {}'.format(self.start_time, self.end_time, scroll_size))

            # fetch data
            for i in page['hits']['hits']:
                data.append({'some_field' : i[_source]['some_field']
                    })

            # Start scrolling
            while (scroll_size > 0):
                # Get the number of results that we returned in the last scroll
                page = self.es.scroll(scroll_id=sid, scroll='30m')
                scroll_size = len(page['hits']['hits'])
                for i in page['hits']['hits']:
                    data.append({'some_field' : i[_source]['some_field']
                    })

                # Update the scroll ID (to move to next page)
                sid = page['_scroll_id']

            logger.info('ElasticsearchFinder.search_elastic : Total stored data {}'.format(len(data)))
           
            # write CSV file
            self.writeToCSV(self.folder+self.filename, data)

该查询已直接在 Kibana 上进行检查并且是正确的。 (我的脚本也没有返回与 Kibana 相同数量的结果)

我必须设置较大的超时时间,否则我会收到超时错误。

知道为什么每次都没有返回相同数量的结果吗? 我想scroll_id 可能被保存在某个地方,所以 Elastic 没有返回它在上一次运行中已经返回的结果,但 scroll_id 从一次运行更改为下一次运行,所以这似乎不太可能。

【问题讨论】:

  • 您好,请问您的 elasticsearch 集群如何?你有单个节点还是弹性搜索集群?如果你有集群,我可以知道你机器的配置吗?
  • @saeednasehi 这是一个集群,但我不知道配置,我只知道主机名
  • 在我看来,这个问题是因为你的集群。我认为您的集群的一个或多个节点存在一些问题,例如网络延迟或硬盘慢。我不确定这一点,但您的集群的这种行为显示了围绕这个问题的一些东西。
  • @saeednasehi 集群已被同事检查,似乎没有任何问题,我将尝试在脚本中使用超时参数

标签: python elasticsearch


【解决方案1】:

原来脚本创建了太多滚动上下文(超过默认的 500 个)。这就解释了为什么如果脚本连续运行多次时会特别出现问题。

这可以通过查看结果的_shards 部分(在我的例子中为page['_shards'])来看到,其中包含许多这样的错误消息:

Trying to create too many scroll contexts. Must be less than or equal to: [500]

减少es.search() 中的scroll 参数似乎会有所帮助,因为滚动上下文会更快地刷新,以及在使用es.clear_scroll() 搜索后清除滚动上下文。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-02-21
    • 1970-01-01
    • 1970-01-01
    • 2020-01-28
    • 2021-12-20
    • 2023-01-22
    • 1970-01-01
    相关资源
    最近更新 更多