掌握Map/Reduce相关原理和设计方法,设计相关的应用。
一、数据集及程序功能要求
数据集stock-daily,包含A股近4000只股票的最近30天日数据,根据此数据实现股票风险监测统计:统计和输出股票代码和风险值
数据来源:https://www.joinquant.com/help/api/help?name=JQData
风险值统计方法:
1. 忽略股票停牌当日数据
2. 忽略N/A数据行
3. 股价下行指数,((开盘价 - 收盘价) / (收盘价 - 最低价+0.1)
) / 有效数据总天数
二、MapReduce环境配置
三、代码编写
1、基本代码逻辑:
Map
输入:一行数据(一只股票的日数据)
处理:使用 \t 将字符串split成数组,提取需要计算的值,并转为浮点数
输出:<股票代码, 股票当日下行指数>
遇到无效数据不输出(停牌股票或有N/A数据无法提取为浮点数)
Reduce:
输入:<股票代码,[股票每日下行指数]>
处理:计算均值
输出:<股票代码,股票下行指数>
2、选做,对结果进行归一化处理:(归一化到区间:-1~1)
Map:
输入目录:上一个Reduce的输出目录
输入:<股票代码,股票下行指数>
处理:将股票代码和股票下行指数做字符串相接,中间用空格隔开,键用一个固定的键名表示,这样可以将所有股票的数据全部交由一个Reduce处理
输出:<"fixedkey","股票代码 股票下行指数">
Reduce:
输入:<"fixedkey",["股票代码 股票下行指数"]>
处理:(1) 遍历字符串数组,取出字符串中空格符后的部分(提示:用split可取),转成浮点数,找到股票下行指数最大值(max)和最小值的绝对值(absmin)
(2) 再次遍历字符串数组,计算每个股票的归一化值,并写入键值对
计算公式:若下行指数 > 0,归一化值 = 下行指数/max
若下行指数 < 0,归一化值 = 下行指数/absmin
若下行指数 = 0,归一化值 = 0
输出:<股票代码, "下行指数值\t下行指数归一化值">
输出数据包含股票代码以及其下行指数值和归一化值
三、 实验步骤
1、打开IntelliJ或Eclipse创建Maven项目;
(1)新建Marven项目



(2)修改pom.xml


2、参考WordCount示例代码编写本实验代码;
代码:
|
package swpu.edu.cn;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
public class StockDaily extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new StockDaily(), args);
System.exit(res);
}
private static Logger logger = Logger.getLogger(StockDaily.class.getName());
@Override
public int run(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "stockdaily");
logger.info("args1===" + args[1]);
logger.info("args2===" + args[2]);
// 注入作业的主类
job.setJarByClass(StockDaily.class);
// 为作业注入Map和Reduce类
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
// 指定输入类型为:文本格式文件;注入文本输入格式类
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path(args[1]));
// 指定输出格式为:文本格式文件;注入文本输入格式类
job.setOutputFormatClass(TextOutputFormat.class);
// 指定Key为文本格式;注入文本类
job.setOutputKeyClass(Text.class);
// 执行Value为浮点格式;注入浮点类
job.setOutputValueClass(FloatWritable.class);
// 指定作业的输出目录
TextOutputFormat.setOutputPath(job, new Path(args[2])); // HDFS路径
boolean res = job.waitForCompletion(true);
if (res)
return 0;
else
return -1;
}
public static class Map extends Mapper<Object, Text, Text, FloatWritable> {
private final static FloatWritable index = new FloatWritable();
private Text code = new Text();
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
//System.out.println("file: " + ((FileSplit) context.getInputSplit()).getPath().toString());
//System.out.println("line: " + key.toString() + "================" + line);
String[] infos = line.split("\t");
if ((infos[12].equals("False")) && (infos[2].equals("N/A") == false)) {
float open = Float.valueOf(infos[2]);
float close = Float.valueOf(infos[3]);
float low = Float.valueOf(infos[5]);
// for (String info : infos) {
// context.write(new Text(code), one);
// }
index.set((float) ((open - close) / (close - low + 0.1) / 1));
code.set(infos[0]);
context.write(code, index);
}
}
}
public static class Reduce extends Reducer<Text, FloatWritable, Text, Float> {
@Override
public void reduce(Text key, Iterable<FloatWritable> index, Context context)
throws IOException, InterruptedException {
float sum = 0;
int num = 0;
Iterator<FloatWritable> values = index.iterator();
while (values.hasNext()) {
sum += values.next().get();
num++;
}
System.out.print("code:" + key.toString() + "=======" + "index:" + sum / num + "\n");
context.write(key, sum / num);
}
}
}
|
3、JAR包
(1) 编码完成后,先run一次项目,让开发环境将Java代码编译一次,run命令是先编译再执行,这里会报执行错误,只要能编译成功即可;也可运行build --> recompile只执行编译。
(2) 执行:File --> Project Structure --> Project Settings --> Artifacts --> + --> JAR --> Empty创建一个名为unnamed的空JAR包,并将其命名为想要的名字
(3) 为JAR添加目录,目录结构必须与包名一致,如:包名为cn.edu.swpu.scs,那么目录结构就必须为/cn/edu/swpu/scs,如下图:

(4) 在包中添加文件,在刚创建的包目录中添加class文件,如下图:


(5) 在包目录下创建一个META-INF目录,如下图


Mainclass定位到src

至此JAR包的定义完成
(6) 执行 Build --> Build Artifacts完成build,开始打包,完成后,在项目的out目录中可找到创建的JAR包,如下图:


4、传JAR包到虚拟机

5、更改虚拟机配置
(1)简化配置,使集群只有两个虚拟机节点
一个主节点+一个从节点,workers文件中将主从节点地址都加进去
·修改/etc/hosts文件


·修改 /home/hadoop/hadoop-3.1.2/etc/hadoop/workers,只留下一个节点


(2)修改Hadoop配置文件mapred-site.xml, yarn-site.xml,并复制到从节点
·修改mapred-site.xml


·修改yarn-site.xml



·复制配置到从节点



(3)删除虚拟机三(节点三),并查看节点状态



6、执行JAR包
(1)上传jar包和数据至Hadoop



(2)执行jar包
hadoop jar …/xxx.jar main方法所在类 /input路径 /output路径

(3)查看结果




四、实验结果
试验运行过程及结果
如上所示
五、实验体会
此次实验,由于不熟悉MapReduce,所以实验很吃力。尤其到后期Hadoop文件配置过程中,几个文件的配置,极大影响了实验能否正常运行。
在idea中run时,出现了这个问题,可以直接忽略。

再然后,就是hadoop jar的时候,出现了这个问题。主要还是修改mapred-site.xml, yarn-site.xml这两个文件

实验到此告一段落,然而未知的旅程还在继续中……希望遇到的问题能减少吧……