【Question title】: Use threading to process multiple files
【Posted】: 2020-01-10 11:34:41
【Question description】:

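I have a file on which I need to run a word-count function (based on MapReduce), but using threads. I split the file into several small files, then loop over the small files and count word occurrences with a Reduce() function. How can I implement threads with a run() function so that they work together with the Reduce function?

Here is my code: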

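import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Scanner;

public class WordCounter implements Runnable {

    private String Nom;
    protected static int Chunks = 1;

    public WordCounter(String n) {
        Nom = n;
    }

    // getter used by Reduce() and deleteFiles() below
    public String getNom() {
        return Nom;
    }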

   public void split () throws IOException
    {

    File source = new File(this.Nom);
    int maxRows = 100;
    int i = 1;

        try(Scanner sc = new Scanner(source)){
            String line = null;
            int lineNum = 1;

            File splitFile = new File(this.Nom+i+".txt");

            FileWriter myWriter = new FileWriter(splitFile);

            while (sc.hasNextLine()) {
            line = sc.nextLine();

                if(lineNum > maxRows){
                    Chunks++;
                    myWriter.close();
                    lineNum = 1;
                    i++;
                    splitFile = new File(this.Nom+i+".txt");
                    myWriter = new FileWriter(splitFile);
                }

                myWriter.write(line+"\n");
                lineNum++;
            }

            myWriter.close();

        }

}
public void Reduce() throws IOException 
    {

        ArrayList<String> words = new ArrayList<String>();
        ArrayList<Integer> count = new ArrayList<Integer>(); 

            // split() creates files 1..Chunks, so the last index must be included
            for (int i = 1; i <= Chunks; i++) {

            //create the input stream (receive the text)
            FileInputStream fin = new FileInputStream(this.getNom()+i+".txt");
            //go through the text with a scanner
            Scanner sc = new Scanner(fin);

            while (sc.hasNext()) {
                //Get the next word
                String nextString = sc.next();

                //Determine if the string exists in words
                if (words.contains(nextString)) {
                    int index = words.indexOf(nextString);

                    count.set(index, count.get(index)+1);

                }
                else {
                    words.add(nextString);
                    count.add(1);
                }
            }
                sc.close();
                fin.close();
            }

            // Write the word counts to the result file
            FileWriter myWriter = new FileWriter(new File(this.getNom()+"Result.txt"));
            for (int i = 0; i < words.size(); i++) {
                myWriter.write(words.get(i)+ " : " +count.get(i) +"\n");    
            }
            myWriter.close();

            //delete the small files
            deleteFiles();
    }
      public void deleteFiles()
    {
        File f= new File("");
        for (int i = 1; i <= Chunks; i++) {
            f = new File(this.getNom()+i+".txt");
            f.delete();
        }
    }

}

【Question discussion】:

    Tags: java multithreading file


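    【Solution 1】:

    It is better to use the Callable interface rather than Runnable, because a Callable can return a result that you can retrieve afterwards.

    So, to fix your code, you can do more or less the following: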

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class WordCounter {
        // 5 is the number of concurrent threads
        private static ExecutorService threadPool = Executors.newFixedThreadPool(5);

        public Map<String, Integer> count(String filename)
                throws IOException, InterruptedException, ExecutionException {
            int chunks = splitFileInChunks(filename);
            List<Future<Report>> reports = new ArrayList<Future<Report>>();

            // submit one task per chunk file
            for (int i = 1; i <= chunks; i++) {
                Callable<Report> callable = new ReduceCounter(filename + i + ".txt");
                Future<Report> future = threadPool.submit(callable);
                reports.add(future);
            }

            // get() blocks until the task has finished; merge the partial counts
            Map<String, Integer> finalMap = new HashMap<>();
            for (Future<Report> future : reports) {
                Map<String, Integer> map = future.get().getWords();
                for (Map.Entry<String, Integer> entry : map.entrySet()) {
                    int oldCnt = finalMap.get(entry.getKey()) != null ? finalMap.get(entry.getKey()) : 0;
                    finalMap.put(entry.getKey(), entry.getValue() + oldCnt);
                }
            }
            // return a map with the key being the word and the value the counter for that word
            return finalMap;
        }

        // this method doesn't need to be run on a separate thread
        private int splitFileInChunks(String filename) throws IOException { .... }
    }

    public class Report {
        Map<String, Integer> words = new HashMap<>();
        // ... getter, setter, constructor etc
    }

    public class ReduceCounter implements Callable<Report> {
        String filename;
        public ReduceCounter(String filename) { this.filename = filename; }

        public Report call() {
            // store the values in a Map<String, Integer> since it's easier that way
            Map<String, Integer> myWordsMap = new HashMap<String, Integer>();
            // here add the logic from your Reduce method, without the for loop iteration
            // you should add logic to read only the file named with the value from "filename"

            return new Report(myWordsMap);
        }
    }
    

    Note that you can skip the Report class and return a Future<Map<String,Integer>> instead, but I used Report to make the example easier to follow.
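As a rough illustration of the variant without Report, the sketch below has each Callable return its Map<String, Integer> directly. The class name CallableWordCount and the methods countChunk and countAll are hypothetical, and the chunks are in-memory strings rather than files, purely to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class: chunks are plain strings here, not the chunk files from the question.
class CallableWordCount {

    // Counts the whitespace-separated words of a single chunk.
    static Map<String, Integer> countChunk(String chunk) {
        Map<String, Integer> counts = new HashMap<>();
        for (String word : chunk.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    // Submits one Callable per chunk, then merges the partial maps.
    static Map<String, Integer> countAll(List<String> chunks)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Map<String, Integer>>> futures = new ArrayList<>();
            for (String chunk : chunks) {
                Callable<Map<String, Integer>> task = () -> countChunk(chunk);
                futures.add(pool.submit(task));
            }
            Map<String, Integer> total = new HashMap<>();
            for (Future<Map<String, Integer>> future : futures) {
                // get() blocks until this particular task has finished
                for (Map.Entry<String, Integer> e : future.get().entrySet()) {
                    total.merge(e.getKey(), e.getValue(), Integer::sum);
                }
            }
            return total;
        } finally {
            // let the pool's threads terminate once all tasks are done
            pool.shutdown();
        }
    }
}
```

Calling CallableWordCount.countAll(Arrays.asList("a b a", "b c")) would give a count of 2 for both "a" and "b". Merging happens on the calling thread only, so a plain HashMap is enough for the final result.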

    Update: a Runnable version, as requested by the user

    public class WordCounter {
        public Map<String, Integer> count(String filename) throws IOException, InterruptedException {
            // splitFileInChunks is the same helper as in the Callable version above
            int chunks = splitFileInChunks(filename);
            List<ReduceCounter> counters = new ArrayList<>();
            List<Thread> reducerThreads = new ArrayList<>();

            for (int i = 1; i <= chunks; i++) {
                ReduceCounter rc = new ReduceCounter(filename + i + ".txt");
                Thread t = new Thread(rc);
                counters.add(rc);
                reducerThreads.add(t);
                t.start();
            }
            // next wait for the threads to finish processing
            for (Thread t : reducerThreads) {
                t.join();
            }
            // now grab the results from each of them and merge them
            Map<String, Integer> finalMap = new HashMap<>();
            for (ReduceCounter cnt : counters) {
                for (Map.Entry<String, Integer> entry : cnt.getWords().entrySet()) {
                    finalMap.merge(entry.getKey(), entry.getValue(), Integer::sum);
                }
            }
            return finalMap;
        }
    }
    

    The ReduceCounter class should look like this:

    public class ReduceCounter implements Runnable {
        String filename;
        Map<String, Integer> words = new HashMap<>();
        public ReduceCounter(String filename) { this.filename = filename; }

        public void run() {
            // store the values in the "words" map
            // here add the logic from your Reduce method, without the for loop iteration
            // also read only the file named with the value from "filename"

        }
        public Map<String, Integer> getWords() { return words; }
    }
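One possible way to fill in the run() body is sketched below. The class name FileWordCounter is hypothetical (chosen so it does not clash with ReduceCounter), and the sketch assumes that words are whitespace-separated tokens as returned by Scanner.next(), as in the original Reduce() method.

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

// Hypothetical class: counts the words of exactly one chunk file.
class FileWordCounter implements Runnable {
    private final String filename;
    private final Map<String, Integer> words = new HashMap<>();

    FileWordCounter(String filename) {
        this.filename = filename;
    }

    @Override
    public void run() {
        // try-with-resources closes the Scanner (and the file) automatically
        try (Scanner sc = new Scanner(new File(filename))) {
            while (sc.hasNext()) {
                // add 1 to the existing count, or start at 1 for a new word
                words.merge(sc.next(), 1, Integer::sum);
            }
        } catch (FileNotFoundException e) {
            // run() cannot declare checked exceptions, so wrap it
            throw new IllegalStateException("Cannot read " + filename, e);
        }
    }

    // Only read this after join() has returned for the thread running this object.
    Map<String, Integer> getWords() {
        return words;
    }
}
```

Each instance writes only to its own map, so no synchronization is needed as long as the caller reads getWords() after Thread.join() has returned.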
    

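    【Discussion】:

    • Thank you very much, my friend, although my professor insists that we use the Runnable interface.
    • @AnasCHERIET I have updated the answer with a Runnable solution. This is more or less how you start threads and get values back from them. If it works for you, please mark the answer as useful or accepted.
    【Solution 2】: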

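    I found a solution: I assigned a thread to each small file and then called the Reduce() function inside the run() function, but I still don't fully understand it. Here is the code: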

    public void Reduce() throws IOException 
        {
    
            ArrayList<String> words = new ArrayList<String>();
            ArrayList<Integer> count = new ArrayList<Integer>(); 
            Thread TT= new Thread();
                // split() creates files 1..Chunks, so the last index must be included
                for (int i = 1; i <= Chunks; i++) {
    
                //create the input stream (receive the text)
                FileInputStream fin = new FileInputStream(this.getNom()+i+".txt");
    
                TT=new Thread(this.getNom()+i+".txt");
                TT.start();
    
                //go through the text with a scanner
                Scanner sc = new Scanner(fin);
    
                while (sc.hasNext()) {
                    //Get the next word
                    String nextString = sc.next();
    
                    //Determine if the string exists in words
                    if (words.contains(nextString)) {
                        int index = words.indexOf(nextString);
    
                        count.set(index, count.get(index)+1);
    
                    }
                    else {
                        words.add(nextString);
                        count.add(1);
                    }
                }
                    sc.close();
                    fin.close();
                }
    
                // Creating a File object that represents the disk file. 
                FileWriter myWriter = new FileWriter(new File(this.getNom()+"Result.txt"));
                for (int i = 0; i < words.size(); i++) {
                    myWriter.write(words.get(i)+ " : " +count.get(i) +"\n");    
                }
                myWriter.close();
    
                //delete the small files
                deleteFiles();
        }
    public void run() {
    
            try {
                this.Reduce();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    
        }
    public static void main(String[] args) throws IOException {
            WordCounter w1 = new WordCounter("Words.txt");
            Thread T1= new Thread(w1);
            T1.start();
    }
    

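    【Discussion】:

    • This does not work. It is still single-threaded. The threads you are starting do not do anything. Check my updated answer. For another thread to do something, you need to pass it a Runnable object. Also read: geeksforgeeks.org/runnable-interface-in-java
    • Thank you very much, this is exactly what I was looking for. I will try to implement it in my code.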
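To make the point of the first comment concrete, here is a minimal sketch contrasting the two constructors. The class name ThreadNameVsRunnable and the method demo are hypothetical. Thread(String) only sets the thread's name, so starting such a thread runs an empty run(), whereas Thread(Runnable) executes the given Runnable on the new thread.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the code in this thread.
class ThreadNameVsRunnable {

    // Returns {work done by the named thread, work done after the Runnable thread}.
    static int[] demo() throws InterruptedException {
        AtomicInteger ran = new AtomicInteger();

        // Thread(String) just names the thread; there is no work attached to it.
        Thread named = new Thread("Words.txt1.txt");
        named.start();
        named.join();
        int afterNamed = ran.get();

        // Thread(Runnable) runs the Runnable's run() method on the new thread.
        Runnable work = () -> ran.incrementAndGet();
        Thread worker = new Thread(work);
        worker.start();
        worker.join();
        int afterWorker = ran.get();

        return new int[] { afterNamed, afterWorker };
    }
}
```

In this sketch the counter stays at 0 after the named thread finishes and becomes 1 only after the thread that was given a Runnable has run.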