Map Reduce is easy to implement with some nice concurrency features from java.util.concurrent (available since Java 5), in particular Future, Callable and ExecutorService.
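Here is a minimal illustration of the map/reduce shape using those three classes. It is not tied to the file-analysis problem. Each string in a list is treated as a chunk, and a separate Callable counts the words in each chunk (the "map" step). The Future results are then summed (the "reduce" step). The names `WordCountMapReduce`, `countWords` and `totalWords` are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class WordCountMapReduce {

    // "Map" step: count the whitespace-separated words in one chunk of text
    static Callable<Integer> countWords(final String chunk) {
        return new Callable<Integer>() {
            @Override
            public Integer call() {
                String trimmed = chunk.trim();
                return trimmed.isEmpty() ? 0 : trimmed.split("\\s+").length;
            }
        };
    }

    // Submits one map task per chunk, then "reduces" by summing the results
    static int totalWords(List<String> chunks, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
            for (String chunk : chunks) {
                futures.add(executor.submit(countWords(chunk)));
            }
            int total = 0;
            for (Future<Integer> f : futures) {
                total += f.get(); // blocks until that map task has finished
            }
            return total;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> chunks = Arrays.asList("the quick brown fox", "jumps over", "the lazy dog");
        System.out.println(totalWords(chunks, 3)); // prints 9
    }
}
```

The reduce step runs on the calling thread after the map tasks have been submitted. For a simple sum, that is all that is needed.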
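I've created a Callable that analyses a file the way you described: it keeps every line whose third space-separated token is in your term list.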
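import java.io.File;
import java.io.FileNotFoundException;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.Callable;

public class FileAnalyser implements Callable<String> {

    private final Scanner scanner;
    private final List<String> termList;

    // Opens the file eagerly, so a missing file is reported when the task is created
    public FileAnalyser(String filename, List<String> termList) throws FileNotFoundException {
        this.termList = termList;
        this.scanner = new Scanner(new File(filename));
    }

    // Returns every line whose third space-separated token is in termList,
    // one matching line per output line
    @Override
    public String call() throws Exception {
        StringBuilder buffer = new StringBuilder();
        try {
            while (scanner.hasNextLine()) {
                String line = scanner.nextLine();
                String[] tokens = line.split(" ");
                if ((tokens.length >= 3) && (inTermList(tokens[2]))) {
                    buffer.append(line).append('\n');
                }
            }
        } finally {
            scanner.close(); // release the file handle once the file has been processed
        }
        return buffer.toString();
    }

    private boolean inTermList(String term) {
        return termList.contains(term);
    }
}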
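You can try the filtering rule in `call()` on its own without touching the file system. The sketch below is a hypothetical `LineFilter` helper. It applies the same test (at least three space-separated tokens, with the third token in the term list) to an in-memory string. It uses a `HashSet` for the terms instead of the `List` above, which gives constant-time lookups if your term list is long.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Scanner;
import java.util.Set;

class LineFilter {

    // True if the line has at least three space-separated tokens and the
    // third one (index 2) is a search term, mirroring FileAnalyser.call()
    static boolean matches(String line, Set<String> terms) {
        String[] tokens = line.split(" ");
        return tokens.length >= 3 && terms.contains(tokens[2]);
    }

    // Applies the filter to every line of some text, one match per output line
    static String filter(String text, Set<String> terms) {
        StringBuilder buffer = new StringBuilder();
        Scanner scanner = new Scanner(text);
        try {
            while (scanner.hasNextLine()) {
                String line = scanner.nextLine();
                if (matches(line, terms)) {
                    buffer.append(line).append('\n');
                }
            }
        } finally {
            scanner.close();
        }
        return buffer.toString();
    }

    public static void main(String[] args) {
        Set<String> terms = new HashSet<String>(Arrays.asList("terma", "termb"));
        String text = "1 x terma\n2 y other\n3 z termb extra\nshort line";
        // prints "1 x terma" and "3 z termb extra", each on its own line
        System.out.print(filter(text, terms));
    }
}
```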
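We need to create a new callable for each file we find and submit it to the executor service. Submitting returns a Future, which we can use later to retrieve the result of parsing that file.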
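One detail is worth knowing when you come back to a Future. If `call()` throws (for example, an `IOException` while reading a file), the exception is not lost: `Future.get()` rethrows it wrapped in an `ExecutionException`. The small sketch below (a hypothetical `FutureResultDemo` class) shows both the normal and the failing case.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureResultDemo {

    // Submits a task, then reads its result later through the returned Future.
    // If call() threw, get() rethrows that exception wrapped in an ExecutionException;
    // this helper turns that case into a message instead.
    static String resultOrError(ExecutorService executor, Callable<String> task)
            throws InterruptedException {
        Future<String> future = executor.submit(task);
        // ... other work could happen here while the task runs ...
        try {
            return future.get(); // blocks until the task has finished
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            System.out.println(resultOrError(executor, new Callable<String>() {
                public String call() {
                    return "ok";
                }
            })); // prints ok
            System.out.println(resultOrError(executor, new Callable<String>() {
                public String call() throws Exception {
                    throw new java.io.IOException("boom");
                }
            })); // prints failed: boom
        } finally {
            executor.shutdown();
        }
    }
}
```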
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Analyser {

    private static final int THREAD_COUNT = 10;

    public static void main(String[] args) throws InterruptedException {
        //All callables will be submitted to this executor service
        //Play around with THREAD_COUNT for optimum performance
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);

        //Store all futures in this list so we can refer to them easily
        List<Future<String>> futureList = new ArrayList<Future<String>>();

        //Some random term list, I don't know what you're using.
        List<String> termList = new ArrayList<String>();
        termList.add("terma");
        termList.add("termb");

        //For each file you find, create a new FileAnalyser callable and submit
        //it to the executor service. Add the future to the list
        //so we can check back on the result later.
        //(Here the file names are simply taken from the command line;
        //replace this with however you actually find your files.)
        for (String filename : args) {
            try {
                Callable<String> worker = new FileAnalyser(filename, termList);
                Future<String> future = executor.submit(worker);
                futureList.add(future);
            } catch (FileNotFoundException fnfe) {
                //If the file doesn't exist at this point we can probably ignore it,
                //but I'll leave that for you to decide.
                System.err.println("Unable to create future for " + filename);
                fnfe.printStackTrace(System.err);
            }
        }

        //No more tasks will be submitted; the pool threads exit once they are done
        executor.shutdown();

        //Loop over all futures and do something with the result from each.
        //get() blocks until that future's task has finished, so no separate
        //"wait until all are done" step is needed (use isDone() if you want to poll).
        for (Future<String> current : futureList) {
            try {
                String result = current.get();
                //Do something with the result from this future
            } catch (ExecutionException ee) {
                //The callable threw an exception; ee.getCause() holds it
                System.err.println("Analysis failed: " + ee.getCause());
            }
        }
    }
}
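If you simply want to run every task and then collect all the results, `ExecutorService.invokeAll` does the submit-and-wait step for you. It returns the list of futures only once every task has completed. Here is a sketch in which a hypothetical `echo` task stands in for `FileAnalyser`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllDemo {

    // Runs every task and returns their results in submission order.
    // invokeAll blocks until all tasks have completed (normally or not).
    static List<String> runAll(List<Callable<String>> tasks, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<String> results = new ArrayList<String>();
            for (Future<String> future : executor.invokeAll(tasks)) {
                results.add(future.get()); // does not block: every task is already done
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    // Hypothetical stand-in for FileAnalyser: just returns its label
    static Callable<String> echo(final String label) {
        return new Callable<String>() {
            @Override
            public String call() {
                return label;
            }
        };
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> tasks = Arrays.asList(echo("a"), echo("b"), echo("c"));
        System.out.println(runAll(tasks, 2)); // prints [a, b, c]
    }
}
```

The results come back in the same order as the tasks in the input list, regardless of which task finished first.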
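My example here is far from complete, and far from efficient. I haven't taken the sample size into account. If it is really large, you could keep looping over futureList, handling and removing the elements that have finished, something like: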
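//Needs java.util.Iterator; the enclosing method must declare or handle
//InterruptedException and ExecutionException
while (!futureList.isEmpty()) {
    Iterator<Future<String>> it = futureList.iterator();
    while (it.hasNext()) {
        Future<String> current = it.next();
        if (current.isDone()) {
            String result = current.get(); //Does not block: the task has already finished
            //Do something with result
            it.remove(); //Removing through the iterator is safe during iteration
        }
    }
    //Note: this outer loop keeps spinning while tasks are still running
}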
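The loop above keeps polling `isDone()`, which burns CPU while tasks are still running. A different technique from the standard library, `ExecutorCompletionService`, avoids that. Its `take()` blocks until the next task finishes and then hands back that task's future. Here is a sketch, with a hypothetical `delayed` task to make the completion order visible:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {

    // Submits all tasks, then takes results in the order the tasks finish.
    // take() blocks until some task is done, so there is no busy-waiting.
    static List<String> resultsInCompletionOrder(List<Callable<String>> tasks, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            CompletionService<String> completion = new ExecutorCompletionService<String>(executor);
            for (Callable<String> task : tasks) {
                completion.submit(task);
            }
            List<String> results = new ArrayList<String>();
            for (int i = 0; i < tasks.size(); i++) {
                results.add(completion.take().get()); // handle each result as soon as it is ready
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    // Hypothetical task that sleeps for the given time before returning its label
    static Callable<String> delayed(final String label, final long millis) {
        return new Callable<String>() {
            @Override
            public String call() throws InterruptedException {
                Thread.sleep(millis);
                return label;
            }
        };
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> tasks = new ArrayList<Callable<String>>();
        tasks.add(delayed("slow", 300));
        tasks.add(delayed("fast", 10));
        System.out.println(resultsInCompletionOrder(tasks, 2)); // most likely [fast, slow]
    }
}
```

Because results arrive in completion order, a slow file no longer holds up the handling of files that finished earlier.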
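Alternatively, you could implement a producer-consumer setup. The producer submits callables to the executor service and produces futures, and the consumer takes the result from each future and then discards it.
This would probably require the producer and consumer to be threads themselves, plus a synchronized list for adding and removing the futures.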
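Here is a minimal sketch of that producer-consumer setup. It uses a `LinkedBlockingQueue` of futures in place of the synchronized list, because a blocking queue lets the consumer wait for the next future without polling. The producer thread submits the callables and puts each future on the queue, followed by a sentinel `POISON` future that is never run. The consumer thread takes futures off the queue, reads each result and drops the future. `ProducerConsumerDemo`, `runPipeline`, `constant` and `POISON` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

class ProducerConsumerDemo {

    // Sentinel future meaning "no more work"; it is never run, only compared by reference
    private static final Future<String> POISON = new FutureTask<String>(constant(null));

    // Hypothetical task that just returns a fixed value
    static Callable<String> constant(final String value) {
        return new Callable<String>() {
            public String call() {
                return value;
            }
        };
    }

    static List<String> runPipeline(final List<Callable<String>> tasks) throws InterruptedException {
        final ExecutorService executor = Executors.newFixedThreadPool(4);
        final BlockingQueue<Future<String>> futures = new LinkedBlockingQueue<Future<String>>();
        final List<String> results = Collections.synchronizedList(new ArrayList<String>());

        // Producer: submits each callable and passes its future to the consumer
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (Callable<String> task : tasks) {
                        futures.put(executor.submit(task));
                    }
                    futures.put(POISON);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // Consumer: takes futures in FIFO order, stores each result, then drops the future
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (Future<String> f = futures.take(); f != POISON; f = futures.take()) {
                        results.add(f.get());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (ExecutionException e) {
                    e.getCause().printStackTrace(); // a task failed; stop consuming
                }
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        executor.shutdown();
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<String>> tasks = new ArrayList<Callable<String>>();
        for (int i = 0; i < 5; i++) {
            tasks.add(constant("result " + i));
        }
        System.out.println(runPipeline(tasks)); // prints [result 0, result 1, result 2, result 3, result 4]
    }
}
```

Since there is a single producer and the queue is FIFO, the consumer sees the futures in submission order. Results therefore come out in that order, even though the tasks themselves may finish in any order.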
Feel free to ask if you have any questions.