【发布时间】:2022-01-18 13:55:52
【问题描述】:
使用大型索引(100,000 个文档),我有一个用例会产生多个尝试并行更新文档的线程,源代码使用两种方法来更新文档:Update 和 UpdateByQuery,所以一些线程调用Update,其中一些调用UpdateByQuery。
为简洁起见,每个线程都尝试更新整个文档的相同属性。
这是一个演示用例的小型 POC:
索引 100,000 个 Product 类型的文档,并生成 100 个任务,因此每个任务并行调用 Update 和 UpdateByQuery。他们都使用MatchAll查询。
public async Task ConflictsTestAsync()
{
// index 100,000 documents.
IEnumerable<Product> products = CreateProducts(100000);
await _client.IndexManyAsync(products);
await _client.Indices.UpdateSettingsAsync("MyIndex", s => s
.IndexSettings(i => i.Setting(UpdatableIndexSettings.MaxResultWindow, 100000)));
var searchResponse = _client.Search<Product>(s => s
.From(0)
.Size(100000)
.Query(q => q.MatchAll())
);
IReadOnlyCollection<IHit<Product>> getProducts = searchResponse.Hits;
List<Task> tasks = new List<Task>();
for (int i = 0; i < 100; i++)
{
tasks.Add(UpdateByQuery(getProducts));
tasks.Add(Update(getProducts));
}
await Task.WhenAll(tasks);
}
public class Product
{
public string Price { get; set; }
public string Id { get; set; }
}
UpdateByQuery(将价格更新为 0):
private async Task UpdateByQuery()
{
Func<UpdateByQueryDescriptor<Product>, IUpdateByQueryRequest> updateByQuerySelector = (UpdateByQueryDescriptor<Product> updateByQueryDescriptor) =>
{
updateByQueryDescriptor
.Conflicts(Conflicts.Abort)
.ErrorTrace()
.Query(x => x.MatchAll())
.Script(x => x.Source("ctx._source['price'] = '0'"));
IUpdateByQueryRequest result = updateByQueryDescriptor;
return result;
};
await _client.UpdateByQueryAsync(updateByQuerySelector, CancellationToken.None);
}
更新(更新价格为1):
private async Task Update(IReadOnlyCollection<IHit<Product>> keys)
{
foreach (IHit<Product> product in keys)
{
DocumentPath<Product> id = new DocumentPath<Product>(product.Id);
Func<UpdateDescriptor<Product, Product>, IUpdateRequest<Product, Product>> updateSelector = (UpdateDescriptor<Product, Product> updateDescriptor) =>
{
var page = product.Source;
page.Price = "1";
updateDescriptor.Doc(page);
IUpdateRequest<Product, Product> result = updateDescriptor;
return result;
};
await _client.UpdateAsync<Product>(id, updateSelector);
}
}
问题是产生多个 Query 线程和 UpdateByQuery 导致冲突异常:
一些Update 抛出:
从不成功的 (409) 低级别调用构建的无效 NEST 响应 在 POST 上:/MyIndex/_update/d75a34ae-2533-4e15-a852-13e98c5b599c 此 API 调用的审计跟踪:
- [1] 错误响应:节点:http://localhost:9200/ 接受:00:00:00.2495465 OriginalException:Elasticsearch.Net.ElasticsearchClientException:请求执行失败。呼叫:状态码 409 来自:POST /MyIndex/_update/d75a34ae-2533-4e15-a852-13e98c5b599c。 ServerError:类型:version_conflict_engine_exception 原因: "[d75a34ae-2533-4e15-a852-13e98c5b599c]:版本冲突,必填 seqNo [701069],主要术语 [1]。当前文档有 seqNo [702042] 和主要术语 [1]" 请求:{"doc":{"id":"d75a34ae-2533-4e15-a852-13e98c5b599c","manufacturer":"777e1602-8390-40c8-817e-fdef4e3fb9c0","price":"1","title ":"31184e90-f1d1-45be-8746-496a50de2f97","描述":"780cc1ab-0a8b-4114-a840-67a528de8e55"}} 响应:{"error":{"root_cause":[{"type":"version_conflict_engine_exception","reason":"[d75a34ae-2533-4e15-a852-13e98c5b599c]: 版本冲突,需要 seqNo [701069],主要术语 [1]。当前的 文档有 seqNo [702042] 和主要术语 [1]","index_uuid":"gFAfYHfNQZG0tD0L2a9yag","shard":"0","index":"MyIndex"}],"type":"version_conflict_engine_exception","reason":"[d75a34ae-2533-4e15 -a852-13e98c5b599c]: 版本冲突,需要 seqNo [701069],主要术语 [1]。当前的 文档有 seqNo [702042] 和主要术语 [1]","index_uuid":"gFAfYHfNQZG0tD0L2a9yag","shard":"0","index":"MyIndex"},"status":409}
还有一些UpdateByQuery 抛出(我已经清理了失败数组):
从不成功的 (409) 低级别调用构建的无效 NEST 响应 在 POST 上: /MyIndex/_update_by_query?conflicts=abort&error_trace=true 此 API 调用的审计跟踪:
- [1] 错误响应:节点:http://localhost:9200/ 接受:00:00:00.2636546 OriginalException:Elasticsearch.Net.ElasticsearchClientException:请求执行失败。呼叫:状态码 409 来自:POST /MyIndex/_update_by_query?conflicts=abort&error_trace=true 请求:{"query":{"match_all":{}},"script":{"source":"ctx._source['price'] = '0'"}} 响应:{"took":120,"timed_out":false,"total":100000,"updated":0,"deleted":0,"batches":1,"version_conflicts":1000,"noops":0 ,"重试次数":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":[{"index":"MyIndex ","type":"_doc","id":"d5fb4183-4ff4-43c9-962c-ee9d0ee59a6b","cause":{"type":"version_conflict_engine_exception","reason":"[d5fb4183-4ff4-43c9 -962c-ee9d0ee59a6b]: 版本冲突,需要seqNo [0],主要术语[1]。当前的 文档有 seqNo [100000] 和主要术语 [1]","index_uuid":"pPDFKhj6T4y-MpzYECKpxQ","shard":"0"]}
因为性能很重要,我不希望放弃 UpdateByQuery(并且仅使用 Update)或对方法访问进行互斥,任何处理这种情况的建议都将不胜感激。
更新:
数据完整性很重要:.Conflicts(Conflicts.Abort)
弹性搜索:7.10.0。
【问题讨论】:
-
使用
seq_no和primary_term进行乐观并发控制:elastic.co/guide/en/elasticsearch/reference/7.16/…
标签: elasticsearch nest