【问题标题】:Need help implementing this algorithm with map Hadoop MapReduce需要帮助使用 map Hadoop MapReduce 实现此算法
【发布时间】:2010-06-06 22:57:16
【问题描述】:

我的算法将通过一个大型数据集读取一些文本文件并在这些行中搜索特定术语。我已经用 Java 实现了它,但我不想发布代码,所以它看起来不像我正在寻找为我实现它的人,但确实我真的需要很多帮助!!!这不是我的项目计划的,但数据集结果是巨大的,所以老师告诉我必须这样做。

编辑(我没有澄清我以前的版本)我拥有的数据集在一个Hadoop集群上,我应该实现它的MapReduce

我正在阅读有关 MapReduce 的信息,并认为我首先执行标准实现,然后使用 mapreduce 更容易/更少。但没有发生,因为算法很愚蠢,没有什么特别的,而且 map reduce...我无法理解它。

这里是我算法的简短伪代码

LIST termList   (there is method that creates this list from lucene index)
FOLDER topFolder

INPUT topFolder
IF it is folder and not empty
    list files (there are 30 sub folders inside)
    FOR EACH sub folder
        GET file "CheckedFile.txt"
        analyze(CheckedFile)
    ENDFOR
END IF


Method ANALYZE(CheckedFile)

read CheckedFile
WHILE CheckedFile has next line
    GET line
    FOR(loops through termList)
            GET third word from line
          IF third word = term from list
        append whole line to string buffer
    ENDIF
ENDFOR
END WHILE
OUTPUT string buffer to file

另外,如您所见,每次调用“analyze”时,都必须创建新文件,我理解 map reduce 很难写入很多输出???

我理解 mapreduce 的直觉,我的示例似乎非常适合 mapreduce,但说到做到这一点,显然我知道的不够多,我被困住了!

请帮忙。

【问题讨论】:

    标签: java hadoop mapreduce


    【解决方案1】:

    您可以只使用一个空的 reducer,然后对您的作业进行分区,以便为每个文件运行一个映射器。每个映射器都会在您的输出文件夹中创建自己的输出文件。

    【讨论】:

    • 嗨!谢谢你的回答!!!但我不确定我是否理解:/你能给我更多信息吗?有没有这样的例子???
    【解决方案2】:

    Map Reduce 使用一些不错的 Java 6 并发特性很容易实现,尤其是 Future、Callable 和 ExecutorService。

    我创建了一个 Callable,它将按照您指定的方式分析文件

    public class FileAnalyser implements Callable<String> {
    
      private Scanner scanner;
      private List<String> termList;
    
      public FileAnalyser(String filename, List<String> termList) throws FileNotFoundException {
        this.termList = termList;
        scanner = new Scanner(new File(filename));
      }
    
      @Override
      public String call() throws Exception {
        StringBuilder buffer = new StringBuilder();
        while (scanner.hasNextLine()) {
          String line = scanner.nextLine();
          String[] tokens = line.split(" ");
          if ((tokens.length >= 3) && (inTermList(tokens[2])))
            buffer.append(line);
        }
        return buffer.toString();
      }
    
      private boolean inTermList(String term) {
        return termList.contains(term);
      }
    }
    

    我们需要为找到的每个文件创建一个新的可调用文件,并将其提交给执行器服务。提交的结果是一个Future,我们稍后可以使用它来获取文件解析的结果。

    public class Analayser {
    
      private static final int THREAD_COUNT = 10;
    
      public static void main(String[] args) {
    
        //All callables will be submitted to this executor service
        //Play around with THREAD_COUNT for optimum performance
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
    
        //Store all futures in this list so we can refer to them easily
        List<Future<String>> futureList = new ArrayList<Future<String>>();
    
        //Some random term list, I don't know what you're using.
        List<String> termList = new ArrayList<String>();
        termList.add("terma");
        termList.add("termb");
    
        //For each file you find, create a new FileAnalyser callable and submit
        //this to the executor service. Add the future to the list
        //so we can check back on the result later
        for each filename in all files {
          try {
            Callable<String> worker = new FileAnalyser(filename, termList);
            Future<String> future = executor.submit(worker);
            futureList.add(future);
          }
          catch (FileNotFoundException fnfe) {
            //If the file doesn't exist at this point we can probably ignore,
            //but I'll leave that for you to decide.
            System.err.println("Unable to create future for " + filename);
            fnfe.printStackTrace(System.err);
          }
        }
    
        //You may want to wait at this point, until all threads have finished
        //You could maybe loop through each future until allDone() holds true
        //for each of them.
    
        //Loop over all finished futures and do something with the result
        //from each
        for (Future<String> current : futureList) {
          String result = current.get();
          //Do something with the result from this future
        }
      }
    }
    

    我这里的例子远非完整,也远非高效。我没有考虑样本量,如果它真的很大,你可以继续循环futureList,删除已经完成的元素,类似于:

    while (futureList.size() > 0) {
          for (Future<String> current : futureList) {
            if (current.isDone()) {
              String result = current.get();
              //Do something with result
              futureList.remove(current);
              break; //We have modified the list during iteration, best break out of for-loop
            }
          }
    }
    

    或者,您可以实现生产者-消费者类型设置,其中生产者将可调用对象提交给执行者服务并产生未来,消费者获取未来的结果并丢弃未来。

    这可能需要生产者和消费者本身是线程,以及用于添加/删除期货的同步列表。

    有什么问题欢迎提问。

    【讨论】:

    • 嗨!非常感谢您提出的解决方案!!很抱歉,尽管我尝试过,但我可能没有明确指定问题。我的错误,我只是在标题中提到了Hadoop,但是我的数据集在运行hadoop的集群上,所以我应该根据Hadoop MaPreduce框架实现它......我现在将编辑我的帖子。我正在分析的数据集是6GB :/ 并发处理太多了??????
    • 糟糕,我在这里是个菜鸟 :D 为了稍微赎回自己,我在 100 个文件上运行了我的代码,每个文件约 61MB,总共约 6GB。我不完全确定您的文件解析器做了什么,所以遗漏了血淋淋的细节,只是扫描了每一行并返回了一个空字符串。我知道有点做作。性能并不算太糟糕,线程池大小为 100,因此所有 100 个文件都被解析而没有被执行程序服务排队。我的 Atom 处理器的总运行时间为 17 分钟。抱歉,我无法正确回答您的问题。我没有使用 Hadoop 的经验,但在阅读了 SquareCog 的答案之后,这很有意义。
    • 嗨!非常感谢你,你帮了很多忙,因为我无法用我的大脑和时间来应付 hadoop MR。我将有更多类似的算法要实现,所以我必须以我能够做到的方式尝试它。无法在任何地方获得 hadoop 帮助:/ 所以我采用了你的代码,在我的 Intel 2Ghz 上,线程池 42 占用了解析结果并将结果输出到新文件大约需要 20 分钟,但只有 200Mb 数据(42 个文件)。同样,我必须对解析器进行一些修改,它必须进行一些更严格的匹配,而不是纯粹的“包含”术语,所以当我运行它时,我会让你知道结果:)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-07-12
    相关资源
    最近更新 更多