【问题标题】:Java 8 parallel stream of list of method paramJava 8 方法参数列表的并行流
【发布时间】:2019-08-01 23:55:28
【问题描述】:

我有一个方法:

invokList(List<Object> list);

这个方法在一个 jar 里面,我无法访问它的源代码。所以为此,我需要以并行方式执行 invokList,有人可以帮忙吗?

思路是将列表拆分为多个列表,并行执行invokList。

我做了这个例子:

            import java.util.Arrays;
            import java.util.Collections;
            import java.util.List;

            public class Test {

                public static void main(String[] args) {
                    List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20);
                    list.parallelStream()
                            .map(Collections::singletonList)
                            .forEach(Test::invokList);
                }

                public static void invokList(List<Integer> list) {
                    try {
                        Thread.sleep(100);
                        System.out.println("The Thread :" + Thread.currentThread().getName() + " is processing this list" + list);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }

【问题讨论】:

  • 你没有问一个真正的问题。
  • 如果你执行上面的测试,结果是一个一个地处理元素我想要批处理之类的东西
  • 不要在评论中写这个,把它添加到你的问题中。请注意,说出您的愿望仍然是事实陈述,而不是问题。见How to Ask

标签: java java-8 parallel-processing java-stream


【解决方案1】:

Guava 具有 Lists.partitionIterables.partition 方法,它们可以执行您所要求的操作。假设您有一个很大的 List 并希望以 5 个为一组进行处理,您可以这样做:

int batchSize = 5;
Lists.partition(list, batchSize)
   .parallelStream()
   .forEach(batch -> invokeList(batch));

【讨论】:

    【解决方案2】:

    看起来很冗长,但是你可以试试下面的。 runAsync() 方法将使列表块并行运行。

    private void test(List<Object> list, int chunkSize) throws ExecutionException, InterruptedException {
        AtomicInteger prev = new AtomicInteger(0);
        List<CompletableFuture> futures = new ArrayList<>();
        IntStream.range(1, (int) (chunkSize * (Math.ceil(Math.abs(list.size() / (double) chunkSize)))))
                .filter(i -> i % chunkSize == 0 || i == list.size())
                .forEach(i -> {
                    List<Object> chunk = list.subList(prev.get(), i);
                    futures.add(CompletableFuture.runAsync(() -> invokeList(chunk)));
                    prev.set(i);
                });
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
    }
    
    private void invokeList(List<Object> list) {
        System.out.println("Invoked for: " + list);
    }
    

    我运行了一个 30 个整数的列表,块大小为 5,如下所示:

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        List<Object> list = IntStream.range(0, 30).mapToObj(i1 -> (Object) String.valueOf(i1)).collect(Collectors.toList());
        int chunkSize = 5;
        new Test().test(list, chunkSize);
    }
    

    输出

    Invoked for: [15, 16, 17, 18, 19]
    Invoked for: [0, 1, 2, 3, 4]
    Invoked for: [5, 6, 7, 8, 9]
    Invoked for: [10, 11, 12, 13, 14]
    Invoked for: [20, 21, 22, 23, 24]
    

    【讨论】:

    【解决方案3】:

    如果你不想引入像 Guava 这样的额外依赖项,那么你可以编写一个收集器,将你的列表分成块:

    static <T> Collector<T, List<List<T>>, List<List<T>>> toChunks(int size) {
        return Collector.of(ArrayList::new, (list, value) -> {
            List<T> chunk = list.isEmpty() ? null : list.get(list.size() - 1);
            if (chunk == null || chunk.size() == size) {
                chunk = new ArrayList<>(size);
                list.add(chunk);
            }
            chunk.add(value);
        }, (list1, list2) -> {
            throw new UnsupportedOperationException();
        });
    }
    

    然后调用如下:

     List<Integer> list = Arrays.asList(1,26,17,18,19,20);
     list.stream().collect(toChunks(5))
                  .parallelStream()
                  .forEach(System.out::println);
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-09-11
      • 1970-01-01
      • 1970-01-01
      • 2019-09-25
      • 1970-01-01
      • 2016-08-27
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多