1. OutputFormat

In the MapReduce framework, OutputFormat is responsible for writing the key-value pairs produced by the Reducer to local disk or HDFS. By default, the results are written as multiple files named part-r-00000, part-r-00001, and so on, with one output file per reduce task; the numeric suffix is the ID of the partition assigned to that reduce task. For how partition IDs are determined, see:
MapReduce Internals: Custom Partitioner and Solving Data Skew

MapReduce provides multiple output formats, and the user can flexibly configure the output path, file names, and format. Output format classes extend the abstract OutputFormat class; FileOutputFormat is an abstract base implementation, and common concrete implementations include TextOutputFormat, SequenceFileOutputFormat, NullOutputFormat, and DBOutputFormat. Their inheritance hierarchy is shown in the figure.
[Figure: OutputFormat class hierarchy]
1) TextOutputFormat: the default output format; writes keys and values as strings, separated by a tab;
2) SequenceFileOutputFormat: writes keys and values in the binary SequenceFile format;
3) MultipleOutputs: a helper class that lets a job send output to multiple files or directories;
4) NullOutputFormat: discards all output (the equivalent of writing to /dev/null); useful when the job's logic runs inside the Mapper/Reducer and any results are written there directly, so no framework output is needed;
5) DBOutputFormat: writes job output to a relational database such as MySQL or Oracle (unsuitable for very large result sets); because tasks open database connections in parallel, the number of map and reduce tasks should be set so that the number of concurrent connections stays within a reasonable range.
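As a minimal illustration of item 1, the line that TextOutputFormat emits for each record is simply the key, a separator (tab by default), and the value, followed by a newline. The sketch below reproduces that formatting in plain Java without Hadoop dependencies; the class and method names are hypothetical:

```java
public class TabLineFormat {
    // Mirrors TextOutputFormat's default record layout: key TAB value NEWLINE.
    // A null key or value is skipped, similar to how LineRecordWriter
    // treats NullWritable.
    static String formatRecord(String key, String value) {
        if (key == null && value == null) return "";
        if (key == null) return value + "\n";
        if (value == null) return key + "\n";
        return key + "\t" + value + "\n";
    }

    public static void main(String[] args) {
        System.out.print(formatRecord("A", "1001 NEWS"));
    }
}
```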

In the driver, the output can be configured with the following methods:
1) Set the output format class:
job.setOutputFormatClass(TextOutputFormat.class)
2) Set the base name of the output files (note that setOutputName is declared protected in FileOutputFormat, so it is normally called from within an OutputFormat subclass rather than from the driver):
FileOutputFormat.setOutputName(job, "foobar")
3) Set the output path:
TextOutputFormat.setOutputPath(job, new Path("/output/path"))

2. Custom Output Format

2.1 Steps to implement a custom output format

To implement a custom output format, we need to:
(1) extend an OutputFormat class and implement the getRecordWriter method, returning a RecordWriter;
(2) extend RecordWriter and implement its write method, which writes each <key, value> pair to the output file.

The OutputFormat abstract class is defined below. A concrete implementation must supply a RecordWriter and an OutputCommitter. Because the OutputCommitter does not depend on the specific output destination, it usually does not need to be re-implemented and the stock FileOutputCommitter can be used; the RecordWriter is what actually defines how data is written to the destination.

public abstract class OutputFormat<K, V> { 
    // Returns the writer object that actually writes the data
    public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
     throws IOException, InterruptedException; 
    // Checks that the output specification (e.g. the output path) is valid
    public abstract void checkOutputSpecs(JobContext context)
     throws IOException, InterruptedException; 
    // Returns the OutputCommitter that commits the job's output
    public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context)
     throws IOException, InterruptedException; 
}

A custom RecordWriter must implement the following methods:

public abstract class RecordWriter<K, V> {  
    // Writes one key-value pair to the output
    public abstract void write(K key, V value)
     throws IOException, InterruptedException;  
    // Releases any resources held by the writer
    public abstract void close(TaskAttemptContext context)
     throws IOException, InterruptedException; 
}

3. Custom Output Format Example

3.1 Problem description

User behavior analysis refers to collecting basic traffic data for a website, app, or similar platform, then computing statistics and analyzing them to discover patterns in how users visit the platform. Combining these patterns with the online marketing strategy reveals potential problems in current marketing activities and provides a basis for revising or redesigning the strategy.
— Baidu Baike

Suppose a website has 100 million users, each visiting an average of 5 pages per day. The access log records the user type (A, B, and C denote VIP, important, and ordinary users respectively), user ID, access time, duration, product visited, and other fields: about 500 million records in total, roughly 100 GB of log data per day. We need to analyze the logs to find the products each user type visits most, to support targeted marketing; the final results must be written to three files named by user type: A.txt, B.txt, and C.txt.
Sample data:

id userid usertype appname timestamp duration remotehost area
100010010 1001 C NEWS 1545271800010 103 200.41.1.25 W
100010011 1002 B FORUM 1545271800010 95 201.3.7.11 E
100012313 1002 B FORUM 1545298440019 95 201.3.7.11 NE
100042315 1230 A SPORTS 1545271800010 1004 192.169.49.12 SW
...
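Before writing any MapReduce code it helps to confirm how a log line splits into fields. Below is a small stand-alone sketch (class name hypothetical) that extracts the fields the job relies on, namely usertype (index 2), userid (index 1), and appname (index 3):

```java
public class LogLineParser {
    // Splits one space-separated log line into its fields, in the order:
    // id userid usertype appname timestamp duration remotehost area
    static String[] parse(String line) {
        return line.trim().split("\\s+");
    }

    public static void main(String[] args) {
        String[] f = parse("100010010 1001 C NEWS 1545271800010 103 200.41.1.25 W");
        System.out.println(f[2] + " -> " + f[1] + " " + f[3]);
    }
}
```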


3.2 Approach

FileOutputFormat names its output files part-r-xxxxx. Although FileOutputFormat provides setOutputName, setOutputPath, and similar methods for configuring the output path and base file name, in this case the file name cannot be fixed in advance: it must be generated dynamically from the key. We therefore need a custom OutputFormat that assigns the file name once it sees each key.
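The core routing idea, lazily creating one writer per distinct key, can be sketched without Hadoop using an in-memory map. The class name below is hypothetical; the real implementation replaces each StringBuilder with an HDFS-backed RecordWriter and uses the same ":" key-value separator:

```java
import java.util.HashMap;
import java.util.Map;

public class KeyedWriterDemo {
    private final Map<String, StringBuilder> writers = new HashMap<>();

    // Fetches the buffer for this key, creating it on first use,
    // just as the custom RecordWriter lazily opens one file per key.
    void write(String key, String value) {
        writers.computeIfAbsent(key + ".txt", k -> new StringBuilder())
               .append(key).append(':').append(value).append('\n');
    }

    Map<String, StringBuilder> outputs() {
        return writers;
    }

    public static void main(String[] args) {
        KeyedWriterDemo demo = new KeyedWriterDemo();
        demo.write("A", "1230 SPORTS");
        demo.write("B", "1002 FORUM");
        demo.write("A", "1001 NEWS");
        // Two logical "files" are produced: A.txt and B.txt
        System.out.println(demo.outputs().size());
    }
}
```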

3.3 Implementation

The complete code is available at: https://github.com/majxbear/mapreduce-applications
1) The custom FileNameOutputFormat

/**
 * Custom output format that names each output file key + ".txt"
 */
public class FileNameOutputFormat<K, V> extends FileOutputFormat<K, V> {
    public static final String OUTPUT_FILE_EXTENSION = "txt";

    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        return new MyRecordWriter(job, getTaskOutputPath(job));
    }

    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    /**
     * Custom RecordWriter that derives the output file name from the key
     */
    public class MyRecordWriter extends RecordWriter<K, V> {
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        private Path workPath = null;

        public MyRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        private String generateFileNameByKey(K key, String extension) {
            return key + "." + extension;
        }

        public void write(K key, V value) throws IOException, InterruptedException {
            // Derive the output file name from the key
            String baseName = generateFileNameByKey(key, OUTPUT_FILE_EXTENSION);
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName) throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ":";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }

        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }
    }

    /**
     * Identical to the LineRecordWriter nested inside TextOutputFormat.
     * That class is declared protected, so it cannot be referenced from
     * another package and has to be redefined here.
     */
    protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;

        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            try {
                this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                out.write(to.getBytes(), 0, to.getLength());
            } else {
                out.write(o.toString().getBytes(utf8));
            }
        }

        public synchronized void write(K key, V value)
                throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (nullKey && nullValue) {
                return;
            }
            if (!nullKey) {
                writeObject(key);
            }
            if (!(nullKey || nullValue)) {
                out.write(keyValueSeparator);
            }
            if (!nullValue) {
                writeObject(value);
            }
            out.write(newline);
        }

        public synchronized void close(TaskAttemptContext context) throws IOException {
            out.close();
        }
    }
}

2) The driver class

public class LogStatistics {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> {
        private Text val = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String str[] = value.toString().split(" ");
            //100010010 1001 C NEWS 1545271800010 103 200.41.1.25 W
            val.set(str[1] + " " + str[3]);
            context.write(new Text(str[2]), val);
        }
    }

    public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        if (args.length < 2) {
            System.out.println("Usage: LogStatistics <input path> <output path>");
            System.exit(1);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Set the custom output format
        job.setOutputFormatClass(FileNameOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
