【问题标题】:NEST Elasticsearch Reindex examplesNEST Elasticsearch 重新索引示例
【发布时间】:2014-11-12 12:19:24
【问题描述】:

我的目标是重新索引具有 1000 万个分片的索引,以便更改字段映射以促进重要的术语分析。

我的问题是我在使用 NEST 库执行重新索引时遇到问题,并且文档(非常)有限。如果可能的话,我需要一个正在使用的示例:

http://nest.azurewebsites.net/nest/search/scroll.html

http://nest.azurewebsites.net/nest/core/bulk.html

【问题讨论】:

  • 现在可以通过 NEST 使用服务器端重新索引。

标签: elasticsearch nest


【解决方案1】:

NEST 提供了一个不错的 Reindex 方法,您可以使用,尽管缺少文档。我已经通过这个特别的 WinForms 代码以非常粗略的方式使用它。

    private ElasticClient client;
    private double count;

    private void reindex_Completed()
    {
        MessageBox.Show("Done!");
    }

    private void reindex_Next(IReindexResponse<object> obj)
    {
        count += obj.BulkResponse.Items.Count();
        var progress = 100 * count / (double)obj.SearchResponse.Total;
        progressBar1.Value = (int)progress;
    }

    private void reindex_Error(Exception ex)
    {
        MessageBox.Show(ex.ToString());
    }

    private void button1_Click(object sender, EventArgs e)
    {
        count = 0;

        var reindex = client.Reindex<object>(r => r.FromIndex(fromIndex.Text).NewIndexName(toIndex.Text).Scroll("10s"));

        var o = new ReindexObserver<object>(onError: reindex_Error, onNext: reindex_Next, completed: reindex_Completed);
        reindex.Subscribe(o);
    }

我刚刚找到了一篇告诉我如何操作的博文:http://thomasardal.com/elasticsearch-migrations-with-c-and-nest/

【讨论】:

  • 非常感谢,我看看我能用它做什么!
  • 对我来说重要的是调用 reindex.Subscribe() 并观察抛出的异常。这对我来说失败了,原因有两个:1)索引已经存在,2)当前索引不包含任何文档。
  • 如果我不在 reindex() 调用中创建新索引,则会引发 ReindexException,为什么会这样?
【解决方案2】:

不幸的是,NEST 的实现并不完全符合我的预期。在我看来,对于可能最常见的用例来说,它有点过度设计。

很多人只想以零停机时间更新他们的映射...

在我的例子中 - 我已经用它的所有设置和映射创建了索引,但是NEST 坚持它必须在重新索引时创建一个新索引。这在许多其他事情中。其他的东西太多了。

我发现直接实现要简单得多——因为 NEST 已经有 SearchScrollBulk 方法。 (这是从NEST的实现中采用的):

// Assuming you have already created and setup the index yourself
public void Reindex(ElasticClient client, string aliasName, string currentIndexName, string nextIndexName)
{
    Console.WriteLine("Reindexing documents to new index...");
    var searchResult = client.Search<object>(s => s.Index(currentIndexName).AllTypes().From(0).Size(100).Query(q => q.MatchAll()).SearchType(SearchType.Scan).Scroll("2m"));
    if (searchResult.Total <= 0)
    {
        Console.WriteLine("Existing index has no documents, nothing to reindex.");
    }
    else
    {
        var page = 0;
        IBulkResponse bulkResponse = null;
        do
        {
            var result = searchResult;
            searchResult = client.Scroll<object>(s => s.Scroll("2m").ScrollId(result.ScrollId));
            if (searchResult.Documents != null && searchResult.Documents.Any())
            {
                searchResult.ThrowOnError("reindex scroll " + page);
                bulkResponse = client.Bulk(b =>
                {
                    foreach (var hit in searchResult.Hits)
                    {
                        b.Index<object>(bi => bi.Document(hit.Source).Type(hit.Type).Index(nextIndexName).Id(hit.Id));
                    }

                    return b;
                }).ThrowOnError("reindex page " + page);
                Console.WriteLine("Reindexing progress: " + (page + 1) * 100);
            }

            ++page;
        }
        while (searchResult.IsValid && bulkResponse != null && bulkResponse.IsValid && searchResult.Documents != null && searchResult.Documents.Any());
        Console.WriteLine("Reindexing complete!");
    }

    Console.WriteLine("Updating alias to point to new index...");
    client.Alias(a => a
        .Add(aa => aa.Alias(aliasName).Index(nextIndexName))
        .Remove(aa => aa.Alias(aliasName).Index(currentIndexName)));

    // TODO: Don't forget to delete the old index if you want
}

还有ThrowOnError 扩展方法,以备不时之需:

public static T ThrowOnError<T>(this T response, string actionDescription = null) where T : IResponse
{
    if (!response.IsValid)
    {
        throw new CustomExceptionOfYourChoice(actionDescription == null ? string.Empty : "Failed to " + actionDescription + ": " + response.ServerError.Error);
    }

    return response;
}

【讨论】:

    【解决方案3】:

    我支持 Ben Wilde 的上述回答。最好完全控制索引创建和重新索引过程。

    Ben 的代码中缺少对父/子关系的支持。这是我的代码来解决这个问题:

    替换以下行:

    foreach (var hit in searchResult.Hits)
    {
        b.Index<object>(bi => bi.Document(hit.Source).Type(hit.Type).Index(nextIndexName).Id(hit.Id));
    }
    

    有了这个:

    foreach (var hit in searchResult.Hits)
    {
        var jo = hit.Source as JObject;
        JToken jt;
        if(jo != null && jo.TryGetValue("parentId", out jt))
        {
            // Document is child-document => add parent reference
            string parentId = (string)jt;
            b.Index<object>(bi => bi.Document(hit.Source).Type(hit.Type).Index(nextIndexName).Id(hit.Id).Parent(parentId));
        }
        else
        {
            b.Index<object>(bi => bi.Document(hit.Source).Type(hit.Type).Index(nextIndexName).Id(hit.Id));
        }                                
    }
    

    【讨论】:

      猜你喜欢
      • 2017-12-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-07-30
      • 2021-08-12
      • 1970-01-01
      • 2016-06-17
      相关资源
      最近更新 更多