【问题标题】:Increasing Disk Read Throughput By Concurrency通过并发增加磁盘读取吞吐量
【发布时间】:2017-09-08 08:21:49
【问题描述】:

我正在尝试读取一个日志文件并解析它,它只消耗 CPU。我有一个服务器,它以 230MB/秒的速度读取一个巨大的文本文件,只读取文本文件而不解析。当我尝试使用单线程解析文本文件时,我可以解析文件大约 50-70MB/秒。

我想增加我的吞吐量,同时完成这项工作。在这段代码中,我达到了 130 MB/秒。在高峰期,我看到了 190MB/秒。我尝试了 BlockedQueue、Semaphore、ExecutionService 等。你有什么建议可以让我达到 200MB/秒的吞吐量。

public static void fileReaderTestUsingSemaphore(String[] args) throws Exception {

    CustomFileReader reader = new CustomFileReader(args[0]);
    final int concurrency = Integer.parseInt(args[1]);
    ExecutorService executorService = Executors.newFixedThreadPool(concurrency);
    Semaphore semaphore = new Semaphore(concurrency,true);
    System.out.println("Conccurrency in Semaphore: " + concurrency);


    String line;

    while ((line = reader.getLine()) != null)
    {
        semaphore.acquire();

        try
        {

            final String p = line;

            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    reader.splitNginxLinewithIntern(p); // that is the method which parser string and convert to class.
                    semaphore.release();
                }
            });
        }

        catch (Exception ex)
        {
            ex.printStackTrace();
        }

        finally {
            semaphore.release();
        }
    }

    executorService.shutdown();
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);

    System.out.println("ReadByteCount: " + reader.getReadByteCount());
}

【问题讨论】:

    标签: java concurrency java.util.concurrent


    【解决方案1】:

    您可能会从 Java 8 中引入的 Files.lines() 方法和 Stream 范例中受益。它将使用系统通用的 fork/join 池。试试这个模式:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    
    public class LineCounter
    {
        public static void main(String[] args) throws IOException
        {
            Files.lines(Paths.get("/your/file/here"))
                 .parallel()
                 .forEach(LineCounter::processLine);
        }
    
        private static void processLine(String line) {
            // do the processing
        }
    }
    

    【讨论】:

    • 我的笔记本电脑(2015 年中期的 MacBook Pro)将从冷启动开始以这种模式在一秒钟内处理一个 450 MB 的文件...
    • 感谢您提供此代码。这段代码是目前最好的。我的解析时间减少到 130 秒到 40 秒。我希望我可以在我的代码中使用这个解决方案。 @Per Huss :)
    • 我很高兴听到!祝你的项目好运!
    【解决方案2】:

    假设您不关心行的顺序:

        final String MARKER = new String(""); 
        BlockingQueue<String> q = new LinkedBlockingDeque<>(1024);
        for (int i = 0; i < concurrency; i++)
            executorService.execute(() -> {
                for (;;) {
                    try {
                        String s = q.take();
                        if(s == MARKER) {
                            q.put(s);
                            return;
                        }
                        reader.splitNginxLinewithIntern(s);
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            });
        String line;
        while ((line = reader.readLine()) != null) {
            q.put(line);
        }
        q.put(MARKER);
        executorService.awaitTermination(10, TimeUnit.MINUTES);
    

    这会启动多个线程,每个线程运行一个特定的任务;该任务是从队列中读取并运行 split 方法。阅读器只是提供队列,通知队列完成并等待终止。

    如果你要使用 RxJava2 和 rxjava2-extras 那就简单了

        Strings.from(reader)
               .flatMap(str -> Flowable
                  .just(str)
                  .observeOn(Schedulers.computation())
                  .doOnNext(reader::splitNginxLinewithIntern)
               )
               .blockingSubscribe();
    

    【讨论】:

    • 感谢您的代码。我测试了你的代码,结果是一样的。我使用 ArrayBlockingQueue 尝试了这种代码。我想,我最好使用提到 Ralf 的分块字符串数组 solitun。 @塔索斯
    【解决方案3】:

    您需要使用多线程,并且需要让阅读器线程将解析委托给工作线程,这很清楚。关键是如何以尽可能少的开销完成此委托。

    @Tassos 提供的代码看起来像是一个可靠的改进。

    您可以尝试的另一件事是更改委派粒度,而不是单独委派每一行,而是构建块,例如100 行,从而将委派/同步开销减少了 100 倍(但随后需要一个 String[] 数组或类似的数组,这应该不会造成太大伤害)。

    【讨论】:

    • 感谢您的留言。我测试了它。它改进了很多,就像我的解析时间减少了 130 秒到 70 秒。但是 Per Huss 解决方案要好得多,它减少了 40 秒。 @Ralf Kleberhoff
    猜你喜欢
    • 2017-09-06
    • 2020-09-17
    • 2019-06-14
    • 1970-01-01
    • 2013-07-05
    • 2015-10-25
    • 2020-10-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多