【问题标题】:Reindex part of Elasticsearch index onto new index via Jest通过 Jest 将 Elasticsearch 索引的一部分重新索引到新索引上
【发布时间】:2018-06-27 22:20:42
【问题描述】:

我有一个测试 ElasticSearch 6.0 索引,其中包含数百万条记录,生产中可能有数十亿条记录。我需要搜索这些记录的子集,然后将原始集的这个子集保存到二级索引中以供以后搜索。我已经通过在 Kibana 上查询 ES 证明了这一点,挑战是使用我的 Jest 客户端(searchbox.io,版本 5.3.3)在 Java 8 中找到合适的 API 来做同样的事情。 ElasticSearch 集群位于 AWS 上,因此无法使用传输客户端。

POST _reindex?slices=10&wait_for_completion=false
{ "conflicts": "proceed",
  "source":{
    "index": "my_source_idx",
    "size": 5000,
    "query": { "bool": {
      "filter": { "bool" : { "must" : [
        { "nested": { "path": "test", "query": { "bool": { "must":[
           { "terms" : { "test.RowKey": ["abc"]} },
           { "range" : { "test.dates" : { "lte": "2018-01-01", "gte": "2010-08-01"} } },
           { "range" : { "test.DatesCount" : { "gte": 2} } },
           { "script" : { "script" : { "id": "my_painless_script", 
              "params" : {"min_occurs" : 1, "dateField": "test.dates", "RowKey": ["abc"], "fromDate": "2010-08-01", "toDate": "2018-01-01"}}}}
        ]}}}}
      ]}}
    }}
  },
  "dest": {
    "index": "my_dest_idx"
  },
  "script": {
    "source": <My painless script>
  } }

我知道我可以在源索引上执行搜索,然后创建响应记录并将其批量加载到新索引上,但我希望能够一次性完成所有这些,因为我有一个无痛的脚本收集一些与将搜索二级索引的查询相关的信息。性能是一个问题,因为应用程序将使用目标索引将后续查询链接在一起进行查询。有谁知道如何使用 Jest 做到这一点?

【问题讨论】:

    标签: java elasticsearch elasticsearch-jest


    【解决方案1】:

    Jest 似乎尚不支持此特定功能。 Jest API 它有一种方法可以将脚本(不是查询)作为参数传递,但我什至遇到了问题。

    编辑:

    在与同事进行一些黑客攻击后,我们找到了解决此问题的方法...

    步骤 1) 扩展 GenericResultAbstractionAction 类并编辑脚本:

    public class GenericResultReindexActionHack extends GenericResultAbstractAction {
        GenericResultReindexActionHack(GenericResultReindexActionHack.Builder builder) {
            super(builder);
    
            Map<String, Object> payload = new HashMap<>();
            payload.put("source", builder.source);
        payload.put("dest", builder.dest);
        if (builder.conflicts != null) {
            payload.put("conflicts", builder.conflicts);
        }
        if (builder.size != null) {
            payload.put("size", builder.size);
        }
        if (builder.script != null) {
            Script script = (Script) builder.script;
    

    // 注意脚本参数需要不同的格式以符合ES _reindex API:

            payload.put("script", new Gson().toJson(ImmutableMap.of("id", script.getIdOrCode(), "params", script.getParams())));
        }
        this.payload = ImmutableMap.copyOf(payload);
    
        setURI(buildURI());
    }
    
    @Override
    protected String buildURI() {
        return super.buildURI() + "/_reindex";
    }
    
    @Override
    public String getRestMethodName() {
        return "POST";
    }
    
    @Override
    public String getData(Gson gson) {
        if (payload == null) {
            return null;
        } else if (payload instanceof String) {
            return (String) payload;
        } else {
    

    // 我们需要删除查询、目标和脚本字段的错误格式:

            // TODO: Need to consider spaces in the JSON
            return gson.toJson(payload).replaceAll("\\\\n", "")
                            .replace("\\", "")
                            .replace("query\":\"", "query\":")
                            .replace("\"},\"dest\"", "},\"dest\"")
                            .replaceAll("\"script\":\"","\"script\":")
                    .replaceAll("\"}","}")
                    .replaceAll("},\"script\"","\"},\"script\"");
    
        }
    }
    
    public static class Builder extends GenericResultAbstractAction.Builder<GenericResultReindexActionHack , GenericResultReindexActionHack.Builder> {
    
        private Object source;
        private Object dest;
        private String conflicts;
        private Long size;
        private Object script;
    
        public Builder(Object source, Object dest) {
            this.source = source;
            this.dest = dest;
        }
    
        public GenericResultReindexActionHack.Builder conflicts(String conflicts) {
            this.conflicts = conflicts;
            return this;
        }
    
        public GenericResultReindexActionHack.Builder size(Long size) {
            this.size = size;
            return this;
        }
    
        public GenericResultReindexActionHack.Builder script(Object script) {
            this.script = script;
            return this;
        }
    
        public GenericResultReindexActionHack.Builder waitForCompletion(boolean waitForCompletion) {
            return setParameter("wait_for_completion", waitForCompletion);
        }
    
        public GenericResultReindexActionHack.Builder waitForActiveShards(int waitForActiveShards) {
            return setParameter("wait_for_active_shards", waitForActiveShards);
        }
    
        public GenericResultReindexActionHack.Builder timeout(long timeout) {
            return setParameter("timeout", timeout);
        }
    
        public GenericResultReindexActionHack.Builder requestsPerSecond(double requestsPerSecond) {
            return setParameter("requests_per_second", requestsPerSecond);
        }
    
        public GenericResultReindexActionHack build() {
            return new GenericResultReindexActionHack(this);
        }
    }
    

    }

    第 2 步)将此类与查询一起使用需要您将查询作为源的一部分传入,然后删除“\n”字符:

    ImmutableMap<String, Object> sourceMap = ImmutableMap.of("index", sourceIndex, "query", qb.toString().replaceAll("\\\\n", ""));
            ImmutableMap<String, Object> destMap = ImmutableMap.of("index", destIndex);
    
    GenericResultReindexActionHack reindex = new GenericResultReindexActionHack.Builder(sourceMap, destMap)
                    .waitForCompletion(false)
                    .conflicts("proceed")
                    .size(5000L)
                    .script(reindexScript)
                    .setParameter("slices", 10)
                    .build();
    
            JestResult result = handleResult(reindex);
            String task = result.getJsonString();
            return (task);
    

    注意 reindexScript 参数的类型是 org.elasticsearch.script。

    这是一种绕过 Jest 限制的杂乱无章的方式,但它似乎有效。我知道这样做可能会对输入格式中可接受的内容有一些限制...

    【讨论】:

    • 当我尝试使用带有 wait_for_completion=false 的 reindex 时,JestClient 出现以下错误。我得到了任务ID。但是当我获取任务信息时,我将身份验证失败作为响应中的原因。操作 [indices:data/write/reindex] 需要身份验证。我在 JEST http 客户端配置中设置了弹性基本身份验证。但是对于这个重新索引调用,我只收到了这个错误。不确定如何调试 JEST 客户端请求标头。有谁知道如何解决这个问题?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-02
    • 1970-01-01
    • 2018-09-08
    • 1970-01-01
    相关资源
    最近更新 更多