1. 背景
近日在一个Hadoop项目中使用MultipleInputs增加多输入文件时,发现相同路径仅会加载一次,导致后续的统计任务严重失真。本博文旨在记录异常的排查及解决方案。
2. 情景重现
(1) 准备简版的输入文件test,文件内容为"i am ws",输入的HDFS路径为/work/justTest/test
(2) 源码信息如下,主要是wordCount实现,其中/work/justTest/test作为输入路径,被输入两次:
package com.ws.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Reproduces the MultipleInputs duplicate-path problem: the same HDFS input
 * path is deliberately registered twice, once per mapper class. As described
 * in the surrounding write-up, the path is only loaded once, which skews the
 * downstream counters this job emits.
 */
public class MutilInputTest {

    public static void main(String[] args) {
        testMultiInputs();
    }

    /**
     * Builds and runs the demonstration job. The single input path
     * {@code /work/justTest/test} is passed as BOTH input1 and input2 so the
     * duplicate-path behaviour of MultipleInputs can be observed.
     */
    public static void testMultiInputs() {

        Configuration conf = new Configuration();

        conf.set("mapreduce.job.queuename", "default");
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.995f);
        conf.setInt("mapreduce.task.timeout", 0);
        conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.40f);

        String input = "/work/justTest/test";
        try {
            createMultiInputsTestJob(conf,
                    input, Test1Mapper.class,
                    input, Test2Mapper.class,
                    "/work/justTest/temp", 2, TestReduce.class)
                    .waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds the multi-input word-count job.
     *
     * @param conf      job configuration
     * @param input1    first input path
     * @param mapper1   mapper applied to input1
     * @param input2    second input path (here intentionally equal to input1)
     * @param mapper2   mapper applied to input2
     * @param outputDir output directory (deleted first if it already exists)
     * @param reduceNum number of reduce tasks
     * @param reducer   reducer class
     * @return the fully configured, not-yet-submitted Job
     * @throws IllegalStateException if job construction fails; the original
     *         code swallowed the exception and returned {@code null}, which
     *         lost the cause and guaranteed a NullPointerException at the
     *         call site's {@code waitForCompletion}.
     */
    static Job createMultiInputsTestJob(Configuration conf,
            String input1, Class<? extends Mapper> mapper1,
            String input2, Class<? extends Mapper> mapper2,
            String outputDir,
            int reduceNum, Class<? extends Reducer> reducer) {
        try {
            // Job.getInstance() is the supported factory; the Job(Configuration)
            // constructor used before is deprecated.
            Job job = Job.getInstance(conf);
            job.setJobName("MultiInputsTest");
            job.setJarByClass(MutilInputTest.class);

            job.setNumReduceTasks(reduceNum);
            job.setReducerClass(reducer);

            job.setInputFormatClass(TextInputFormat.class);
            // Both registrations use the same path — this is the scenario under test.
            MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, mapper1);
            MultipleInputs.addInputPath(job, new Path(input2), TextInputFormat.class, mapper2);

            // Recreate the output directory so reruns do not fail on an existing path.
            Path outputPath = new Path(outputDir);
            outputPath.getFileSystem(conf).delete(outputPath, true);

            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, outputPath);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            return job;
        } catch (Exception e) {
            // Rethrow unchecked with the cause attached instead of returning null;
            // the caller's catch (Exception) still handles it.
            throw new IllegalStateException("Failed to build MultiInputsTest job", e);
        }
    }

    /**
     * Word-count mapper. Emits (word, "1") for every whitespace-separated
     * token and bumps a per-source counter whose group is derived from
     * {@link #getDataType()} so the two registered mappers can be told apart.
     */
    static class Test1Mapper extends Mapper<LongWritable, Text, Text, Text> {

        // Counter-group prefix; subclasses override getDataType() to tag theirs.
        String type;

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            this.type = getDataType();
            super.setup(context);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Was split("") — that splits into single CHARACTERS, producing a
            // character count, not the intended word count. Split on whitespace.
            String[] words = value.toString().split("\\s+");
            for (String word : words) {
                context.getCounter(this.type + "_map_total", "input").increment(1);
                context.write(new Text(word), new Text("1"));
            }
        }

        /** Counter-group tag for this mapper; overridden by subclasses. */
        protected String getDataType() {
            return "test1";
        }
    }

    /**
     * Second mapper, identical to {@link Test1Mapper} except for its counter
     * tag, so counters reveal which mapper actually processed the input.
     */
    static class Test2Mapper extends Test1Mapper {
        @Override
        protected String getDataType() {
            return "test2";
        }
    }

    /**
     * Sums the "1" values per word and records the (word, total) pair as a
     * counter so results are visible in the job counters.
     */
    static class TestReduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int total = 0;
            // Was: while (values.iterator().hasNext()) { ... values.iterator().next() }
            // which asks for a fresh iterator on every check and only worked
            // because Hadoop hands back the same underlying iterator. A
            // for-each loop is correct regardless of that implementation detail.
            for (Text value : values) {
                total += Integer.parseInt(value.toString());
            }
            context.getCounter("reduce_total", key.toString() + "_" + total).increment(1);
        }
    }

}