【问题标题】:Sequentially processing file in threadpool executor在线程池执行程序中顺序处理文件
【发布时间】:2013-09-05 17:23:41
【问题描述】:

我们使用 JDK 7 watchservice 来监视可以包含 xml 或 csv 文件的目录。这些文件被放入线程池中,然后被处理并推送到数据库中。这个应用程序将永远运行并监视目录并在可用时保持处理文件。 XML 文件很小,不需要时间,但是每个 csv 文件可以包含超过 80,000 条记录,因此处理需要时间才能放入数据库。当从线程池处理 15 个 csv 文件时,Java 应用程序会出现内存不足错误。当csv文件进入线程池时,有什么方法可以串行处理,即一次只能处理一个。

【问题讨论】:

  • Executors#newSingleThreadExecutor()怎么样?
  • 确实需要为 csv 文件创建单独的执行程序...... newSinglethreadExecutor() 似乎做得很好......任何例子都会很棒..

标签: java multithreading concurrency threadpool watchservice


【解决方案1】:

当从线程池处理 15 个 csv 文件时,Java 应用程序会出现内存不足错误。当csv文件进入线程池时,有什么方法可以串行处理,即一次只能处理一个。

如果我理解,如果您超过某个阈值,您希望停止添加到池中。有一种简单的方法可以做到这一点,即使用阻塞队列和被拒绝的执行处理程序。

请看以下答案:

Process Large File for HTTP Calls in Java

总而言之,您可以执行以下操作:

// only allow 100 jobs to queue
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100);
ThreadPoolExecutor threadPool =
    new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, queue);
// we need our RejectedExecutionHandler to block if the queue is full
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
       @Override
       public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
           try {
                // this will block the producer until there's room in the queue
                executor.getQueue().put(r);
           } catch (InterruptedException e) {
                throw new RejectedExecutionException(
                   "Unexpected InterruptedException", e);
           }
    }
});

这意味着它将阻止添加到队列中并且不应该耗尽内存。

【讨论】:

    【解决方案2】:

    我会采取不同的方法来解决您的问题,我想您一切正常,除非您开始将太多数据读入内存。

    不确定您如何阅读 csv 文件,建议使用 LineReader 并阅读例如500 行处理它们,然后读取接下来的 500 行,所有大文件都应该以这种方式处理,因为无论你增加多少内存参数,一旦你有更大的文件要处理,你就会内存不足,所以使用可以批量处理记录的实现。这将需要一些额外的编码工作,但无论您必须处理多大的文件,都不会失败。

    干杯!!

    【讨论】:

      【解决方案3】:

      你可以试试:

      1. 使用-XmxJVM选项增加JVM内存
      2. 使用不同的执行程序来减少一次处理的文件数量。一个彻底的解决方案是使用SingleThreadExecutor

        public class FileProcessor implements Runnable {
            public FileProcessor(String name) { }
            public void run() {
                // process file
            }
        }
        
        // ...
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // ...
        
        public void onNewFile(String fileName) {
            executor.submit(new FileProcessor(fileName));
        }
        

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2011-10-27
        • 1970-01-01
        • 2011-11-03
        • 1970-01-01
        • 2012-02-20
        • 2017-01-02
        相关资源
        最近更新 更多