【问题标题】:Produce Lists with Parallel Stream使用并行流生成列表
【发布时间】:2021-04-11 09:05:35
【问题描述】:

我有一个 Json 字符串列表,其中包含 电影 列表。我需要收集这些电影,处理它们并将它们存储在磁盘中。我正在考虑使用并行流方法来收集电影并测试其性能。我的做法是这样的:

以下方法生成电影列表

protected abstract List<T> parseJsonString(JsonIterator iter);

此方法包含一个并行流,该流收集流中生成的所有列表的列表List&lt;List&lt;Movies)):

public CompletableFuture<List<List<T>>> parseJsonPages(List<CompletableFuture<String>> jsonPageList)
{
    return jsonPageList.parallelStream()
            .map( jsonPageStr -> CompletableFuture.supplyAsync( () -> {
                try {
                    return parseJsonString(JsonIterator.parse( jsonPageStr.get() ) );
                }
                catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                    System.exit(-1);
                }
                return null;
            } ) )
            .collect( ParallelCollectors.toFuture( Collectors.toList() ) );
}

这种方法的问题在于流将生成电影列表,然后将所有列表附加到列表中。你认为这是收集所有这些电影的有效方式吗?我是否应该将所有列表中的电影合并到一个列表中,而不是仅将整个列表附加到列表中(即使这也需要一些时间)。如果是这样,我该如何执行这样的任务?

提前致谢。

【问题讨论】:

  • @EdwinDalorzo 你发给我的文章说:“考虑使用 S.parallelStream().operation(F) 而不是 S.stream().operation(F) 当操作是独立的,或者在计算上昂贵或适用于有效可拆分数据结构的许多元素,或两者兼而有之”。解析这些字符串中的每一个显然是独立的。此外,仅从字符串解析 json 对象需要大量计算时间。我有 1000 多个 Json 字符串。另外,我正在尝试测试这种方法的理论性能,而不仅仅是数秒...

标签: java list merge parallel-processing stream


【解决方案1】:

织机项目

将来,当Project Loom 带着它的虚拟线程到达时,只需将每个任务分配给一个虚拟线程,它就会变得更加简单,并且可能执行得更快。

Project Loom 的Preliminary builds 现已推出,基于早期访问的 Java 16。虽然可能会发生变化,并且尚未准备好投入生产,但如果这是一个非关键任务的个人项目,您可以考虑现在使用它.

顺便说一句,您的 Movie 类可能适合定义为 record,这是 Java 16 中的功能之一。

List< String > inputListsOfMoviesAsJson = … ;  // Input.
Set< Movie > movies = Set.of() ;  // Output. Default to unmodifiable empty `Set`. 
try 
(
    ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
)
{
    movies = Collections.synchronizedSet( new HashSet< Movie > ) ;
    for( String inputJson : inputListsOfMoviesAsJson )
    {
        Runnable task = () -> movies.addAll( this.parseJsonIntoSetOfMovies( inputJson ) ) ;
        executorService.submit( task ) ;
    }
}
// At this point, flow-of-control blocks until all tasks are done.
// Then the executor service is automatically shutdown as part of being closed, as an `AutoCloseable` in a try-with-resources.
… use your `Set` of `Movie` objects. 

如果要跟踪成功/失败,则捕获并收集每次调用 executorService.submit( task ) 返回的 Future 对象。为简单起见,上面的代码忽略了该返回值。

关于您关于累积列表的问题,结果Movie 对象与稍后合并,我认为收集这些对象不会成为瓶颈。我的猜测是处理 JSON 将成为瓶颈。无论哪种方式,在使用 Project Loom 时,使用更简单的编码可能更容易使用分析器工具来验证您的实际瓶颈。

在上面的代码中,我使用 Set 通过调用 Collections.synchronized… 使线程安全。您可以尝试SetList 的各种实现。列表可能更快,但如果这是您的数据输入中的问题,则集合具有消除重复项的好处。

注意事项

内存

这种方法假设您有足够的内存来处理所有 JSON 工作。使用虚拟线程,所有这些输入可能几乎同时得到处理。

  • 在 Project Loom 中,一个被阻塞的虚拟线程被“停放”,移到一边让另一个线程运行。因此,您可以运行许多虚拟线程,甚至数百万。
  • 对于传统的平台/内核线程,阻塞的线程不会为另一个线程开始工作让路。所以你一次运行的线程很少。

因此,如果内存是一种受限资源,则需要采取进一步措施来防止过多的虚拟线程启动 JSON 处理。

CPU 密集型任务

虚拟线程(纤程)适用于涉及阻塞代码的工作。对于纯 CPU 密集型任务(例如视频编码),传统的平台/内核线程是最好的。如果您只处理已加载到内存中的 JSON 文本,那么如果虚拟线程受 CPU 限制,则它们可能不会显示任何好处。但我会试一试,因为试运行很容易。如果您正在执行任何 I/O(记录、访问文件、访问数据库、进行网络调用),那么您肯定会看到虚拟线程的显着性能提升。

相关代码必须是线程安全的

确保您的 JSON 处理库构建为线程安全的。

并确保您的 parseJsonIntoSetOfMovies 方法是线程安全的。

推荐阅读

阅读 Brian Goetz 等人的Java Concurrency In Practice一书。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-04-23
    • 1970-01-01
    • 1970-01-01
    • 2018-07-24
    • 2023-01-26
    • 2014-01-14
    相关资源
    最近更新 更多