【发布时间】:2018-07-14 06:31:48
【问题描述】:
这个问题更多的是理论而不是源代码。
我有一个 ES 2.x 节点,它有超过 1.2TB 的数据。我们有 40 多个索引,每个索引至少有 1 种类型。在这里,ES 2.x 被用作数据库而不是搜索引擎。用于将数据转储到 ES 2.x 的源丢失了。此外,数据未标准化,但单个 ES 文档具有多个嵌入文档。我们的目标是重新创建数据源并同时对其进行规范化。
我们的计划是:
- 从 ES 中获取数据,分析并转储到新的 mongodb 到特定的集合,并维护数据之间的关系。 IE。以标准化形式保存。
- 在新的 ES 6 节点上为新的 mongo 数据编制索引。
我们正在使用 JRuby 9.1.15.0、Rails 5、Ruby 2.4 和 Sidekiq。
目前,我们正在从 ES 检索特定日期时间范围的数据。有时我们会收到 0 条记录,有时会收到 100000+ 条记录。问题是当我们收到大量记录时。
下面是一个示例脚本,当日期范围的数据较小时有效,但在数据较大时失败。 1.2TB/40 个索引是平均索引大小。
class DataRetrieverWorker
include Sidekiq::Worker
include Sidekiq::Status::Worker
def perform(indx_name, interval = 24, start_time = nil, end_time = nil)
unless start_time || end_time
client = ElasticSearchClient.instance.client
last_retrieved_at = RetrievedIndex.where(name: indx_name).desc(:created_at).first
start_time, end_time = unless last_retrieved_at
data = client.search index: indx_name, size: 1, sort: [{ insert_time: { order: 'asc' } }]
first_day = DateTime.parse(data['hits']['hits'].first['_source']['insert_time'])
start_time = first_day.beginning_of_day
end_time = first_day.end_of_day
else
# retrieve for the next time slot. usually 24 hrs.
[last_retrieved_at.end_time, last_retrieved_at.end_time + interval.hours]
end
DataRetrieverWorker.perform_async(indx_name, interval, start_time, end_time)
else
# start scroll on the specified range and retrieve data.
query = { range: { insert_time: { gt: DateTime.parse(start_time).utc.iso8601, lt: DateTime.parse(end_time).utc.iso8601 } } }
data = client.search index: indx_name, scroll: '10m', size: SCROLL_SIZE, body: { query: query }
ri = RetrievedIndex.find_by(name: indx_name, start_time: start_time, end_time: end_time)
if ri
DataRetrieverWorker.perform_at(2.seconds.from_now, indx_name, interval)
return
end
ri = RetrievedIndex.create!(name: indx_name, start_time: start_time, end_time: end_time, documents_cnt: data['hits']['total'])
if data['hits']['total'] > 0
if data['hits']['total'] > 2000
BulkJobsHandlerWorker.perform_async(ri.id.to_s, data['hits']['hits'])
while data = client.scroll(body: { scroll_id: data['_scroll_id'] }, scroll: '10m') and not data['hits']['hits'].empty? do
BulkJobsHandlerWorker.perform_async(ri.id.to_s, data['hits']['hits'])
end
else
data['hits']['hits'].each do |r|
schedule(r)
ri.retrieved_documents.find_or_create_by!(es_id: r['_id'], es_index: indx_name)
end
while data = client.scroll(body: { scroll_id: data['_scroll_id'] }, scroll: '10m') and not data['hits']['hits'].empty? do
data['hits']['hits'].each do |r|
schedule(r)
ri.retrieved_documents.find_or_create_by!(es_id: r['_id'], es_index: indx_name)
end
end
end
else
DataRetrieverWorker.perform_async(indx_name, interval)
return
end
DataRetrieverWorker.perform_at(indx_name, interval)
end
end
private
def schedule(data)
DataPersisterWorker.perform_async(data)
end
end
问题:
- 从 ES 2.x 中检索数据的理想方法应该是什么。我们通过日期范围检索数据,然后使用滚动 API 检索结果集。对吗?
- 当我们在特定时间范围内获得较大结果时应该做什么。有时,我们会在几分钟的时间范围内获得 20000 多条记录。理想的方法应该是什么?
- sidekiq 是处理这种数据量的合适库吗?
- 运行 sidekiq 的服务器的理想配置应该是什么?
- 使用日期范围是检索数据的正确方法吗?文件的数量变化很大。 0 或 100000+。
- 是否有任何更好的方法可以让我知道无论时间范围如何的记录数量?
- 我尝试独立于时间范围使用滚动 api,但是对于具有 100cr 记录的索引,使用大小为 100 的滚动是否正确(对 ES 的 api 调用有 100 个结果)? 8.指数数据不断增加。所有文档均未更新。
我们已经测试了我们的代码,它可以处理每个日期时间范围(例如 6 小时)的名义数据(例如 4-5k 个文档)。我们还计划对数据进行分片。由于每当我们在某些集合中添加/更新记录时,我们都需要执行一些 ruby 回调,因此我们将使用 Mongoid。在没有 mongoid 的情况下直接在 mongodb 中插入数据不是一种选择。
任何指针都会有所帮助。谢谢。
【问题讨论】:
-
我想知道您是否可以简单地开始阅读按 _doc 排序的滚动请求(无论大小)。您的 Elastic 设置应该能够轻松应对。通过这种方式,您可以覆盖所有创建的文档(以一致的批次),直到您开始滚动为止。你想如何处理你的新文件,是另一个问题。也许您可以实现新的弹性索引逻辑并将新文档标记为
"normalized": true或类似的东西?如果您想避免任何停机时间,您可能需要更改应用程序/索引逻辑(例如移动到新索引)。
标签: ruby elasticsearch sidekiq