【问题标题】:Read hive table from mapreduce从 mapreduce 读取配置单元表
【发布时间】:2013-04-24 12:01:20
【问题描述】:

我目前正在编写一个 mapreduce 程序来查找两个 hive 表之间的差异。 我的 hive 表在一个或多个列上分区。所以文件夹名称包含分区列的值。

有没有办法读取hive分区表。

可以在mapper中读取吗?

【问题讨论】:

  • 您可能想要编辑您的问题,以强调您正在寻找一种方法来读取获取映射器中被分区的列的值。现在,回答者可能只想知道如何将所有输入文件递归地添加到 MapReduce 作业(在 StackOverflow 上已多次解决)。但是,当部分数据位于输入文件夹结构中时重新创建表格似乎是您问题的一个重要元素。

标签: hadoop mapreduce hive


【解决方案1】:

因为默认情况下,底层 HDFS 数据将在分区 hive 表中组织为

 table/root/folder/x=1/y=1
 table/root/folder/x=1/y=2
 table/root/folder/x=2/y=1
 table/root/folder/x=2/y=2....,

您可以在驱动程序中构建这些输入路径中的每一个,并通过对 FileInputFormat.addInputPath(job, path) 的多次调用来添加它们。每个您构建的文件夹路径调用一次。

下面粘贴的示例代码。注意如何将路径添加到 MyMapper.class。在此示例中,我使用的是 MultipleInputs API。表由“part”和“xdate”分区。

public class MyDriver extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("mapred.compress.map.output", "true");
        conf.set("mapred.output.compression.type", "BLOCK"); 

        Job job = new Job(conf);
        //set up various job parameters
        job.setJarByClass(MyDriver.class);
        job.setJobName(conf.get("job.name"));
        MultipleInputs.addInputPath(job, new Path(conf.get("root.folder")+"/xdate="+conf.get("start.date")), TextInputFormat.class, OneMapper.class);
        for (Path path : getPathList(job,conf)) {
            System.out.println("path: "+path.toString());
            MultipleInputs.addInputPath(job, path, Class.forName(conf.get("input.format")).asSubclass(FileInputFormat.class).asSubclass(InputFormat.class), MyMapper.class);
        }
        ...
        ...
        return job.waitForCompletion(true) ? 0 : -2;

    }

    private static ArrayList<Path> getPathList(Job job, Configuration conf) {
        String rootdir = conf.get("input.path.rootfolder");
        String partlist = conf.get("part.list");
        String startdate_s = conf.get("start.date");
        String enxdate_s = conf.get("end.date");
        ArrayList<Path> pathlist = new ArrayList<Path>();
        String[] partlist_split = partlist.split(",");
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        Date startdate_d = null;
        Date enxdate_d = null;
        Path path = null;
        try {
            startdate_d = sdf.parse(startdate_s);
            enxdate_d = sdf.parse(enxdate_s);
            GregorianCalendar gcal = new GregorianCalendar();
            gcal.setTime(startdate_d);
            Date d = null;
            for (String part : partlist_split) {
                gcal.setTime(startdate_d);
                do {
                    d = gcal.getTime();
                    FileSystem fs = FileSystem.get(conf);
                    path = new Path(rootdir + "/part=" + part + "/xdate="
                            + sdf.format(d));
                    if (fs.exists(path)) {
                        pathlist.add(path);
                    }
                    gcal.add(Calendar.DAY_OF_YEAR, 1);
                } while (d.before(enxdate_d));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return pathlist;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyDriver(), args);
        System.exit(res);
    }
}

【讨论】:

  • 在 MapReduce 作业中,您可以调用 context.getInputSplit() 来获取包含输入来源信息的 InputSplit 对象。 FileSplit 实现 InputSplit 并具有 getPath() 方法,您可以解析该方法以获取正在分区的列的值。我找不到任何工作代码示例,因此使用docs 可能是您最好的选择。如果您喜欢冒险,在 hive source 周围戳一下可能会有所帮助。
【解决方案2】:

是的,它可以很容易地在 Mapper 中读取。这个答案基于@Daniel Koverman 提到的想法。

将Context对象传递给Mapper.map(),这样就可以得到文件分割路径

// this gives you the path plus offsets hdfs://.../tablename/partition1=20/partition2=ABC/000001_0:0+12345678
context.ctx.getInputSplit().toString();

// or this gets you the path only
((FileSplit)ctx.getInputSplit()).getPath();

这是一个更完整的解决方案,可以解析出实际的分区值:

class MyMapper extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

    // regex to parse out the /partitionName=partitionValue/ pairs
    private static Pattern partitionRegex = Pattern.compile("(?<=/)(?<name>[_\\-\\w]+)=(?<value>[^/]*)(?=/)");

    public static String parsePartitionValue(String path, String partitionName) throws IllegalArgumentException{
        Matcher m = partitionRegex.matcher(path);
        while(m.find()){
            if(m.group("name").equals(partitionName)){
                return m.group("value");
            }
        }
        throw new IllegalArgumentException(String.format("Partition [%s] not found", partitionName));
    }

    @Override
    public void map(KEYIN key, VALUEIN v, Context ctx) throws IOException, InterruptedException {
        String partitionVal = parsePartitionValue(ctx.getInputSplit().toString(), "my_partition_col");
   }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-12-22
    • 1970-01-01
    • 2022-01-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-08-01
    • 2021-03-28
    相关资源
    最近更新 更多