【问题标题】:java how to create thread pool for stream operationjava如何为流操作创建线程池
【发布时间】:2021-08-29 07:22:55
【问题描述】:

我喜欢在使用带有线程池的流时控制线程执行。 目前我有 字符串列表

List<String> mylist = new ArrayList() {"1","2","3","4"}; //that holds the strings 
List<Action> actions = new ArrayList<>{} holds function that manipulate the strings from mylist 

每个动作都有从 mylist 中获取字符串的工作方法

Stream<String> stream = mylist.parallelStream();
stream = stream.flatMap(s-> actions.stream().map(ac -> ac.work(str)));
r = stream.collect(Collectors.toList());

一切都很好,但我无法控制线程池,知道我可以像这样使用 ForkJoinPool Example
但我没有找到在我的示例中实现它的方法 例如,这不起作用:

   ForkJoinPool customThreadPool = new ForkJoinPool(4);
            r= customThreadPool.submit(
                    () -> mylist.parallelStream().flatMap(s-> actions.stream().map(ac -> ac.work(str))).collect(Collectors.toList()));

给我错误:

java: incompatible types: no instance(s) of type variable(s) T,R,A,capture#1 of ?,T exist so that java.util.concurrent.ForkJoinTask<T> conforms to java.util.List<java.lang.String>

谢谢

【问题讨论】:

  • 不起作用是什么意思?你就是这样做的。 parallelStream() 将使用来自customThreadPool 的线程,而不是来自ForkJoinPool.commonPool() 的线程,并且您可以完全控制customThreadPool
  • 语法中的某些东西已经磨损了,因为它给了我:')' 预期
  • 不要在评论中显示错误。编辑问题并通过说明“不工作”的含义来澄清它。
  • 当我添加时)它给了我:java:不兼容的类型:没有类型变量 T、R、A、capture#1 of ?,T 的实例存在,因此 java. util.concurrent.ForkJoinTask 符合 java.util.List
  • r的类型是什么?看起来这可能是问题所在;您可能需要添加最终的.get().join()

标签: java multithreading java-stream threadpool


【解决方案1】:

代码编译并运行良好,一旦代码错误得到修复 (str => s)。

公共池

// Setup with dummy actions for testing which thread executes the action
List<String> mylist = new ArrayList<>(Arrays.asList("1","2","3","4")); //that holds the strings 
List<Action> actions = new ArrayList<>(Arrays.asList(
        s -> { s += "x";  System.out.println(Thread.currentThread().getName() + ": " + s); return s; },
        s -> { s += "y";  System.out.println(Thread.currentThread().getName() + ": " + s); return s; }
        ));

// Using common pool
Stream<String> stream = mylist.parallelStream();
stream = stream.flatMap(s -> actions.stream().map(ac -> ac.work(s)));
List<String> r = stream.collect(Collectors.toList());
System.out.println(r);

输出

ForkJoinPool.commonPool-worker-7: 1x
ForkJoinPool.commonPool-worker-3: 2x
ForkJoinPool.commonPool-worker-3: 2y
main: 3x
ForkJoinPool.commonPool-worker-5: 4x
main: 3y
ForkJoinPool.commonPool-worker-7: 1y
ForkJoinPool.commonPool-worker-5: 4y
[1x, 1y, 2x, 2y, 3x, 3y, 4x, 4y]

自定义池

ForkJoinPool customThreadPool = new ForkJoinPool(4);
ForkJoinTask<List<String>> task = customThreadPool.submit(
        () -> mylist.parallelStream().flatMap(s -> actions.stream().map(ac -> ac.work(s))).collect(Collectors.toList()));
System.out.println(task.get());

如果编译器如问题中所述抱怨,您需要通过在第 3 行转换 lambda 表达式来帮助它选择正确的 submit() 重载:

        (Callable<List<String>>) () -> mylist.parallelStream().flatMap(s -> actions.stream().map(ac -> ac.work(s))).collect(Collectors.toList()));

输出

ForkJoinPool-1-worker-3: 3x
ForkJoinPool-1-worker-1: 1x
ForkJoinPool-1-worker-1: 1y
ForkJoinPool-1-worker-5: 2x
ForkJoinPool-1-worker-7: 4x
ForkJoinPool-1-worker-7: 4y
ForkJoinPool-1-worker-5: 2y
ForkJoinPool-1-worker-3: 3y
[1x, 1y, 2x, 2y, 3x, 3y, 4x, 4y]

单线程

Stream<String> stream = mylist.stream();
stream = stream.flatMap(s -> actions.stream().map(ac -> ac.work(s)));
List<String> r = stream.collect(Collectors.toList());
System.out.println(r);

输出

main: 1x
main: 1y
main: 2x
main: 2y
main: 3x
main: 3y
main: 4x
main: 4y
[1x, 1y, 2x, 2y, 3x, 3y, 4x, 4y]

【讨论】:

  • 谢谢,很好的回答,您建议在哪里了解有关流的更多信息?
猜你喜欢
  • 1970-01-01
  • 2014-12-21
  • 2015-03-28
  • 2012-04-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多