【Question title】: Sorted Hadoop WordCount Java
【Posted】: 2015-05-01 15:18:55
【Question】:

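I am running Hadoop's WordCount program in Java. My first job (which collects every word and its count) works fine. However, I run into a problem with the second job, which is supposed to sort the words by their number of occurrences. I have already read this question (Hadoop WordCount sorted by word occurrences) to understand how to build the second job, but I do not have the same problem.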

My code:

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class simpleWordExample {

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
    } 


    public class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {

            int sum = 0;
            for (IntWritable value:values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));

        }

    } 


class Map1 extends Mapper<LongWritable, Text, Text, IntWritable> {

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer stringTokenizer = new StringTokenizer(line);
        while (stringTokenizer.hasMoreTokens()){
            int number = 999;
            String word = "empty";

            if (stringTokenizer.hasMoreTokens()) {
                String str0 = stringTokenizer.nextToken();
                word = str0.trim();
            }

            if (stringTokenizer.hasMoreElements()) {
                String str1 = stringTokenizer.nextToken();
                number = Integer.parseInt(str1.trim());
            }
            context.write(new Text(word), new IntWritable(number));
        }

    }

}

class Reduce1 extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        for (IntWritable value:values) {
            context.write(key, new IntWritable(value.get()));
        }
      }
}



public static void main(String[] args) throws Exception {

    Job job1 = new Job();
    Job job2 = new Job();

   job1.setJobName("wordCount");

   job1.setJarByClass(simpleWordExample.class);

   job1.setOutputKeyClass(Text.class);
   job1.setOutputValueClass(IntWritable.class);

   job1.setMapperClass(Map.class);
   job1.setCombinerClass(Reduce.class);
   job1.setReducerClass(Reduce.class);

   job1.setInputFormatClass(TextInputFormat.class);
   job1.setOutputFormatClass(TextOutputFormat.class);

   FileInputFormat.setInputPaths(job1, new Path("file:///home/cloudera/data.txt"));
   FileOutputFormat.setOutputPath(job1, new Path("file:///home/cloudera/output"));


   job2.setJobName("WordCount1");

   job2.setJarByClass(simpleWordExample.class);

   job2.setOutputKeyClass(Text.class);
   job2.setOutputValueClass(IntWritable.class);

   job2.setMapperClass(Map1.class);
   job2.setCombinerClass(Reduce1.class);
   job2.setReducerClass(Reduce1.class);

   job2.setInputFormatClass(TextInputFormat.class);
   job2.setOutputFormatClass(TextOutputFormat.class);

   FileInputFormat.setInputPaths(job2, new Path("file:///home/cloudera/output/part-00000"));
   FileOutputFormat.setOutputPath(job2, new Path("file:///home/cloudera/outputFinal"));


   job1.submit();
   if (job1.waitForCompletion(true)) {
       job2.submit();
       job2.waitForCompletion(true);
   }
}

}

And the error I get in the console:

15/05/02 09:56:34 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/05/02 09:56:37 WARN conf.Configuration: session.id is deprecated. Instead, use dfs.metrics.session-id
15/05/02 09:56:37 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/05/02 09:56:39 WARN mapreduce.JobSubmitter: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/05/02 09:56:39 WARN mapreduce.JobSubmitter: No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
15/05/02 09:56:39 INFO input.FileInputFormat: Total input paths to process : 1
15/05/02 09:56:41 INFO mapreduce.JobSubmitter: number of splits:1
15/05/02 09:56:41 WARN conf.Configuration: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
15/05/02 09:56:41 WARN conf.Configuration: mapreduce.combine.class is deprecated. Instead, use mapreduce.job.combine.class
15/05/02 09:56:41 WARN conf.Configuration: mapreduce.map.class is deprecated. Instead, use mapreduce.job.map.class
15/05/02 09:56:41 WARN conf.Configuration: mapred.job.name is deprecated. Instead, use mapreduce.job.name
15/05/02 09:56:41 WARN conf.Configuration: mapreduce.reduce.class is deprecated. Instead, use mapreduce.job.reduce.class
15/05/02 09:56:41 WARN conf.Configuration: mapreduce.inputformat.class is deprecated. Instead, use mapreduce.job.inputformat.class
15/05/02 09:56:41 WARN conf.Configuration: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
15/05/02 09:56:41 WARN conf.Configuration: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
15/05/02 09:56:41 WARN conf.Configuration: mapreduce.outputformat.class is deprecated. Instead, use mapreduce.job.outputformat.class
15/05/02 09:56:41 WARN conf.Configuration: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
15/05/02 09:56:41 WARN conf.Configuration: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
15/05/02 09:56:41 WARN conf.Configuration: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
15/05/02 09:56:45 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1998350370_0001
15/05/02 09:56:48 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
15/05/02 09:56:48 INFO mapreduce.Job: Running job: job_local1998350370_0001
15/05/02 09:56:48 INFO mapred.LocalJobRunner: OutputCommitter set in config null
15/05/02 09:56:48 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
15/05/02 09:56:48 INFO mapred.LocalJobRunner: Waiting for map tasks
15/05/02 09:56:48 INFO mapred.LocalJobRunner: Starting task: attempt_local1998350370_0001_m_000000_0
15/05/02 09:56:48 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
15/05/02 09:56:48 INFO mapred.MapTask: Processing split: file:/home/cloudera/data.txt:0+1528889
15/05/02 09:56:48 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
15/05/02 09:56:52 INFO mapreduce.Job: Job job_local1998350370_0001 running in uber mode : false
15/05/02 09:56:52 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
15/05/02 09:56:52 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
15/05/02 09:56:52 INFO mapred.MapTask: soft limit at 83886080
15/05/02 09:56:52 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
15/05/02 09:56:52 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
15/05/02 09:56:52 INFO mapreduce.Job:  map 0% reduce 0%
15/05/02 09:56:57 INFO mapred.LocalJobRunner: 
15/05/02 09:56:57 INFO mapred.MapTask: Starting flush of map output
15/05/02 09:56:57 INFO mapred.MapTask: Spilling map output
15/05/02 09:56:57 INFO mapred.MapTask: bufstart = 0; bufend = 2109573; bufvoid = 104857600
15/05/02 09:56:57 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 25406616(101626464); length = 807781/6553600
15/05/02 09:56:58 INFO mapred.LocalJobRunner: map > sort
15/05/02 09:56:58 INFO mapreduce.Job:  map 67% reduce 0%
15/05/02 09:56:59 INFO mapred.LocalJobRunner: Map task executor complete.
15/05/02 09:56:59 WARN mapred.LocalJobRunner: job_local1998350370_0001
java.lang.Exception: java.lang.RuntimeException: java.lang.NoSuchMethodException: simpleWordExample$Reduce.<init>()
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:401)
Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: simpleWordExample$Reduce.<init>()
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131)
    at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1619)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1603)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1452)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:693)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:761)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:338)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:233)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NoSuchMethodException: simpleWordExample$Reduce.<init>()
    at java.lang.Class.getConstructor0(Class.java:2706)
    at java.lang.Class.getDeclaredConstructor(Class.java:1985)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:125)
    ... 13 more
15/05/02 09:57:00 INFO mapreduce.Job: Job job_local1998350370_0001 failed with state FAILED due to: NA
15/05/02 09:57:00 INFO mapreduce.Job: Counters: 21
    File System Counters
        FILE: Number of bytes read=1529039
        FILE: Number of bytes written=174506
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
    Map-Reduce Framework
        Map input records=30292
        Map output records=201946
        Map output bytes=2109573
        Map output materialized bytes=0
        Input split bytes=93
        Combine input records=0
        Combine output records=0
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=122
        CPU time spent (ms)=0
        Physical memory (bytes) snapshot=0
        Virtual memory (bytes) snapshot=0
        Total committed heap usage (bytes)=165613568
    File Input Format Counters 
        Bytes Read=1528889

Thank you for your time and help!

Edit: the code now uses the new API throughout.

【Question comments】:

    Tags: java sorting hadoop mapreduce bigdata


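    【Answer 1】:

    I have never used Hadoop myself, but it looks like Hadoop is trying to instantiate a `Map` instance through the default no-argument constructor. It throws NoSuchMethodException because it cannot find a no-argument constructor.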
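
    The failure can be reproduced without Hadoop. The sketch below is only an illustration: the class and method names are hypothetical, and it assumes, as the stack trace suggests, that the framework looks up a no-argument constructor with `Class.getDeclaredConstructor()`. A non-static inner class has no such constructor, because its constructor implicitly takes the enclosing instance as a parameter; a static nested class does have one.

```java
import java.lang.reflect.Constructor;

public class NestedClassReflectionDemo {

    // Non-static inner class: its only constructor implicitly takes the
    // enclosing NestedClassReflectionDemo instance as a parameter.
    public class InnerReducer {
    }

    // Static nested class: it has a true no-argument constructor.
    public static class StaticReducer {
    }

    // Returns true if the class can be created the way a framework would do it:
    // look up the no-argument constructor reflectively and invoke it.
    public static boolean canInstantiateWithoutArgs(Class<?> type) {
        try {
            Constructor<?> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true);
            ctor.newInstance();
            return true;
        } catch (NoSuchMethodException e) {
            // Same exception type as the one reported in the console log.
            return false;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("inner:  " + canInstantiateWithoutArgs(InnerReducer.class));
        System.out.println("static: " + canInstantiateWithoutArgs(StaticReducer.class));
    }
}
```

    If this matches what Hadoop does, it would also explain why a mapper declared as `public static class` works while a reducer declared as plain `public class` inside the same outer class does not.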

    【Comments】:

    • Yes, but the first Mapper I use has no constructor either, and it works :/
    【Answer 2】:

    Based on the following lines in your code:

    -- Map 1:
    class Map1 extends MapReduceBase implements Mapper<Object, Text, IntWritable, Text> 
    
       -- From Driver
    conf2.setInputFormat(TextInputFormat.class);
    

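    When you set the input format to TextInputFormat, the map input key is always LongWritable and the value is Text. You did this correctly in your Map class, so Map1 should declare the same input types.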
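
    To make those key and value types concrete, here is a plain-Java sketch of the records a mapper receives from a line-oriented text input: the key is the byte offset at which the line starts (a long, hence LongWritable) and the value is the line itself (hence Text). This is not Hadoop's implementation; `LineRecordSketch` and `toRecords` are hypothetical names, and the sketch assumes UTF-8 content with "\n" line endings.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LineRecordSketch {

    // Splits the content into (byte offset of line start, line text) pairs.
    // Assumption: lines end with '\n' and the content is UTF-8 encoded.
    public static List<Map.Entry<Long, String>> toRecords(String fileContent) {
        List<Map.Entry<Long, String>> records = new ArrayList<>();
        long offset = 0;
        for (String line : fileContent.split("\n")) {
            records.add(new AbstractMap.SimpleEntry<Long, String>(offset, line));
            // Advance past the line's bytes plus one byte for the '\n' terminator.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return records;
    }

    public static void main(String[] args) {
        for (Map.Entry<Long, String> record : toRecords("hello world\nfoo bar\n")) {
            System.out.println("key=" + record.getKey() + " value=" + record.getValue());
        }
    }
}
```

    A mapper declared with any other input key type, such as IntWritable, would not match what the input format supplies.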

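    【Comments】:

    • That sounds logical, so I did it (as you can see in the edit). Map1 now has the same structure as Map, but I still get the same console log :/
    • Another problem is that the output of Map1 and the input of Reduce1 do not match. Map1 emits Text, IntWritable, while Reduce1 expects IntWritable, Text, in that order.
    • Done, and the edit is posted. No change in the log :/ Is the error in how I set everything up in main, since it says "configuration error"?
    【Answer 3】:

    This may be caused by mixing the two APIs. Hadoop has two APIs: the older one is mapred and the newer one is mapreduce. In your code you are importing both. Try commenting out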

    import org.apache.hadoop.mapred.*;
    

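    and import all the classes required by the new API. Hope this works for you.

    After commenting it out, try to rewrite the code against the new API.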

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
    } 
    
    
    // Declared static so that the class has a no-argument constructor.
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context)
            throws IOException, InterruptedException {
    
        int sum = 0;
        for (IntWritable value:values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    
    }
    }
    

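    I have written the first Mapper and Reducer for you; you can do the same for the second mapper and reducer.

    【Comments】:

    【Answer 4】:

    The Map1, Reduce1, and Reduce classes must be static.

    There is also an error in the job2 configuration: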

    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);

    You should change it to:

    job2.setOutputKeyClass(IntWritable.class);
    job2.setOutputValueClass(Text.class);
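
    The answer does not say why the key and value types should be swapped, but a likely reason is that MapReduce sorts map output by key before it reaches the reducer, so emitting the count as the key yields output ordered by count. The following plain-Java sketch imitates that idea without any Hadoop classes: a TreeMap stands in for the sort-by-key step, and `SortByCountSketch` and `sortByCount` are hypothetical names. It assumes the first job's output lines have the form word, whitespace, count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SortByCountSketch {

    // "Map" step: parse each "word<whitespace>count" line and swap it to (count, word).
    // The TreeMap plays the role of the framework's sort by key (ascending).
    // "Reduce" step: write every word back out under its count.
    public static List<String> sortByCount(List<String> wordCountLines) {
        TreeMap<Integer, List<String>> byCount = new TreeMap<>();
        for (String line : wordCountLines) {
            String[] parts = line.trim().split("\\s+");
            if (parts.length != 2) {
                continue; // skip malformed lines instead of emitting placeholder values
            }
            int count = Integer.parseInt(parts[1]);
            byCount.computeIfAbsent(count, k -> new ArrayList<>()).add(parts[0]);
        }
        List<String> output = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> entry : byCount.entrySet()) {
            for (String word : entry.getValue()) {
                output.add(entry.getKey() + "\t" + word);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> firstJobOutput = Arrays.asList("the\t5", "a\t3", "hadoop\t1", "java\t3");
        for (String line : sortByCount(firstJobOutput)) {
            System.out.println(line);
        }
    }
}
```

    The order here is ascending, which is also the default order for integer keys in a real job. Getting the most frequent words first would presumably need a custom comparator, for example one registered with `Job.setSortComparatorClass`, and a single global order across the whole output assumes a single reducer.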

    I hope this helps.

    【Comments】:
