1 HDFS的基本使用
查看集群状态
1、打开web控制台查看HDFS集群信息,在浏览器打开http://192.168.18.64:50070/
2、使用命令查看:hdfs dfsadmin -report
使用shell命令操作hdfs
从HDFS下载文件
hadoop fs -get /wordcount/input/wordcount_content.txt #下载wordcount_content.txt到本地当前路径
使用java api操作hdfs
public void testUpload() throws Exception {
Configuration conf = new Configuration();
//可以直接传入 uri和用户身份
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.18.64:9000"),conf,"hadoop"); //最后一个参数为用户名
Thread.sleep(2000);
fs.copyFromLocalFile(new Path("d:/cc.txt"), new Path("/access.log"));
fs.close();
}
2、 MAPREDUCE基本使用
上面演示mapreduce的demo是hadoop提供的,下面演示一个使用代码编写一个wordcount的例子
1、需求
从大量文本文件中,统计出每一个单词出现的总次数
2、思路
Map阶段:
1. 从HDFS的源数据文件中逐行读取数据
2. 将每一行数据切分出单词
3. 为每一个单词构造一个键值对 如(单词,1)
4. 将键值对发送给reduce
Reduce阶段:
1. 接收map阶段输出的单词键值对
2. 将相同单词的键值对汇聚成一组
3. 对每一组,遍历组中的所有“值”,累加求和,即得到每一个单词的总次数
4. 将(单词,总次数)输出到HDFS的文件中
3、代码实现
编写mapper类
package edu.mr.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MyMapper extends Mapper<LongWritable, Text, Text, Text>{
Text word = new Text();
Text one = new Text("1");
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//获取行数据
String line = value.toString();
//对数据进行拆分 [hello,qianfeng,hi,qianfeng] [hello,1603] [hi,hadoop,hi,spark]
String [] words = line.split(" ");
//循环数组
for (String s : words) {
word.set(s);
context.write(word, one);
}
}
}
编写reducer类
package edu.mr.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MyReducer extends Reducer<Text, Text, Text, Text>{
Text sum = new Text();
@Override
protected void reduce(Text key, Iterable<Text> value,Context context)
throws IOException, InterruptedException {
//定义一个计数器
int counter = 0;
//循环奇数
for (Text i : value) {
counter += Integer.parseInt(i.toString());
}
sum.set(counter+"");
//reduce阶段的最终输出
context.write(key, sum);
}
}
编写主类,来描述job并提交job
package edu.mr.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static void main(String[] args) {
try {
//获取配置对象
Configuration conf = new Configuration();
//创建job
Job job = new Job(conf, "wordcount");
//为job设置运行主类
job.setJarByClass(WordCount.class);
//设置map阶段的属性
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
//设置reduce阶段的属性
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//提交运行作业job 并打印信息
int isok = job.waitForCompletion(true)?0:1;
//退出job
System.exit(isok);
} catch (IOException | ClassNotFoundException | InterruptedException e) {
e.printStackTrace();
}
}
}
4、程序打包
第一步
第二步
第三步
5、准备测试数据
将wordcount_content.txt文件上传到hdfs
hadoop fs mkdir -p /wordcount/input #在hdfs上创建输入数据文件夹
hadoop fs –put /wordcount_content.txt /wordcount/input #wordcount_content.txt上传到hdfs上
6、将程序jar包上传到集群的任意一台服务器上
7、使用命令执行wordcount程序
hadoop jar wordcount.jar edu.qianfeng.mr.day05.wordcount.WordCount /wordcount/input /wordcount/output