1. OutputFormat
In the MapReduce framework, OutputFormat is responsible for writing the key-value pairs produced by the Reducers to local disk or HDFS. By default the results are written as multiple files named part-r-00000, part-r-00001, and so on, with exactly one file per Reduce task. The numeric suffix is the id of the partition assigned to that Reduce task. For how partition ids are determined, see:
MapReduce Internals: Custom Partitioners and How to Handle Data Skew
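The default naming convention can be sketched in plain Java (no Hadoop dependency; `partFileName` is a hypothetical helper mirroring the `part-r-%05d` pattern FileOutputFormat uses internally):

```java
public class PartNaming {
    // Builds the default reduce-side output file name for a partition id,
    // following Hadoop's "part-r-%05d" convention ("part-m-%05d" for map-only jobs).
    static String partFileName(int partition) {
        return String.format("part-r-%05d", partition);
    }

    public static void main(String[] args) {
        // A job with 3 reducers produces part-r-00000 .. part-r-00002.
        for (int p = 0; p < 3; p++) {
            System.out.println(partFileName(p));
        }
    }
}
```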
MapReduce provides a number of output formats, letting users flexibly configure the output path, file names, and format. Output format classes extend the abstract class OutputFormat; FileOutputFormat is the abstract base class for file-based output, and common concrete implementations include TextOutputFormat, SequenceFileOutputFormat, NullOutputFormat, and DBOutputFormat. Their inheritance relationships are shown in the figure below.
1) TextOutputFormat: the default format; writes keys and values as plain text, separated by a tab character;
2) SequenceFileOutputFormat: writes keys and values in the binary SequenceFile format;
3) MultipleOutputs: strictly a helper class rather than an OutputFormat subclass; it lets a job send output data to multiple files or directories;
4) NullOutputFormat: discards all output (the equivalent of writing to /dev/null); useful when the job does its work, including any writes, inside the map/reduce logic itself and needs no framework-managed output;
5) DBOutputFormat: writes the job output to a database such as MySQL or Oracle (unsuitable for very large result sets); the tasks connect to the database in parallel, so the number of map and reduce tasks must be tuned to keep the number of concurrent connections within a reasonable range.
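As a minimal sketch of the line layout TextOutputFormat produces, assuming plain-text keys and values (`formatLine` is a hypothetical stand-in for its internal LineRecordWriter, which also drops a null key or value together with the separator):

```java
public class TextLineFormat {
    // TextOutputFormat writes key, a tab separator, value, and a newline;
    // a missing key or value suppresses that part and the separator.
    static String formatLine(String key, String value) {
        if (key == null && value == null) return "";
        if (key == null) return value + "\n";
        if (value == null) return key + "\n";
        return key + "\t" + value + "\n";
    }

    public static void main(String[] args) {
        System.out.print(formatLine("A", "1001 NEWS"));
    }
}
```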
In the driver program, the output can be configured with methods such as:
1) Set the output format class: job.setOutputFormatClass(TextOutputFormat.class)
2) Set the base name of the output files: FileOutputFormat.setOutputName(job, "foobar") (note that in the new mapreduce API this method is protected, so it is normally called from inside an OutputFormat subclass rather than from the driver)
3) Set the output path: FileOutputFormat.setOutputPath(job, outputPath)
2. Custom Output Formats
2.1 Steps to implement a custom output format
To implement a custom output format, we need to:
(1) subclass OutputFormat (in practice usually FileOutputFormat) and implement getRecordWriter, which returns a RecordWriter;
(2) subclass RecordWriter and implement its write method, which writes each <key, value> pair to the output.
The abstract class OutputFormat is defined as follows. A concrete implementation must supply RecordWriter and OutputCommitter objects. OutputCommitter does not depend on the specific output destination, so it usually need not be overridden and the standard FileOutputCommitter can be used directly; RecordWriter is where we define how the data is actually written.
public abstract class OutputFormat<K, V> {
    // returns the object that actually writes the data
    public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException;
    // checks that the output specification (e.g. the output path) is valid
    public abstract void checkOutputSpecs(JobContext context)
            throws IOException, InterruptedException;
    // returns the committer that finalizes the job's output
    public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException;
}
A custom RecordWriter must implement the following methods:
public abstract class RecordWriter<K, V> {
    // writes one key-value pair to the destination
    public abstract void write(K key, V value)
            throws IOException, InterruptedException;
    // releases the resources held by this writer
    public abstract void close(TaskAttemptContext context)
            throws IOException, InterruptedException;
}
3. A Custom Output Format in Practice
3.1 Problem description
User behavior analysis means taking the basic traffic data of a website, app, or similar platform and performing statistics and analysis on it to discover the patterns in how users access the platform. Combining those patterns with the online marketing strategy reveals problems in current marketing activities and provides a basis for revising or redesigning the strategy.
——Baidu Baike
Suppose a website has 100 million users, each visiting 5 pages per day on average. The access log records the user type (A, B, and C, for VIP, important, and ordinary users respectively), user id, access time, session duration, the product accessed, and other fields: about 500 million records and roughly 100 GB of log data per day. We need to analyze the logs and find the products each user type visits most, to support targeted marketing, and the final results must be written to three files named after the user types: A.txt, B.txt, and C.txt.
Sample data:
id userid usertype appname timestamp duration remotehost area
100010010 1001 C NEWS 1545271800010 103 200.41.1.25 W
100010011 1002 B FORUM 1545271800010 95 201.3.7.11 E
100012313 1002 B FORUM 1545298440019 95 201.3.7.11 NE
100042315 1230 A SPORTS 1545271800010 1004 192.169.49.12 SW
...
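Before looking at the MapReduce code, the field extraction itself can be sketched in plain Java (`parse` is a hypothetical helper; in the actual job this happens in the Mapper shown in 3.3):

```java
public class LogLine {
    // Splits a whitespace-separated log record and returns {usertype, appname},
    // i.e. fields 2 and 3 of: id userid usertype appname timestamp duration remotehost area
    static String[] parse(String line) {
        String[] f = line.trim().split("\\s+");
        return new String[] { f[2], f[3] };
    }

    public static void main(String[] args) {
        String[] r = parse("100010010 1001 C NEWS 1545271800010 103 200.41.1.25 W");
        System.out.println(r[0] + " -> " + r[1]); // prints "C -> NEWS"
    }
}
```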
3.2 Approach
Files produced by FileOutputFormat are named part-r-xxxxx. Although FileOutputFormat's setOutputName and setOutputPath methods let us configure the output path and file name in the driver, here the file name cannot be fixed up front: it must be generated dynamically from the key. We therefore need a custom OutputFormat that builds the file name only once the key is available.
3.3 Implementation
The complete code is available at: https://github.com/majxbear/mapreduce-applications
1) The custom FileNameOutputFormat
/**
 * Custom output format that names each output file key + ".txt"
 */
public class FileNameOutputFormat<K, V> extends FileOutputFormat<K, V> {
    public static String OUTPUT_FILE_EXTENSION = "txt";

    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        return new MyRecordWriter(job, getTaskOutputPath(job));
    }

    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    /**
     * Custom RecordWriter that derives the output file name from the key
     */
    public class MyRecordWriter extends RecordWriter<K, V> {
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        private Path workPath = null;

        public MyRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        private String generateFileNameByKey(K key, String extension) {
            return key + "." + extension;
        }

        public void write(K key, V value) throws IOException, InterruptedException {
            // derive the file name from the key
            String baseName = generateFileNameByKey(key, OUTPUT_FILE_EXTENSION);
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                // lazily create one underlying writer per distinct file name
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName) throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ":";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }

        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // close every underlying writer that was opened
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }
    }

    /**
     * Identical to the LineRecordWriter nested in TextOutputFormat.
     * That class is protected, so it cannot be referenced from another
     * package and has to be redefined here.
     */
    protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            try {
                this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                out.write(to.getBytes(), 0, to.getLength());
            } else {
                out.write(o.toString().getBytes(utf8));
            }
        }

        public synchronized void write(K key, V value)
                throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (nullKey && nullValue) {
                return;
            }
            if (!nullKey) {
                writeObject(key);
            }
            if (!(nullKey || nullValue)) {
                out.write(keyValueSeparator);
            }
            if (!nullValue) {
                writeObject(value);
            }
            out.write(newline);
        }

        public synchronized void close(TaskAttemptContext context) throws IOException {
            out.close();
        }
    }
}
2) The driver class
public class LogStatistics {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> {
        private Text val = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // 100010010 1001 C NEWS 1545271800010 103 200.41.1.25 W
            String[] str = value.toString().split("\\s+");
            val.set(str[1] + " " + str[3]);
            // the user type (field 2) becomes the key, and thus the file name
            context.write(new Text(str[2]), val);
        }
    }

    // For brevity this reducer simply forwards each record; the per-product
    // statistics described in 3.1 are omitted here.
    public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        if (args.length < 2) {
            System.err.println("Usage: LogStatistics <input path> <output path>");
            System.exit(1);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "log statistics");
        job.setJarByClass(LogStatistics.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // set the custom output format
        job.setOutputFormatClass(FileNameOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
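To see the intended result without running a cluster, the grouping behavior can be simulated in plain Java. This is a toy sketch, not Hadoop code: in-memory buffers stand in for the A.txt/B.txt/C.txt files the custom format creates, and the lazy per-key creation mirrors MyRecordWriter.write:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class GroupByUserType {
    // Routes each record to a per-user-type buffer named key + ".txt",
    // mimicking FileNameOutputFormat's lazily created per-key writers.
    static Map<String, StringBuilder> group(String[] lines) {
        Map<String, StringBuilder> out = new LinkedHashMap<>();
        for (String line : lines) {
            String[] f = line.trim().split("\\s+");
            String fileName = f[2] + ".txt";               // user type decides the file
            out.computeIfAbsent(fileName, k -> new StringBuilder())
               .append(f[1]).append(' ').append(f[3]).append('\n');
        }
        return out;
    }

    public static void main(String[] args) {
        String[] sample = {
            "100010010 1001 C NEWS 1545271800010 103 200.41.1.25 W",
            "100010011 1002 B FORUM 1545271800010 95 201.3.7.11 E",
            "100042315 1230 A SPORTS 1545271800010 1004 192.169.49.12 SW"
        };
        group(sample).forEach((file, body) -> System.out.print(file + ":\n" + body));
    }
}
```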