【问题标题】:How to feed an infinite stream using a queue?如何使用队列提供无限流?
【发布时间】:2015-04-22 21:46:15
【问题描述】:

我想要几个threads 来读取文件。这些文件是 ZIP 文件,其中包含几个文本文件本身。因此必须逐行读取每个文件。

文件的任何内容都应该发送到某种queue。队列本身应该由工作线程无限处理。

如果可能的话,如何实现这样的场景? 我想出了一些伪代码,但我真的不知道如何实现:

Queue<String> queue;

//multiple threads:
BufferedReader br;
queue.add(br.readLine());

//processing thread for the queue:
queue.stream().parallel().forEach(line -> convertAndWrite(line));

//worker function:
private void convertAndWrite(String line) {
    //convert the line to an output format,
    //and write each line eg to an output file or perist in DB, whatever
}

【问题讨论】:

  • 与其让线程读取文件,不如将整个事情作为一个大的并行流来完成几乎肯定会更简单——Stream.of(files).parallel().flatMap(Files::lines).forEach(line -&gt; line.convertAndWrite(line))
  • @LouisWasserman 不幸的是你不能那样做,因为Files.lines() throws IOException;另外,我不确定在这种情况下是否调用了它的.close()方法
  • @fge:是的,关闭将正常工作,但您可能必须处理 IOException。不过,我希望基于这种方法的东西比尝试像这样流式处理队列更容易。
  • 但是当你并行写行时,你可能不会按照你收到的顺序得到它们。我希望它不会损坏 .zip 文件。

标签: java java-8 java-stream


【解决方案1】:

查看How to interconect non-paralel stream with parallel stream(one producer multiple consumers) 的答案。对于此问题,使用无法并行化的流填充了阻塞队列。实现了一个可并行化的拆分器,该拆分器耗尽了这个队列。如果您希望文件是连续的,那么您可能只有一个阅读器正在填充队列。

然后,您可以使用 StreamSupport 从拆分器创建一个流。阻塞队列支持并发修改,因此拆分器实现可以并行化,因此您的流可以并行化。如果您的下游编写器是可并行化的,那么您的整个消费者端都可以并行化。

如果您的读者遇到异常,则您将 End-of-Stream 标记(可能作为最终子句的一部分)推入 BlockingQueue 并重新抛出。只有一个 tryAdvance 调用者(请参阅 AbstractSpliterator),因此一个 End-of-Stream 标记足以终止所有并行流。

【讨论】:

    【解决方案2】:

    这里有一些伪代码,如果您可以使用 LinkedList LinkedBlockingQueue 而不是队列(虽然技术上 LinkedList LinkedBlockingQueue 实现了队列):

    while (true) {
        if (!(queue.isEmpty())) {
            try {
                TheTypeOfElementsInQueue element = queue.removeFirst();
                // do something with element
            } catch (NoSuchElementException e) {
                // just in case!
            }
        }
    }
    

    我将它放入实现可运行的类的run() 中,并从中创建了一个线程。我的做法是使用 LinkedList,但由于 LinkedList 不是线程安全的,也许 LinkedBlockingQueue 会更好。

    【讨论】:

    • LinkedLists 不是线程安全的。 LinkedBlockingQueue 会起作用。
    • @brettOkken 谢谢!我已经编辑了答案以包含您的建议。
    【解决方案3】:

    为了实现监视文件夹中新文件外观的任务,我将使用 Java WatchServicethis article 中所示的流

    通过WatchService注册文件夹更新:

    Path path = Paths.get(".");
    WatchService watchService =  path.getFileSystem().newWatchService();
    path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
    

    获取文件夹更新并通过流处理它们:

    WatchKey watchKey = null;
    while (true) {
        watchKey = watchService.poll(10, TimeUnit.MINUTES);
        if(watchKey != null) {
            watchKey.pollEvents().stream().forEach(event -> System.out.println(event.context()));
        }
        watchKey.reset();
    }
    

    在一行

    watchKey.pollEvents().stream().forEach(event -> System.out.println(event.context()));
    

    我想你可以使用并行流,event.context() 这里是一个Path 实例,用于新创建的文件。因此您可以继续通过其他流操作(如map 等)处理其内容

    【讨论】:

      猜你喜欢
      • 2018-03-06
      • 1970-01-01
      • 2014-02-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多