【问题标题】:elasticsearch ingest pipeline to rename dynamic fieldselasticsearch摄取管道重命名动态字段
【发布时间】:2020-05-06 01:40:26
【问题描述】:

我有一个相当大的文档,我正在将其引入 elasticsearch(70-80 个属性)。并且可能会定期出现新的领域。我可以使用重命名运算符有效地重命名每个字段,但我想知道是否有更有效的方法可以使用脚本动态重命名每个字段。

每个字段的当前格式可以是全部小写,下划线分隔,也可以是大小写组合,下划线分隔。我需要将每个字段重命名如下 -

(例如源字段)TV_field_1 -->(需要重命名为)tvField1

(例如源字段)feature_title_no -->(需要重命名为)featureTitleNo

谢谢

【问题讨论】:

    标签: elasticsearch elasticsearch-painless data-ingestion


    【解决方案1】:

    由于重命名处理器不支持替换模式,编写了自定义脚本

    PUT _ingest/pipeline/snakeCaseToCamelCase
    {
      "description": "Convert snake case to camel case",
      "processors": [
        {
          "script": {
            "source": """
                // Iterate through all keys and create list of fields with snake case
                def loopAllFields(def x){
                  def ret=[];
                  if(x instanceof Map){
                    for (entry in x.entrySet()) {
                      if (entry.getKey().indexOf("_")==0) { 
                        continue;
                      }
                      // Get value
                      def val=entry.getValue();
                      if(entry.getKey().indexOf("_")>-1)
                      {
                          ret.add(entry.getKey());
                      }
                      // If further object
                      if(val instanceof Map|| val instanceof HashMap)
                      {
                        def list =loopAllFields(val);
                        for(item in list)
                        {
                          ret.add(entry.getKey()+"."+ item);
                        }
                      }
                      // If array
                      if(val instanceof ArrayList)
                      {
                        for(v in val)
                        {
                           def list =loopAllFields(v);
                           for(item in list)
                           {
                              ret.add(entry.getKey()+"."+ item);
                           }
                        }
                      }
                    }
                  }
                  return ret;
                }
    
                // Create a camel case field and delete snake case field
                def renameField(def ctx, def fieldName)
                {
                   def str=splitText(fieldName,'.');
                   if(str.length<=1)
                   {
                      def newField=snakeToCamel(fieldName);
                      ctx[newField]=ctx[fieldName];
                      ctx.remove(fieldName);
                   }
                   else
                   {
                       if(ctx[str[0]] instanceof ArrayList)
                       {
                           def fld=combineArray(str);
                           for(v in ctx[str[0]])
                           {
                              renameField(v,fld);
                           }
                       }
                       if(ctx[str[0]] instanceof Map)
                       {
                          def fld=combineArray(str);
                          renameField(ctx[str[0]],fld); 
                        }
                   }
                   return 1;
                }
    
                def combineArray(def str)
                {
                  def fld="";
                  for(int i=1;i<str.length;i++)
                  {
                    if(fld=="")
                    {
                       fld+=str[i];
                    }
                    else
                    {
                       fld+="."+str[i];
                    }
                  }
                  return fld;
                }
    
                // Convert field name from snake case to camel case
                def snakeToCamel(def s){
                  //def str=/_/.split(s);
                  def str=splitText(s,'_');
                  def fieldName="";
                  for(int i=0;i<str.length;i++)
                  {
                     if(i==0)
                     {
                       fieldName+=str[i].toLowerCase();
                     }
                     else{
                        if(str[i].length()==1)
                        {
                          fieldName+=str[i].substring(0,1).toUpperCase();
                        }else{
                          fieldName+=str[i].substring(0,1).toUpperCase()+str[i].substring(1,str[i].length());
                        }
                     }
                  }
                  return fieldName;
                }
    
                def splitText(def str, def seperator)
                {
                  def pos= str.indexOf(seperator);
                  def ret=[];
                  while(pos>0)
                  {
                     def split=str.substring(0,pos);
                     def rest= str.substring(pos+1,str.length());
                     ret.add(split);
                     pos=rest.indexOf(seperator);
                     if(pos==-1)
                     {
                       ret.add(rest);
                     }
                     str=rest;
                  }
                  return ret;
                }
    
                def fields=loopAllFields(ctx);
                fields.sort((s1, s2) -> s2.length() - s1.length());
                for(field in fields)
                {
                  renameField(ctx,field);
                }
    
    """
          }
        }
      ]
    }
    

    【讨论】:

    • 谢谢@jaspreet。我在这里遇到了一个问题——因为我的 ES 域在 AWS 上运行,我无权访问 elasticsearch.yml 文件,因此脚本的正则表达式被禁用。由于此行中使用了正则表达式def str=/\\./.split(fieldName),这会导致上述脚本的编译错误
    • @raja def splitText(def str) { def pos= str.indexOf('.'); def ret=[]; while(pos>0) { def split=str.substring(0,pos); def rest= str.substring(pos+1,str.length()); ret.add(拆分); pos=rest.indexOf('.'); if(pos==-1) { ret.add(rest); } } 返回 ret; } 调用而不是正则表达式
    • 感谢@jaspreet,这样做并编译了脚本,但在运行时出现以下错误 - "ScriptException[runtime error]; nested: PainlessError[The maximum number of statements that can be executed in a loop has been reached.];
    • "script_stack": [ "while(pos&gt;0) \n { \n def ", "^---- HERE" ],
    • 在原脚本中以下地方替换两条语句,调用上述splitText函数def renameField(def ctx, def fieldName) { def str=splitText(fieldName); ....def snakeToCamel(def s){ def str=splitText(s); ....
    猜你喜欢
    • 2018-08-09
    • 2021-09-14
    • 2018-04-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-10-07
    相关资源
    最近更新 更多