【发布时间】:2014-11-12 12:19:24
【问题描述】:
我的目标是重新索引具有 1000 万个分片的索引,以便更改字段映射以促进重要的术语分析。
我的问题是我在使用 NEST 库执行重新索引时遇到问题,并且文档(非常)有限。如果可能的话,我需要一个正在使用的示例:
【问题讨论】:
-
现在可以通过 NEST 使用服务器端重新索引。
标签: elasticsearch nest
我的目标是重新索引具有 1000 万个分片的索引,以便更改字段映射以促进重要的术语分析。
我的问题是我在使用 NEST 库执行重新索引时遇到问题,并且文档(非常)有限。如果可能的话,我需要一个正在使用的示例:
【问题讨论】:
标签: elasticsearch nest
NEST 提供了一个不错的 Reindex 方法,您可以使用,尽管缺少文档。我已经通过这个特别的 WinForms 代码以非常粗略的方式使用它。
private ElasticClient client;
private double count;
private void reindex_Completed()
{
MessageBox.Show("Done!");
}
private void reindex_Next(IReindexResponse<object> obj)
{
count += obj.BulkResponse.Items.Count();
var progress = 100 * count / (double)obj.SearchResponse.Total;
progressBar1.Value = (int)progress;
}
private void reindex_Error(Exception ex)
{
MessageBox.Show(ex.ToString());
}
private void button1_Click(object sender, EventArgs e)
{
count = 0;
var reindex = client.Reindex<object>(r => r.FromIndex(fromIndex.Text).NewIndexName(toIndex.Text).Scroll("10s"));
var o = new ReindexObserver<object>(onError: reindex_Error, onNext: reindex_Next, completed: reindex_Completed);
reindex.Subscribe(o);
}
我刚刚找到了一篇告诉我如何操作的博文:http://thomasardal.com/elasticsearch-migrations-with-c-and-nest/
【讨论】:
不幸的是,NEST 的实现并不完全符合我的预期。在我看来,对于可能最常见的用例来说,它有点过度设计。
很多人只想以零停机时间更新他们的映射...
在我的例子中 - 我已经用它的所有设置和映射创建了索引,但是NEST 坚持它必须在重新索引时创建一个新索引。这在许多其他事情中。其他的东西太多了。
我发现直接实现要简单得多——因为 NEST 已经有 Search、Scroll 和 Bulk 方法。 (这是从NEST的实现中采用的):
// Assuming you have already created and setup the index yourself
public void Reindex(ElasticClient client, string aliasName, string currentIndexName, string nextIndexName)
{
Console.WriteLine("Reindexing documents to new index...");
var searchResult = client.Search<object>(s => s.Index(currentIndexName).AllTypes().From(0).Size(100).Query(q => q.MatchAll()).SearchType(SearchType.Scan).Scroll("2m"));
if (searchResult.Total <= 0)
{
Console.WriteLine("Existing index has no documents, nothing to reindex.");
}
else
{
var page = 0;
IBulkResponse bulkResponse = null;
do
{
var result = searchResult;
searchResult = client.Scroll<object>(s => s.Scroll("2m").ScrollId(result.ScrollId));
if (searchResult.Documents != null && searchResult.Documents.Any())
{
searchResult.ThrowOnError("reindex scroll " + page);
bulkResponse = client.Bulk(b =>
{
foreach (var hit in searchResult.Hits)
{
b.Index<object>(bi => bi.Document(hit.Source).Type(hit.Type).Index(nextIndexName).Id(hit.Id));
}
return b;
}).ThrowOnError("reindex page " + page);
Console.WriteLine("Reindexing progress: " + (page + 1) * 100);
}
++page;
}
while (searchResult.IsValid && bulkResponse != null && bulkResponse.IsValid && searchResult.Documents != null && searchResult.Documents.Any());
Console.WriteLine("Reindexing complete!");
}
Console.WriteLine("Updating alias to point to new index...");
client.Alias(a => a
.Add(aa => aa.Alias(aliasName).Index(nextIndexName))
.Remove(aa => aa.Alias(aliasName).Index(currentIndexName)));
// TODO: Don't forget to delete the old index if you want
}
还有ThrowOnError 扩展方法,以备不时之需:
public static T ThrowOnError<T>(this T response, string actionDescription = null) where T : IResponse
{
if (!response.IsValid)
{
throw new CustomExceptionOfYourChoice(actionDescription == null ? string.Empty : "Failed to " + actionDescription + ": " + response.ServerError.Error);
}
return response;
}
【讨论】:
我支持 Ben Wilde 的上述回答。最好完全控制索引创建和重新索引过程。
Ben 的代码中缺少对父/子关系的支持。这是我的代码来解决这个问题:
替换以下行:
foreach (var hit in searchResult.Hits)
{
b.Index<object>(bi => bi.Document(hit.Source).Type(hit.Type).Index(nextIndexName).Id(hit.Id));
}
有了这个:
foreach (var hit in searchResult.Hits)
{
var jo = hit.Source as JObject;
JToken jt;
if(jo != null && jo.TryGetValue("parentId", out jt))
{
// Document is child-document => add parent reference
string parentId = (string)jt;
b.Index<object>(bi => bi.Document(hit.Source).Type(hit.Type).Index(nextIndexName).Id(hit.Id).Parent(parentId));
}
else
{
b.Index<object>(bi => bi.Document(hit.Source).Type(hit.Type).Index(nextIndexName).Id(hit.Id));
}
}
【讨论】: