// Step 1: the word-count job (首先是 wordcount).

package org.lukey.hadoop.classifyBayes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

/**
 * Word-count stage that gathers the raw statistics for a naive Bayes text classifier.
 *
 * <p>The class label of every record is the name of the directory that contains its input
 * file, and every input record (one line with the default text input format) is treated as
 * one word. Blank records are ignored.
 *
 * <p>Results produced by one run:
 * <ol>
 * <li>Job output ({@code <class>\t<word>\t<count>}): occurrences of each word inside each
 * class.</li>
 * <li>Named output {@code wordsInClass} ({@code <class>\t<count>}): total number of words in
 * each class.</li>
 * <li>{@code /user/hadoop/output/totalwords.txt}: number of distinct words over all classes,
 * taken from the {@link WordsNature#TOTALWORDS} counter.</li>
 * <li>{@code /user/hadoop/output/priorPro.txt}: one {@code <class>:<prior>} line per class,
 * where the prior is the share of input files that belong to the class.</li>
 * </ol>
 */
public class MyWordCount {

    /** Not referenced inside this class; kept because it is package-visible. */
    static String baseOutputPath = "/user/hadoop/test_out";

    /**
     * Class name -> distinct input file names seen by the map tasks.
     *
     * <p>NOTE(review): this map is filled by map tasks through
     * {@link #registerInputFile(String, String)}, so the driver only sees the entries when the
     * tasks run inside the driver's own JVM (for example with the local job runner). On a
     * cluster the tasks run in other JVMs and the map stays empty in the driver; the priors
     * then have to be derived some other way (e.g. from job output).
     */
    static Map<String, List<String>> fileCountMap = new HashMap<String, List<String>>();

    /** Class name -> number of input files, derived from {@link #fileCountMap} in main(). */
    static Map<String, Integer> fileCount = new HashMap<String, Integer>();

    // Counter names are part of the job's visible counters, so the spelling is left as is.
    static enum WordsNature {
        CLSASS_NUMBER, CLASS_WORDS, TOTALWORDS
    }

    private static final String INPUT_DIR = "/user/hadoop/test";
    private static final String OUTPUT_DIR = "/user/hadoop/mid/wordsFrequence";
    private static final String TOTAL_WORDS_FILE = "/user/hadoop/output/totalwords.txt";
    private static final String PRIOR_FILE = "/user/hadoop/output/priorPro.txt";
    private static final String WORDS_IN_CLASS_OUTPUT = "wordsInClass";

    /**
     * Prefix of the intermediate keys that only mark "this word exists". Class names come from
     * directory names and are assumed to be non-empty, so neither a class-total key
     * ({@code <class>}) nor a class/word key ({@code <class>\t<word>}) can start with a tab.
     */
    private static final String VOCABULARY_KEY_PREFIX = "\t";

    /**
     * Runs the job and, if it succeeds, writes the vocabulary size and the class priors.
     *
     * @param args ignored; input and output locations are fixed
     * @throws Exception if the job cannot be submitted or a result file cannot be written
     */
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = new Job(conf, "file count");
        job.setJarByClass(MyWordCount.class);
        job.setMapperClass(First_Mapper.class);
        job.setReducerClass(First_Reducer.class);

        Path inputpath = new Path(INPUT_DIR);
        // Project helper that registers the input files of the job.
        MyUtils.addInputPath(job, inputpath, conf);
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        if (!job.waitForCompletion(true)) {
            // Statistics of a failed job are meaningless; do not publish them.
            System.err.println("Job failed; " + TOTAL_WORDS_FILE + " and " + PRIOR_FILE + " were not written");
            System.exit(1);
        }

        // Number of distinct words, accumulated by the reducers.
        Counters counters = job.getCounters();
        Counter c1 = counters.findCounter(WordsNature.TOTALWORDS);
        System.out.println("-------------->>>>: " + c1.getDisplayName() + ":" + c1.getName() + ": " + c1.getValue());

        FileSystem fs = FileSystem.get(conf);
        writeTextFile(fs, new Path(TOTAL_WORDS_FILE), c1.getDisplayName() + ":" + c1.getValue());

        // Number of files per class and over all classes.
        int fileSum = 0;
        for (Map.Entry<String, List<String>> entry : fileCountMap.entrySet()) {
            int filesInClass = entry.getValue().size();
            fileCount.put(entry.getKey(), filesInClass);
            fileSum += filesInClass;
        }
        System.out.println("fileSum = " + fileSum);
        if (fileSum == 0) {
            System.err.println("No input files were registered in the driver JVM; " + PRIOR_FILE
                    + " will be empty (map tasks probably ran in other JVMs)");
        }

        // One "<class>:<prior>" line per class so that the file can be parsed again.
        StringBuilder priors = new StringBuilder();
        for (Map.Entry<String, Integer> entry : fileCount.entrySet()) {
            double p = (double) entry.getValue() / fileSum;
            priors.append(entry.getKey()).append(":").append(p).append("\n");
        }
        writeTextFile(fs, new Path(PRIOR_FILE), priors.toString());

        System.exit(0);
    }

    /**
     * Records that {@code fileName} belongs to class {@code className}; every file is counted
     * once, no matter how many records or splits it contributes.
     */
    static synchronized void registerInputFile(String className, String fileName) {
        List<String> files = fileCountMap.get(className);
        if (files == null) {
            files = new ArrayList<String>();
            fileCountMap.put(className, files);
        }
        if (!files.contains(fileName)) {
            files.add(fileName);
        }
    }

    /** Creates (or overwrites) {@code target} with {@code content} encoded as UTF-8. */
    private static void writeTextFile(FileSystem fs, Path target, String content) throws IOException {
        FSDataOutputStream out = fs.create(target);
        try {
            out.write(content.getBytes("UTF-8"));
            // Close explicitly on the success path so that a failing flush is reported.
            out.close();
        } finally {
            // Releases the stream after a failed write; harmless after a successful close.
            IOUtils.closeStream(out);
        }
    }

    /**
     * Emits three intermediate records per word: {@code (<class>\t<word>, 1)} for the
     * per-class word frequency, {@code (<class>, 1)} for the per-class word total and
     * {@code (\t<word>, 0)} as a marker used to count distinct words.
     */
    static class First_Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private final static IntWritable zero = new IntWritable(0);

        private final Text classAndWord = new Text();
        private final Text classOnly = new Text();
        private final Text vocabularyKey = new Text();

        /** Name of the directory holding the current input file, used as the class label. */
        private String dirName;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            dirName = fileSplit.getPath().getParent().getName();
            classOnly.set(dirName);
            // Once per split instead of once per record, so files are not counted per word.
            registerInputFile(dirName, fileSplit.getPath().getName());
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String word = value.toString();
            if (word.trim().isEmpty()) {
                // A blank record is not a word and would otherwise yield the malformed key "<class>\t".
                return;
            }

            classAndWord.set(dirName + "\t" + word);
            vocabularyKey.set(VOCABULARY_KEY_PREFIX + word);

            context.write(classAndWord, one);   // e.g. "AFRICA\tafford" -> 1
            context.write(classOnly, one);      // e.g. "AFRICA" -> 1
            context.write(vocabularyKey, zero); // e.g. "\tafford" -> 0
        }
    }

    /**
     * Sums the counts per key and routes the result by key shape: vocabulary markers only
     * increment {@link WordsNature#TOTALWORDS}, class/word keys go to the job output and
     * class-total keys go to the {@code wordsInClass} named output.
     */
    static class First_Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        /** Reused output value holding the summed count of the current key. */
        private final IntWritable result = new IntWritable();

        /** Per-task handle for the named output; created in setup(), closed in cleanup(). */
        private MultipleOutputs<Text, IntWritable> mos;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, IntWritable>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            String keyText = key.toString();

            if (keyText.startsWith(VOCABULARY_KEY_PREFIX)) {
                // Each distinct word reaches exactly one reduce() call with this key shape.
                context.getCounter(WordsNature.TOTALWORDS).increment(1);
                return;
            }

            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);

            if (keyText.indexOf('\t') >= 0) {
                // "<class>\t<word>": occurrences of the word inside the class.
                context.write(key, result);
            } else {
                // "<class>": total number of words inside the class.
                mos.write(key, result, WORDS_IN_CLASS_OUTPUT);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Without this the named output may not be flushed.
            mos.close();
        }
    }

}
View Code

相关文章:

  • 2022-12-23
  • 2022-12-23
  • 2021-09-06
  • 2022-12-23
  • 2021-05-17
  • 2022-12-23
  • 2021-05-30
  • 2022-12-23
猜你喜欢
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-12-26
  • 2021-04-04
  • 2022-12-23
  • 2021-12-26
相关资源
相似解决方案