【问题标题】:HIVE: How to sum values from key/value strings by key / HIVE:如何按键对键/值字符串中的值求和
【发布时间】:2016-12-30 08:24:10
【问题描述】:

我有两个 string 列,其中包含 key/value 对形式的变量,例如:

    column1         column2
a:1,b:2,c:3         a:5,c:3
   a:12,b:4     a:9,b:3,d:5

我怎样才能按键对这些值求和(实际数据中我并不知道共有多少个键,而且有些键只出现在其中一列中),从而得到如下结果(某行中不存在的键记为 0):

         column12
  a:6,b:2,c:6,d:0
 a:21,b:7,c:0,d:5

或者这个:

 a  b  c  d
 6  2  6  0
21  7  0  5

感谢您的帮助!

【问题讨论】:

    标签: string hive key-value


    【解决方案1】:

    假设每一行都有一个唯一的标识符“id”,下面的查询
    应该可以。

    -- Per-id sums of the key:value pairs found in col1 and col2, zero-filled so
    -- every output row lists every key seen in any row, e.g. "a:6,b:2,c:6,d:0".
    -- Assumes each row has a unique identifier column `id`; ids whose col1 and
    -- col2 are both NULL/empty contribute no pairs and produce no output row.
    WITH kv AS (
        -- One row per (id, key, value) pair. The two columns are exploded
        -- separately and stacked with UNION ALL; exploding both in one query
        -- would cross-multiply their pairs and double-count values.
        SELECT id, key, CAST(val AS INT) AS val
        FROM `table`
        LATERAL VIEW EXPLODE(str_to_map(col1, ',', ':')) e1 AS key, val
        UNION ALL
        SELECT id, key, CAST(val AS INT) AS val
        FROM `table`
        LATERAL VIEW EXPLODE(str_to_map(col2, ',', ':')) e2 AS key, val
    ),
    summed AS (
        SELECT id, key, SUM(val) AS val
        FROM kv
        GROUP BY id, key
    ),
    -- Every (id, key) combination, so keys missing from a row can be reported as 0.
    grid AS (
        SELECT i.id, k.key
        FROM (SELECT DISTINCT id FROM kv) i
        CROSS JOIN (SELECT DISTINCT key FROM kv) k
    )
    SELECT g.id,
           -- SORT_ARRAY gives every row the same key order (it sorts the
           -- "key:value" strings lexicographically).
           CONCAT_WS(',', SORT_ARRAY(COLLECT_LIST(
               CONCAT(g.key, ':', CAST(COALESCE(s.val, 0) AS STRING))
           ))) AS column12
    FROM grid g
    LEFT JOIN summed s
        ON s.id = g.id AND s.key = g.key
    GROUP BY g.id;
    

    【讨论】:

      【解决方案2】:

      一种方法是借助 HCatalog 编写自定义的 MapReduce 程序。

      下面的 MapReduce 作业从表 1 读取输入列,并将结果列写入表 2(所有逻辑都在 mapper 中完成,因此不需要 reducer)。

      import java.io.IOException;
      import java.util.*;
      
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.conf.*;
      import org.apache.hadoop.io.*;
      import org.apache.hadoop.mapreduce.*;
      import org.apache.hadoop.util.*;
      import org.apache.hcatalog.common.*;
      import org.apache.hcatalog.mapreduce.*;
      import org.apache.hcatalog.data.*;
      import org.apache.hcatalog.data.schema.*;
      import org.apache.commons.lang3.ArrayUtils;
      
      /**
       * HCatalog map-only job: for each row of the input table, sums the integer
       * values of matching keys across the first two string columns
       * (e.g. "a:1,b:2,c:3" + "a:5,c:3") and writes one string column
       * (e.g. "a:6,b:2,c:6") to the output table.
       *
       * Usage: HCatSum <inputTable> <outputTable> (tables in the default database).
       */
      public class HCatSum extends Configured implements Tool {

          /**
           * Sums columns 0 and 1 of each record and emits a single-column record
           * holding "k:v,..." with keys in ascending order. Only keys present in
           * that row are emitted: zero entries for keys seen only in other rows
           * (e.g. "d:0") would need a global key set, which a per-row mapper does
           * not have.
           */
          public static class Map
                  extends
                  Mapper<WritableComparable, HCatRecord, WritableComparable, HCatRecord> {

              @Override
              protected void map(
                      WritableComparable key,
                      HCatRecord value,
                      org.apache.hadoop.mapreduce.Mapper<WritableComparable, HCatRecord, WritableComparable, HCatRecord>.Context context)
                      throws IOException, InterruptedException {
                  // TreeMap makes the output order deterministic (sorted by key);
                  // HashMap iteration order is unspecified.
                  TreeMap<String, Integer> sums = new TreeMap<String, Integer>();
                  addPairs((String) value.get(0), sums); // e.g. a:1,b:2,c:3
                  addPairs((String) value.get(1), sums); // e.g. a:5,c:3

                  HCatRecord record = new DefaultHCatRecord(1);
                  record.set(0, formatPairs(sums)); // e.g. a:6,b:2,c:6
                  // The key is unused; the record is the output row.
                  context.write(null, record);
              }
          }

          /**
           * Adds every "key:value" token of a comma-separated string into sums,
           * adding to any total already stored for that key.
           *
           * @param column comma-separated "key:value" pairs; null or empty adds nothing
           * @param sums   running per-key totals, updated in place
           * @throws IllegalArgumentException if a non-empty token has no ':'
           * @throws NumberFormatException    if a value is not an integer
           */
          static void addPairs(String column, TreeMap<String, Integer> sums) {
              if (column == null || column.isEmpty()) {
                  return;
              }
              for (String token : column.split(",")) {
                  if (token.isEmpty()) {
                      continue; // tolerate stray commas, e.g. "a:1,,b:2"
                  }
                  String[] kv = token.split(":", 2);
                  if (kv.length != 2) {
                      throw new IllegalArgumentException("Malformed key:value token: " + token);
                  }
                  String k = kv[0].trim();
                  int v = Integer.parseInt(kv[1].trim());
                  Integer prev = sums.get(k);
                  sums.put(k, prev == null ? v : prev + v);
              }
          }

          /** Formats sums as "k1:v1,k2:v2,..." in the map's (ascending key) order. */
          static String formatPairs(TreeMap<String, Integer> sums) {
              StringBuilder sb = new StringBuilder();
              // Fully qualified: inside this class, "Map" names the nested mapper.
              for (java.util.Map.Entry<String, Integer> e : sums.entrySet()) {
                  if (sb.length() > 0) {
                      sb.append(',');
                  }
                  sb.append(e.getKey()).append(':').append(e.getValue());
              }
              return sb.toString();
          }

          /**
           * Configures and runs the job.
           *
           * @param args input table name, output table name (generic Hadoop options
           *             have already been consumed by ToolRunner in main)
           * @return 0 on success, 1 if the job failed, 2 on bad usage
           */
          @Override
          public int run(String[] args) throws Exception {
              if (args.length < 2) {
                  System.err.println("Usage: HCatSum <inputTable> <outputTable>");
                  return 2;
              }
              // ToolRunner has already applied generic options to getConf().
              Configuration conf = getConf();

              String inputTableName = args[0];
              String outputTableName = args[1];
              // Assume the default database
              String dbName = null;

              Job job = new Job(conf, "HCatsum");
              HCatInputFormat.setInput(job,
                      InputJobInfo.create(dbName, inputTableName, null));
              job.setJarByClass(HCatSum.class);
              job.setMapperClass(Map.class);
              // All work happens in the mapper. Zero reducers sends mapper output
              // straight to the output format instead of shuffling the null key.
              job.setNumReduceTasks(0);

              // An HCatalog record as input
              job.setInputFormatClass(HCatInputFormat.class);

              // Job output: key ignored, one HCatalog record per input row.
              job.setOutputKeyClass(WritableComparable.class);
              job.setOutputValueClass(DefaultHCatRecord.class);
              job.setOutputFormatClass(HCatOutputFormat.class);

              HCatOutputFormat.setOutput(job,
                      OutputJobInfo.create(dbName, outputTableName, null));
              HCatSchema s = HCatOutputFormat.getTableSchema(job);
              System.err.println("INFO: output schema explicitly set for writing:"
                      + s);
              HCatOutputFormat.setSchema(job, s);
              return (job.waitForCompletion(true) ? 0 : 1);
          }

          public static void main(String[] args) throws Exception {
              int exitCode = ToolRunner.run(new HCatSum(), args);
              System.exit(exitCode);
          }
      }
      

      如何使用 HCatalog 运行 MR(参考链接): Hive Manual

      希望这对您有所帮助。

      【讨论】:

        【解决方案3】:

        下面是一个解决方案。它有点取巧,但无论有多少个键都能正常工作。

        udf0.py

        #!/usr/bin/python
        """Hive TRANSFORM script: sum the values of matching keys across columns.

        Reads tab-separated rows whose columns are comma-separated "key:value"
        strings from stdin, and prints one line per row with the per-key sums,
        keys in sorted order, e.g. "a:1,b:2,c:3<TAB>a:5,c:3" -> "a:6,b:2,c:6".
        Runs under both Python 2 and Python 3.
        """

        import sys
        from collections import Counter

        # Hive passes SQL NULL to TRANSFORM scripts as the literal string \N.
        HIVE_NULL = '\\N'


        def merge_line(line):
            """Return the per-key sums of all columns of one input row as "k:v,...".

            Empty columns, Hive NULLs and empty tokens are skipped; values of a
            repeated key (within or across columns) are added together.
            Raises ValueError on a token without ':' or a non-integer value.
            """
            totals = Counter()
            # rstrip only the line ending: strip() would also eat a leading tab
            # and shift the columns when the first column is empty.
            for column in line.rstrip('\r\n').split('\t'):
                if column in ('', HIVE_NULL):
                    continue
                for pair in column.split(','):
                    if not pair:
                        continue
                    k, v = pair.split(':', 1)
                    totals[k.strip()] += int(v)
            # Sorted keys give a deterministic output order.
            return ','.join('%s:%d' % (k, totals[k]) for k in sorted(totals))


        if __name__ == '__main__':
            for line in sys.stdin:
                print(merge_line(line))
        

        udf1.py

        #!/usr/bin/python
        """Hive TRANSFORM script: zero-fill keys missing from a row.

        Each input row is "<pairs><TAB><keys>", where <pairs> is a comma-separated
        "key:value" string and <keys> is a comma-separated list of every key to
        report. Prints "key:value" for each listed key in sorted order, using 0
        for keys absent from <pairs>, e.g.
        "a:6,b:2,c:6<TAB>b,a,d,c" -> "a:6,b:2,c:6,d:0".
        Runs under both Python 2 and Python 3.
        """

        import sys

        # Hive passes SQL NULL to TRANSFORM scripts as the literal string \N.
        HIVE_NULL = '\\N'


        def fill_line(line):
            """Return the zero-filled, key-sorted "k:v,..." string for one input row.

            Raises ValueError if the row does not have exactly two tab-separated
            columns, or on a malformed "key:value" token.
            """
            # rstrip only the line ending: strip() would remove the separating
            # tab when <pairs> is empty and break the two-column unpacking.
            pairs, key_list = line.rstrip('\r\n').split('\t')
            values = {}
            if pairs.strip() not in ('', HIVE_NULL):
                for pair in pairs.strip().split(','):
                    if pair:
                        k, v = pair.split(':', 1)
                        values[k] = int(v)
            # Sorting gives every output row the same key order; empty key names
            # (e.g. produced by str_to_map on an empty string) are dropped.
            wanted = sorted(set(k for k in key_list.strip().split(',') if k))
            return ','.join('%s:%d' % (k, values.get(k, 0)) for k in wanted)


        if __name__ == '__main__':
            for line in sys.stdin:
                print(fill_line(line))
        

        Hive 查询

        -- Ship both TRANSFORM scripts to the task nodes.
        add file /home/username/udf0.py;
        add file /home/username/udf1.py;

        -- sums:     one row per source row; "k:v,..." holding that row's per-key
        --           sums of col1 and col2 (udf0.py).
        -- all_keys: a single row with every key seen in any row, comma-joined.
        -- Cross-joining the one-row key set onto each sums row keeps every source
        -- row, including rows whose sums happen to be identical; a SELECT DISTINCT
        -- over exploded rows would merge those.
        -- NOTE: Hive may evaluate `sums` (and so run udf0.py) once per reference,
        -- and strict mode (hive.strict.checks.cartesian.product) rejects the
        -- CROSS JOIN; disable that check for this query if needed.
        WITH sums AS (
            SELECT TRANSFORM (col1, col2)
                   USING 'python udf0.py'
                   AS (dict STRING)
            FROM db.tbl
        ),
        all_keys AS (
            SELECT CONCAT_WS(',', COLLECT_SET(k)) AS unique_keys
            FROM sums
            LATERAL VIEW EXPLODE(map_keys(str_to_map(dict))) ek AS k
        )
        SELECT TRANSFORM (s.dict, a.unique_keys)
               USING 'python udf1.py'
               AS (final_map STRING)
        FROM sums s
        CROSS JOIN all_keys a;
        

        输出:

        a:6,b:2,c:6,d:0
        a:21,b:7,c:0,d:5
        

        说明:

        第一个脚本(udf0.py)读取两列字符串,将其转换为 Python 字典,并把相同键的值相加。由于事先并不知道有哪些键,需要从每个结果字典中提取键(Hive 查询中的 map_keys()),用 explode 展开表,再把这些键收集回一个去重的集合,这样就得到了任意字典中可能出现的全部键。最后,第二个脚本(udf1.py)读取第一个脚本生成的字典,逐一检查每个键是否存在,不存在则将其值设为 0。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2017-01-05
          • 1970-01-01
          • 1970-01-01
          • 2016-05-27
          • 2016-12-11
          • 1970-01-01
          • 1970-01-01
          • 2020-06-14
          相关资源
          最近更新 更多