1、官方示例代码
安装 Hadoop 后,默认自带示例代码,其中包括著名的 WordCount。
示例代码路径:hadoop-2.8.5/share/hadoop/mapreduce/sources/
2、重写WordCount.java
package test.word;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for a word-count MapReduce job.
 *
 * <p>The class is a Hadoop {@link Tool}: {@link #main(String[])} hands control to
 * {@link ToolRunner}, which calls {@link #run(String[])}, which delegates to
 * {@link #singleJob(String[])}. The map and reduce logic live in {@code WordMapper}
 * and {@code WordReducer}, which are not defined in this class.
 */
public class WordCount extends Configured implements Tool {

    /** File system URI that {@link #singleJob(String[])} writes into {@code fs.defaultFS}. */
    private static final String DEFAULT_FS = "hdfs://cos6730:9000";

    /**
     * Tool entry point; runs the single word-count job.
     *
     * @param args command-line arguments: one or more input paths followed by the output path
     * @return the exit code produced by {@link #singleJob(String[])}
     * @throws Exception declared by {@link Tool#run(String[])}; {@code singleJob} itself
     *         reports failures through its return value
     */
    @Override
    public int run(String[] args) throws Exception {
        return singleJob(args);
    }

    /**
     * Command-line entry point. Always terminates the JVM with the job's exit code,
     * or {@code -1} when {@link ToolRunner} throws.
     *
     * @param args command-line arguments forwarded to {@link ToolRunner}
     */
    public static void main(String[] args) {
        int exitCode = -1;
        try {
            exitCode = ToolRunner.run(new WordCount(), args);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.exit(exitCode);
        }
    }

    /**
     * Configures, submits and waits for the word-count job.
     *
     * <p>A MapReduce program has two parts: the configuration of how the job runs (done
     * here) and the business logic, whose map and reduce stages are implemented by
     * classes extending {@code Mapper} and {@code Reducer}.
     *
     * <p>If the output path already exists it is deleted recursively before the job is
     * submitted.
     *
     * @param args one or more input paths followed by the output path; generic Hadoop
     *             options still present in {@code args} are parsed as well
     * @return {@code 0} if the job succeeded, {@code 1} if it failed, {@code 2} if fewer
     *         than two paths were supplied, {@code -1} if an exception was thrown
     */
    public int singleJob(String[] args) {
        try {
            // Reuse the configuration injected by ToolRunner so that generic options it
            // already parsed are not lost. getConf() is null when this method is called
            // on an instance that never received a configuration.
            Configuration conf = getConf();
            if (conf == null) {
                conf = new Configuration();
            }

            // Without an explicit defaultFS, paths resolve against the current user's
            // local file system. NOTE: this unconditionally overrides any value already
            // present in conf at this point.
            conf.set("fs.defaultFS", DEFAULT_FS);

            // To run as a specific user, either start the JVM with
            //   java -DHADOOP_USER_NAME=root
            // or set it in code:
            //   System.setProperty("HADOOP_USER_NAME", "root");

            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: wordcount <in> [<in>...] <out>");
                // Return instead of System.exit so callers embedding this Tool keep
                // control; main() still exits the process with this code.
                return 2;
            }

            // Job lets the user configure the job, submit it, control its execution,
            // and query its state.
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(WordMapper.class);
            // The reducer doubles as the combiner; presumably WordReducer's aggregation
            // is safe to apply to partial results — verify against WordReducer.
            job.setCombinerClass(WordReducer.class);
            job.setReducerClass(WordReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // Every argument except the last is an input path.
            int outputIndex = otherArgs.length - 1;
            for (int i = 0; i < outputIndex; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }

            // Remove a pre-existing output directory before submitting the job.
            Path resultPath = new Path(otherArgs[outputIndex]);
            FileSystem fs = resultPath.getFileSystem(conf);
            if (fs.exists(resultPath)) {
                fs.delete(resultPath, true);
                System.out.println("-- resultPath is delete --");
            }
            FileOutputFormat.setOutputPath(job, resultPath);

            // Submit the job, then poll for progress until it completes.
            // true: print the progress to the user.
            boolean flag = job.waitForCompletion(true);
            if (flag) {
                System.out.println("-- job is waitForCompletion --" + flag);
            }
            return flag ? 0 : 1;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        // An exception occurred.
        return -1;
    }
}