【问题标题】:dump elasticsearch 2.x to mongodb and back to ES 6.x将 elasticsearch 2.x 转储到 mongodb 并返回到 ES 6.x
【发布时间】:2018-07-14 06:31:48
【问题描述】:

这个问题更多的是理论而不是源代码。

我有一个 ES 2.x 节点,它有超过 1.2TB 的数据。我们有 40 多个索引,每个索引至少有 1 种类型。在这里,ES 2.x 被用作数据库而不是搜索引擎。用于将数据转储到 ES 2.x 的源丢失了。此外,数据未标准化,但单个 ES 文档具有多个嵌入文档。我们的目标是重新创建数据源并同时对其进行规范化。

我们的计划是:

  1. 从 ES 中获取数据,分析并转储到新的 mongodb 到特定的集合,并维护数据之间的关系。 IE。以标准化形式保存。
  2. 在新的 ES 6 节点上为新的 mongo 数据编制索引。

我们正在使用 JRuby 9.1.15.0、Rails 5、Ruby 2.4 和 Sidekiq。

目前,我们正在从 ES 检索特定日期时间范围的数据。有时我们会收到 0 条记录,有时会收到 100000+ 条记录。问题是当我们收到大量记录时。

下面是一个示例脚本,当日期范围的数据较小时有效,但在数据较大时失败。 1.2TB/40 个索引是平均索引大小

class DataRetrieverWorker
  include Sidekiq::Worker
  include Sidekiq::Status::Worker

  def perform(indx_name, interval = 24, start_time = nil, end_time = nil)
    unless start_time || end_time
      client = ElasticSearchClient.instance.client
      last_retrieved_at = RetrievedIndex.where(name: indx_name).desc(:created_at).first
      start_time, end_time = unless last_retrieved_at
                               data = client.search index: indx_name, size: 1, sort: [{ insert_time: { order: 'asc' } }]
                               first_day = DateTime.parse(data['hits']['hits'].first['_source']['insert_time'])
                               start_time = first_day.beginning_of_day
                               end_time = first_day.end_of_day
                             else
                               # retrieve for the next time slot. usually 24 hrs.
                               [last_retrieved_at.end_time, last_retrieved_at.end_time + interval.hours]
                             end
      DataRetrieverWorker.perform_async(indx_name, interval, start_time, end_time)
    else
       # start scroll on the specified range and retrieve data.
       query = { range: { insert_time: { gt: DateTime.parse(start_time).utc.iso8601, lt: DateTime.parse(end_time).utc.iso8601 } } }
       data = client.search index: indx_name, scroll: '10m', size: SCROLL_SIZE, body: { query: query }
      ri = RetrievedIndex.find_by(name: indx_name, start_time: start_time, end_time: end_time)
      if ri
        DataRetrieverWorker.perform_at(2.seconds.from_now, indx_name, interval)
        return
      end
      ri = RetrievedIndex.create!(name: indx_name, start_time: start_time, end_time: end_time, documents_cnt: data['hits']['total'])
      if data['hits']['total'] > 0
        if data['hits']['total'] > 2000
          BulkJobsHandlerWorker.perform_async(ri.id.to_s, data['hits']['hits'])
          while data = client.scroll(body: { scroll_id: data['_scroll_id'] }, scroll: '10m') and not data['hits']['hits'].empty? do
            BulkJobsHandlerWorker.perform_async(ri.id.to_s, data['hits']['hits'])
          end
        else
          data['hits']['hits'].each do |r|
            schedule(r)
            ri.retrieved_documents.find_or_create_by!(es_id: r['_id'], es_index: indx_name)
          end
          while data = client.scroll(body: { scroll_id: data['_scroll_id'] }, scroll: '10m') and not data['hits']['hits'].empty? do
            data['hits']['hits'].each do |r|
              schedule(r)
              ri.retrieved_documents.find_or_create_by!(es_id: r['_id'], es_index: indx_name)
            end
          end
        end
      else
        DataRetrieverWorker.perform_async(indx_name, interval)
        return
      end
      DataRetrieverWorker.perform_at(indx_name, interval)
    end
  end

  private

  def schedule(data)
    DataPersisterWorker.perform_async(data)
  end
end

问题:

  1. 从 ES 2.x 中检索数据的理想方法应该是什么。我们通过日期范围检索数据,然后使用滚动 API 检索结果集。对吗?
  2. 当我们在特定时间范围内获得较大结果时应该做什么。有时,我们会在几分钟的时间范围内获得 20000 多条记录。理想的方法应该是什么?
  3. sidekiq 是处理这种数据量的合适库吗?
  4. 运行 sidekiq 的服务器的理想配置应该是什么?
  5. 使用日期范围是检索数据的正确方法吗?文件的数量变化很大。 0 或 100000+。
  6. 是否有任何更好的方法可以让我知道无论时间范围如何的记录数量?
  7. 我尝试独立于时间范围使用滚动 api,但是对于具有 100cr 记录的索引,使用大小为 100 的滚动是否正确(对 ES 的 api 调用有 100 个结果)? 8.指数数据不断增加。所有文档均未更新。

我们已经测试了我们的代码,它可以处理每个日期时间范围(例如 6 小时)的名义数据(例如 4-5k 个文档)。我们还计划对数据进行分片。由于每当我们在某些集合中添加/更新记录时,我们都需要执行一些 ruby​​ 回调,因此我们将使用 Mongoid。在没有 mongoid 的情况下直接在 mongodb 中插入数据不是一种选择。

任何指针都会有所帮助。谢谢。

【问题讨论】:

  • 我想知道您是否可以简单地开始阅读按 _doc 排序的滚动请求(无论大小)。您的 Elastic 设置应该能够轻松应对。通过这种方式,您可以覆盖所有创建的文档(以一致的批次),直到您开始滚动为止。你想如何处理你的新文件,是另一个问题。也许您可以实现新的弹性索引逻辑并将新文档标记为"normalized": true 或类似的东西?如果您想避免任何停机时间,您可能需要更改应用程序/索引逻辑(例如移动到新索引)。

标签: ruby elasticsearch sidekiq


【解决方案1】:

在我看来,您应该假设该过程可能在任何阶段失败。

恕我直言,您不应该下载所有文档,而应该只下载匹配日期范围文档的 IDS。这将显着减少 ElasticSearch 返回的数据量

使用这些 IDS,您可以在后台执行另一个工作人员(我们称之为 ImporterWorker),并将 IDS 作为输入,从 ElasticSearch 下载整个文档并将它们导出到 MongoDB。

此外,如果您得到 1_000_000 个 IDS,您可以将它们分成 N 个较小的块 (200 X 5_000) 并将 N 个作业排入队列。

好处:

  • 拆分成块 - 您没有从 ElasticSearch 获得大量响应的风险,因为块大小决定了 ElasticSearch 响应的最大大小

  • 当出现故障(临时网络问题或其他任何问题)时,您将使用最初触发它的 IDS 重新运行 ImporterWorker,一切都会正常工作而不会中断整个过程。即使它失败了 - 你会知道没有导入的确切 IDS

【讨论】:

    【解决方案2】:
    1. 从 ES 2.x 中检索数据的理想方法应该是什么。我们通过日期范围检索数据,然后使用滚动 API 检索结果集。对吗?

    ES中的数据是否在不断增加?

    1. 当我们在特定时间范围内获得较大结果时应该做什么。有时,我们会在几分钟的时间范围内获得 20000 多条记录。理想的方法应该是什么?

    您正在使用滚动 api,这是一种很好的方法。你可以试试ES的Sliced Scroll API。

    1. sidekiq 是处理这种数据量的合适库吗?

    是的,sidekiq 很好,可以处理这么多数据。

    1. 运行 sidekiq 的服务器的理想配置应该是什么?

    你当前运行sidekiq的服务器的配置是什么?

    1. 使用日期范围是检索数据的正确方法吗?文件的数量变化很大。 0 或 100000+。

    您一次不能保存 100000 多个结果。您正在使用滚动 API 分块处理它们。如果数据没有继续添加到 ES 中,则使用带有滚动 api 的 match_all: {} 查询。如果不断添加数据,那么日期范围是很好的方法。

    1. 有没有更好的方法可以给我统一数量的记录,而不管时间范围如何?

    是的,如果您使用不使用日期范围。使用 scroll api 扫描从 0 到最后的所有文档。

    1. 我尝试独立于时间范围使用滚动 api,但是对于具有 100cr 记录的索引,使用大小为 100 的滚动是否正确(对 ES 的 api 调用有 100 个结果)?

    您可以增加滚动大小,因为 mongodb 支持批量插入文档。 MongoDB Bulk Insert

    以下几点可能会解决您的问题:

    在处理上一批后清除 scroll_id 可能会提高性能。

    1. 当排序顺序为 _doc 时,滚动请求进行了优化,使它们更快。如果您想迭代所有文档而不考虑顺序,这是最有效的选择。

    2. 滚动参数告诉 Elasticsearch 应该保持搜索上下文活动的时间。它的值(例如 1m)不需要足够长来处理所有数据,它只需要足够长来处理前一批结果。每个滚动请求都会设置一个新的到期时间。

    3. 超过滚动超时时会自动删除搜索上下文。然而,保持滚动条打开是有代价的(稍后将在性能部分讨论),因此一旦不再使用滚动条,就应该使用 clear-scroll API 显式清除滚动条:

    4. Scroll API:后台合并过程通过将较小的段合并在一起以创建新的更大的段来优化索引,此时较小的段被删除。此过程在滚动期间继续,但打开的搜索上下文可防止旧段在仍在使用时被删除。这就是 Elasticsearch 能够返回初始搜索请求结果的方式,而不管后续对文档的更改。 保持旧段处于活动状态意味着需要更多的文件句柄。确保已将节点配置为具有足够的空闲文件句柄,并且在获取数据后很快清除滚动 API 上下文。 我们可以使用节点统计 API 检查打开了多少搜索上下文:

    因此,清除滚动 API 上下文是非常必要的,如前面的清除滚动 API 部分所述。

    Source

    【讨论】:

    • ES 2.x 中是否提供切片滚动?此外,我们尝试对 '_doc' 使用排序,但返回的结果集不一致。
    • 我认为切片滚动在 ES 2.x 中不可用。但是,如果您将滚动与扫描结合使用,可能会对您有所帮助。
    【解决方案3】:

    基于 Elasticsearch 查询备份/恢复或重新索引数据的非常方便的工具是 Elasticdump

    要备份完整的索引,Elasticsearch 快照 API 是合适的工具。快照 API 提供了创建和恢复存储在文件或 Amazon S3 存储桶中的整个索引的快照的操作。

    让我们看一些 Elasticdump 和快照备份和恢复的示例。

    1. 使用节点包管理器安装elasticdump

      npm i elasticdump -g
      
    2. 通过查询备份到 zip 文件:

      elasticdump --input='http://username:password@localhost:9200/myindex' --searchBody '{"query" : {"range" :{"timestamp" : {"lte": 1483228800000}}}}' --output=$ --limit=1000 | gzip > /backups/myindex.gz
      
    3. 从 zip 文件恢复:

      zcat /backups/myindex.gz | elasticdump --input=$ --output=http://username:password@localhost:9200/index_name
      

    使用快照将数据备份和还原到 Amazon S3 或文件的示例

    首先配置快照目的地

    1. S3 示例

      curl 'localhost:9200/_snapshot/my_repository?pretty' 
      -XPUT -H 'Content-Type: application/json' 
      -d '{
      "type" : "s3",
      "settings" : {
         "bucket" : "test-bucket",
         "base_path" : "backup-2017-01",
         "max_restore_bytes_per_sec" : "1gb",
         "max_snapshot_bytes_per_sec" : "1gb",
         "compress" : "true",
         "access_key" : "<ACCESS_KEY_HERE>",
         "secret_key" : "<SECRET_KEY_HERE>"
         }
      }'
      
    2. 本地磁盘或挂载的 NFS 示例

      curl 'localhost:9200/_snapshot/my_repository?pretty' -XPUT -H 'Content-Type: application/json' -d '{
      "type" : "fs",
      "settings" : {
         "location": "<PATH … for example /mnt/storage/backup>"
      }
      }'
      
    3. 触发快照

      curl -XPUT 'localhost:9200/_snapshot/my_repository/<snapshot_name>'
      
    4. 显示所有备份

      curl 'localhost:9200/_snapshot/my_repository/_all'
      
    5. 还原 - 备份中最重要的部分是验证备份还原是否确实有效!

      curl -XPOST 'localhost:9200/_snapshot/my_repository/<snapshot_name>/_restore'
      

    此文本位于:
    https://sematext.com/blog/elasticsearch-security-authentication-encryption-backup/

    【讨论】:

    • 平均索引大小约为 50-60GB。其中一些是250GB +。另外,文档需要规范化(分解成多个文档,保持关联,每个保存时需要回调一些ruby方法)
    • 尺寸有什么问题?你可以做任何你想做的事,它们只是 JSON。您确实需要将其转储到磁盘而不是在内存中使用它。读取转储索引,写入 READER 并保存到 MONGODB 重写关联。我强烈推荐 PYTHON 有很多原因,其中最好的就是本地调试。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-01-04
    • 1970-01-01
    • 2019-11-02
    • 2018-08-07
    • 1970-01-01
    • 2019-11-30
    • 2018-07-08
    相关资源
    最近更新 更多