【发布时间】:2020-01-10 11:34:41
【问题描述】:
我需要对一个文件执行基于 MapReduce 的 wordcount(单词计数),并且要使用线程来完成。我先把文件拆分成多个小文件,然后在 Reduce() 函数中循环遍历这些小文件,统计每个单词出现的次数。请问如何通过 run() 函数实现线程,使其与 Reduce() 函数配合使用?
这是我的代码:
/**
 * Counts word occurrences in a text file using a split/reduce scheme.
 *
 * <p>{@link #split()} cuts the source file into chunk files named
 * {@code <Nom>1.txt}, {@code <Nom>2.txt}, ... of at most {@code MAX_ROWS} lines each.
 * {@link #Reduce()} reads every chunk, writes the totals to {@code <Nom>Result.txt}
 * and removes the chunk files. {@link #run()} performs both steps so an instance
 * can be handed to a {@link Thread} or an executor.
 *
 * <p>NOTE(review): the chunk count lives in the static field {@link #Chunks}, which is
 * shared by every instance; running several instances concurrently is therefore not
 * safe as written.
 */
public class WordCounter implements Runnable {

    /** Maximum number of lines written to a single chunk file. */
    private static final int MAX_ROWS = 100;

    /** Number of chunk files produced by the most recent {@link #split()} call. */
    protected static int Chunks = 1;

    /** Path of the source file; also the prefix of every generated file name. */
    private final String Nom;

    /**
     * Creates a counter for the given source file.
     *
     * @param n path of the file whose words are to be counted
     */
    public WordCounter(String n) {
        Nom = n;
    }

    /**
     * Returns the source file path this counter was created with.
     *
     * @return the source file path
     */
    public String getNom() {
        return Nom;
    }

    /**
     * Splits the source file and then reduces the chunks into the result file.
     *
     * @throws java.io.UncheckedIOException if splitting or reducing fails with an I/O error
     */
    @Override
    public void run() {
        try {
            split();
            Reduce();
        } catch (IOException e) {
            // Runnable.run() cannot throw checked exceptions; keep the cause attached.
            throw new java.io.UncheckedIOException(e);
        }
    }

    /**
     * Splits the source file into chunk files of at most {@code MAX_ROWS} lines each and
     * records the number of chunks in {@link #Chunks}. An empty source file still
     * produces one (empty) chunk file.
     *
     * @throws IOException if the source cannot be opened or a chunk cannot be written
     */
    public void split() throws IOException {
        File source = new File(this.Nom);
        int chunk = 0;
        try (Scanner sc = new Scanner(source)) {
            do {
                chunk++;
                // Publish the count as soon as the file is about to exist so that
                // deleteFiles() can clean up even if a later write fails.
                Chunks = chunk;
                try (FileWriter writer = new FileWriter(chunkFile(chunk))) {
                    for (int row = 0; row < MAX_ROWS && sc.hasNextLine(); row++) {
                        writer.write(sc.nextLine() + "\n");
                    }
                }
            } while (sc.hasNextLine());
        }
    }

    /**
     * Counts the whitespace-separated words of every chunk file, writes one
     * {@code word : count} line per distinct word (in order of first appearance) to
     * {@code <Nom>Result.txt}, and finally deletes the chunk files.
     *
     * @throws IOException if a chunk cannot be read or the result cannot be written
     */
    public void Reduce() throws IOException {
        // LinkedHashMap keeps first-seen order, matching the order of the result file.
        java.util.Map<String, Integer> counts = new java.util.LinkedHashMap<String, Integer>();

        // Chunk files are numbered 1..Chunks inclusive.
        for (int i = 1; i <= Chunks; i++) {
            try (FileInputStream fin = new FileInputStream(chunkFile(i));
                 Scanner sc = new Scanner(fin)) {
                while (sc.hasNext()) {
                    counts.merge(sc.next(), 1, Integer::sum);
                }
            }
        }

        try (FileWriter writer = new FileWriter(new File(this.getNom() + "Result.txt"))) {
            for (java.util.Map.Entry<String, Integer> entry : counts.entrySet()) {
                writer.write(entry.getKey() + " : " + entry.getValue() + "\n");
            }
        }

        // The chunk files are only intermediate data.
        deleteFiles();
    }

    /**
     * Deletes the chunk files {@code <Nom>1.txt} .. {@code <Nom><Chunks>.txt}.
     * Deletion is best-effort: files that are missing or cannot be removed are skipped.
     */
    public void deleteFiles() {
        for (int i = 1; i <= Chunks; i++) {
            chunkFile(i).delete();
        }
    }

    /** Returns the chunk file with the given 1-based index. */
    private File chunkFile(int index) {
        return new File(this.Nom + index + ".txt");
    }
}
【问题讨论】:
标签: java multithreading file