• 实验目的

掌握Map/Reduce相关原理和设计方法,设计相关的应用。

  • 实验内容

    一、数据集及程序功能要求

数据集stock-daily,包含A股近4000只股票的最近30天日数据,根据此数据实现股票风险监测统计:统计和输出股票代码和风险值

数据来源:https://www.joinquant.com/help/api/help?name=JQData

风险值统计方法:

1. 忽略股票停牌当日数据

2. 忽略N/A数据行

3. 股价下行指数,((开盘价 - 收盘价) / (收盘价 - 最低价+0.1)3.MapReduce程序设计 ) / 有效数据总天数

二、MapReduce环境配置

三、代码编写

1、基本代码逻辑:

Map

输入:一行数据(一只股票的日数据)

处理:使用 \t 将字符串split成数组,提取需要计算的值,并转为浮点数

输出:<股票代码, 股票当日下行指数>

      遇到无效数据不输出(停牌股票或有N/A数据无法提取为浮点数)

 

Reduce:

输入:<股票代码,[股票每日下行指数]>

处理:计算均值

输出:<股票代码,股票下行指数>

 

2、选做,对结果进行归一化处理:(归一化到区间:-1~1)

Map:

输入目录:上一个Reduce的输出目录

输入:<股票代码,股票下行指数>

处理:将股票代码和股票下行指数做字符串相接,中间用空格隔开,键用一个固定的键名表示,这样可以将所有股票的数据全部交由一个Reduce处理

输出:<"fixedkey","股票代码 股票下行指数">

 

Reduce:

输入:<"fixedkey",["股票代码 股票下行指数"]>

处理:(1) 遍历字符串数组,取出字符串中空格符后的部分(提示:用split可取),转成浮点数,找到股票下行指数最大值(max)和最小值的绝对值(absmin)

         (2) 再次遍历字符串数组,计算每个股票的归一化值,并写入键值对

         计算公式:若下行指数 > 0,归一化值 = 下行指数/max

                若下行指数 < 0,归一化值 = 下行指数/absmin

                       若下行指数 = 0,归一化值 = 0

输出:<股票代码, "下行指数值\t下行指数归一化值">

输出数据包含股票代码以及其下行指数值和归一化值

三、   实验步骤

1、打开IntelliJ或Eclipse创建Maven项目;

(1)新建Marven项目

3.MapReduce程序设计3.MapReduce程序设计3.MapReduce程序设计

(2)修改pom.xml

3.MapReduce程序设计

3.MapReduce程序设计

2、参考WordCount示例代码编写本实验代码;

代码:

package swpu.edu.cn;

import
java.io.IOException;
import
java.util.Iterator;

import
org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.conf.Configured;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.FloatWritable;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.Mapper;
import
org.apache.hadoop.mapreduce.Reducer;
import
org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import
org.apache.hadoop.util.Tool;
import
org.apache.hadoop.util.ToolRunner;
import
org.apache.log4j.Logger;

public class
StockDaily extends Configured implements Tool {
   
public static void main(String[] args) throws Exception {
       
int res = ToolRunner.run(new StockDaily(), args);
       
System.exit(res);
   
}

   
private static Logger logger = Logger.getLogger(StockDaily.class.getName());
   
@Override
   
public int run(String[] args) throws Exception {
       
// TODO Auto-generated method stub
       
Configuration conf = new Configuration();

       
Job job = Job.getInstance(conf, "stockdaily");
       
logger.info("args1===" + args[1]);
       
logger.info("args2===" + args[2]);
       
// 注入作业的主类
       
job.setJarByClass(StockDaily.class);

       
// 为作业注入MapReduce
       
job.setMapperClass(Map.class);
       
job.setReducerClass(Reduce.class);
       
// 指定输入类型为:文本格式文件;注入文本输入格式类
       
job.setInputFormatClass(TextInputFormat.class);
       
TextInputFormat.addInputPath(job, new Path(args[1]));
       
// 指定输出格式为:文本格式文件;注入文本输入格式类
       
job.setOutputFormatClass(TextOutputFormat.class);
       
// 指定Key为文本格式;注入文本类
       
job.setOutputKeyClass(Text.class);
       
// 执行Value为浮点格式;注入浮点类
       
job.setOutputValueClass(FloatWritable.class);
       
// 指定作业的输出目录
       
TextOutputFormat.setOutputPath(job, new Path(args[2])); // HDFS路径
       
boolean res = job.waitForCompletion(true);
        if
(res)
           
return 0;
        else
            return
-1;
   
}

   
public static class Map extends Mapper<Object, Text, Text, FloatWritable> {
       
private final static FloatWritable index = new FloatWritable();
        private
Text code = new Text();

       
@Override
       
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString()
;
           
//System.out.println("file: " + ((FileSplit) context.getInputSplit()).getPath().toString());
            //System.out.println("line: " + key.toString() + "================" + line);
           
String[] infos = line.split("\t");
            if
((infos[12].equals("False")) && (infos[2].equals("N/A") == false)) {
               
float open = Float.valueOf(infos[2]);
                float
close = Float.valueOf(infos[3]);
                float
low = Float.valueOf(infos[5]);
                
// for (String info : infos) {
                // context.write(new Text(code), one);
                // }
               
index.set((float) ((open - close) / (close - low + 0.1) / 1));
               
code.set(infos[0]);
               
context.write(code, index);
           
}

        }
    }

   
public static class Reduce extends Reducer<Text, FloatWritable, Text, Float> {

       
@Override
       
public void reduce(Text key, Iterable<FloatWritable> index, Context context)
               
throws IOException, InterruptedException {
           
float sum = 0;
            int
num = 0;
           
Iterator<FloatWritable> values = index.iterator();
            while
(values.hasNext()) {
                sum += values.next().get()
;
               
num++;
           
}
            System.
out.print("code:" + key.toString() + "=======" + "index:" + sum / num + "\n");
           
context.write(key, sum / num);
       
}
    }
}

 

3JAR

       (1) 编码完成后,先run一次项目,让开发环境将Java代码编译一次,run命令是先编译再执行,这里会报执行错误,只要能编译成功即可;也可运行build --> recompile只执行编译。

(2) 执行:File --> Project Structure --> Project Settings --> Artifacts --> + --> JAR --> Empty创建一个名为unnamed的空JAR包,并将其命名为想要的名字

(3) 为JAR添加目录,目录结构必须与包名一致,如:包名为cn.edu.swpu.scs,那么目录结构就必须为/cn/edu/swpu/scs,如下图:

3.MapReduce程序设计

(4) 在包中添加文件,在刚创建的包目录中添加class文件,如下图:

3.MapReduce程序设计

3.MapReduce程序设计

(5) 在包目录下创建一个META-INF目录,如下图

3.MapReduce程序设计

3.MapReduce程序设计

Mainclass定位到src

3.MapReduce程序设计

至此JAR包的定义完成

(6) 执行 Build --> Build Artifacts完成build,开始打包,完成后,在项目的out目录中可找到创建的JAR包,如下图:

3.MapReduce程序设计

3.MapReduce程序设计

4、传JAR包到虚拟机

3.MapReduce程序设计

5、更改虚拟机配置

1简化配置,使集群只有两个虚拟机节点

一个主节点+一个从节点,workers文件中将主从节点地址都加进去

·修改/etc/hosts文件

3.MapReduce程序设计

3.MapReduce程序设计

·修改 /home/hadoop/hadoop-3.1.2/etc/hadoop/workers,只留下一个节点

3.MapReduce程序设计

3.MapReduce程序设计

2修改Hadoop配置文件mapred-site.xml, yarn-site.xml,并复制到从节点

·修改mapred-site.xml

3.MapReduce程序设计

3.MapReduce程序设计

·修改yarn-site.xml

3.MapReduce程序设计

3.MapReduce程序设计

3.MapReduce程序设计

·复制配置到从节点

3.MapReduce程序设计

3.MapReduce程序设计

3.MapReduce程序设计

3)删除虚拟机三(节点三),并查看节点状态

3.MapReduce程序设计

3.MapReduce程序设计

3.MapReduce程序设计

6、执行JAR包

       1)上传jar包和数据至Hadoop

3.MapReduce程序设计

3.MapReduce程序设计

3.MapReduce程序设计

2执行jar

hadoop  jar  …/xxx.jar main方法所在类 /input路径 /output路径

3.MapReduce程序设计

   3)查看结果

3.MapReduce程序设计

 

3.MapReduce程序设计

 

3.MapReduce程序设计

3.MapReduce程序设计

 

四、实验结果

      试验运行过程及结果

如上所示

五、实验体会

此次实验,由于不熟悉MapReduce,所以实验很吃力。尤其到后期Hadoop文件配置过程中,几个文件的配置,极大影响了实验能否正常运行。

在idea中run时,出现了这个问题,可以直接忽略。

3.MapReduce程序设计

再然后,就是hadoop jar的时候,出现了这个问题。主要还是修改mapred-site.xml, yarn-site.xml这两个文件

3.MapReduce程序设计

实验到此告一段落,然而未知的旅程还在继续中……希望遇到的问题能减少吧……

相关文章:

  • 2022-12-23
  • 2021-12-05
  • 2021-07-04
猜你喜欢
  • 2021-09-15
  • 2021-10-03
  • 2021-08-30
  • 2021-12-05
  • 2021-06-03
  • 2022-12-23
  • 2022-12-23
相关资源
相似解决方案