Create a Maven project in Eclipse.

(Screenshots of the New Maven Project wizard omitted.)

Switch the project's JDK to the locally installed version 1.8.


Load the pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.it19gong</groupId>
  <artifactId>clickLog</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>clickLog</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.0</version>
    </dependency>

    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.8</version>
      <scope>system</scope>
      <systemPath>E:/software/jdk1.8/lib/tools.jar</systemPath>
    </dependency>

    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>2.1.0</version>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.33</version>
    </dependency>
  </dependencies>
  
</project>
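
Note: the org.apache.hadoop.mapreduce classes used below (Mapper, Job, FileInputFormat, and so on) live in hadoop-mapreduce-client-core rather than hadoop-common. If they fail to resolve with the dependencies above (hive-jdbc may or may not pull them in transitively), add the matching artifact as well:

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.6.0</version>
    </dependency>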

 

 

If dependency resolution fails because jdk.tools:jdk.tools:1.8 cannot be found in any repository, change the <systemPath> above to the absolute path of tools.jar under your locally installed JDK, then reload the Maven dependencies.


On my machine I changed it to:
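
As an illustration only, assuming a hypothetical local JDK installed at C:/Java/jdk1.8.0_144, the changed dependency would read:

    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.8</version>
      <scope>system</scope>
      <systemPath>C:/Java/jdk1.8.0_144/lib/tools.jar</systemPath>
    </dependency>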


Create an AccessLogPreProcessMapper class in the project:


package com.it19gong.clickLog;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AccessLogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    Text text = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Nginx access-log lines are space-delimited; skip lines too short to parse.
        String[] itr = value.toString().split(" ");
        if (itr.length < 11) {
            return;
        }
        String ip = itr[0];                                           // client IP
        String date = AnalysisNginxTool.nginxDateStmpToDate(itr[3]); // "[20/Jun/2019:08:30:00" -> "2019/06/20"
        String url = itr[6];                                          // requested URL
        String upFlow = itr[9];                                       // traffic (bytes) field

        // Emit one cleaned, comma-separated record per input line; no value is needed.
        text.set(ip + "," + date + "," + url + "," + upFlow);
        context.write(text, NullWritable.get());
    }
}
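
For reference, given a hypothetical access-log line such as

    192.168.33.66 - - [20/Jun/2019:08:30:00 +0800] "GET /index.html HTTP/1.1" 200 5326 "-" "Mozilla/5.0"

the mapper keeps fields 0, 3, 6, and 9 and emits:

    192.168.33.66,2019/06/20,/index.html,5326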

 

 

 

Create the AnalysisNginxTool class:


package com.it19gong.clickLog;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AnalysisNginxTool
{
    private static Logger logger = LoggerFactory.getLogger(AnalysisNginxTool.class);

    // Converts an nginx time field such as "[20/Jun/2019:08:30:00" to "2019/06/20".
    public static String nginxDateStmpToDate(String date)
    {
        String res = "";
        try
        {
            // The pattern keeps the leading "[" that nginx writes before the day.
            SimpleDateFormat df = new SimpleDateFormat("[dd/MM/yyyy:HH:mm:ss");
            String datetmp = date.split(" ")[0].toUpperCase();
            // Swap the three-letter month abbreviation for its two-digit number.
            String mtmp = datetmp.split("/")[1];
            DateToNUM.initMap();
            datetmp = datetmp.replaceAll(mtmp, DateToNUM.map.get(mtmp));
            Date d = df.parse(datetmp);
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd");
            res = sdf.format(d);
        }
        catch (ParseException e)
        {
            logger.error("error:" + date, e);
        }
        return res;
    }

    // Same conversion, but returns the epoch time in milliseconds.
    // Note: unlike the method above, this one assumes DateToNUM.initMap()
    // has already been called.
    public static long nginxDateStmpToDateTime(String date)
    {
        long l = 0;
        try
        {
            SimpleDateFormat df = new SimpleDateFormat("[dd/MM/yyyy:HH:mm:ss");
            String datetmp = date.split(" ")[0].toUpperCase();
            String mtmp = datetmp.split("/")[1];
            datetmp = datetmp.replaceAll(mtmp, DateToNUM.map.get(mtmp));

            Date d = df.parse(datetmp);
            l = d.getTime();
        }
        catch (ParseException e)
        {
            logger.error("error:" + date, e);
        }
        return l;
    }
}
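
A quick sanity check of the conversion (my own sketch, not part of the original project):

    package com.it19gong.clickLog;

    public class AnalysisNginxToolTest {
        public static void main(String[] args) {
            // itr[3] of a split log line looks like "[20/Jun/2019:08:30:00"
            System.out.println(AnalysisNginxTool.nginxDateStmpToDate("[20/Jun/2019:08:30:00")); // 2019/06/20

            DateToNUM.initMap(); // required before calling nginxDateStmpToDateTime
            System.out.println(AnalysisNginxTool.nginxDateStmpToDateTime("[20/Jun/2019:08:30:00")); // epoch millis
        }
    }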

 

 

Create the DateToNUM class:


package com.it19gong.clickLog;

import java.util.HashMap;

public class DateToNUM
{
    // Maps nginx's three-letter month abbreviations to two-digit month numbers.
    public static HashMap<String, String> map = new HashMap<String, String>();

    public static void initMap()
    {
        map.put("JAN", "01");
        map.put("FEB", "02");
        map.put("MAR", "03");
        map.put("APR", "04");
        map.put("MAY", "05");
        map.put("JUN", "06");
        map.put("JUL", "07");
        map.put("AUG", "08");
        map.put("SEP", "09"); // nginx writes "Sep", not "Sept"
        map.put("OCT", "10");
        map.put("NOV", "11");
        map.put("DEC", "12");
    }
}

 

 

Create the AccessLogDriver class:


package com.it19gong.clickLog;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AccessLogDriver {

    public static void main(String[] args) throws Exception {
        DateToNUM.initMap();
        Configuration conf = new Configuration();
        // Fall back to default HDFS paths when no arguments are given.
        if (args.length != 2) {
            args = new String[2];
            args[0] = "hdfs://node1/data/clickLog/20190620/";
            args[1] = "hdfs://node1/uvout/hive";
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(AccessLogDriver.class);
        job.setMapperClass(AccessLogPreProcessMapper.class);     // the Mapper that cleans each line
        job.setNumReduceTasks(0);                                // map-only job: no Reducer
        job.setMapOutputKeyClass(Text.class);                    // map output key type
        job.setMapOutputValueClass(NullWritable.class);          // map output value type
        FileInputFormat.addInputPath(job, new Path(args[0]));    // input path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output path (must not exist yet)
        System.exit(job.waitForCompletion(true) ? 0 : 1);        // submit and wait for completion
    }

}
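
Because the number of reduce tasks is zero, this is a map-only job: each map task writes its records straight to the output directory as a part-m-NNNNN file (part-m-00000 and so on).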

 

 

Package the project into a JAR.

(Screenshots of the Eclipse JAR export wizard omitted.)

Upload the JAR to the cluster.


Run it on the cluster. First, check that the cluster processes are up:
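
As a quick check (assuming a standard HDFS + YARN deployment), run jps on each node and confirm that daemons such as NameNode, DataNode, ResourceManager, and NodeManager are listed:

    jps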

(Screenshots of the running processes on each node omitted.)

 hadoop jar mrclick.jar com.it19gong.clickLog.AccessLogDriver
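
The input and output paths default to the values hard-coded in AccessLogDriver; they can also be passed explicitly on the command line:

    hadoop jar mrclick.jar com.it19gong.clickLog.AccessLogDriver hdfs://node1/data/clickLog/20190620/ hdfs://node1/uvout/hive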

 


 

 

You can now see the output directory:
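
For example, assuming the default output path from the driver:

    hdfs dfs -ls /uvout/hive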


 

 

View the cleaned data:
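
For example (the part-file name may differ):

    hdfs dfs -cat /uvout/hive/part-m-00000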


 
