【问题标题】:Java Executor for "main" thread“主”线程的 Java 执行器
【发布时间】:2019-12-13 10:28:35
【问题描述】:

我想从程序的“主”线程(类似于 Android 中的主 looper)创建一个 Executor,然后运行直到它处理完所有提交给它的内容:

public class MyApp {
  private static Callable<Integer> task = () -> {
    // ... return an int somehow ...
  };

  public static void main(String[] args) throws ExecutionException, InterruptedException {
    ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
    Thread main = Thread.currentThread();
    ExecutorService executorService = Executors.newSingleThreadExecutor(r -> main);

    service.submit(task).addListener(() -> {
      /// ... do something with the result ...
    }, executorService);

    executorService.awaitTermination(100, TimeUnit.SECONDS);
  }
}

但我得到一个IllegalThreadState 异常:

SEVERE: RuntimeException while executing runnable MyApp$$Lambda$20/0x00000008000a6440@71f06a3c with executor java.util.concurrent.Executors$FinalizableDelegatedExecutorService@47add263
java.lang.IllegalThreadStateException
    at java.base/java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:926)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1343)
    at java.base/java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:687)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1137)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:957)
    at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:726)
    at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.afterRanInterruptibly(TrustedListenableFutureTask.java:131)
    at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:133)
    at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

我可以在新线程上启动ExecutorService,然后等待,但这似乎很浪费。

有没有好办法从当前线程创建一个Executor,并等待它处理所有提交给它的东西?

【问题讨论】:

    标签: java guava


    【解决方案1】:

    方法invokeAll()记录在here等待所有任务完成
    你可以这样做

    executorService.invokeAll();
    

    关注this post了解更多说明

    【讨论】:

      【解决方案2】:

      您正试图在主线程中执行回调。直接的方法是调用get()

      YourResult result = service.submit(task).get();
      /* Do something with your result... */
      

      如果您想异步执行回调,您可能对CompletionStage 感兴趣

      CompletableFuture.runAsync(task).thenAccept(r -> /* do something with result */);
      

      【讨论】:

        【解决方案3】:

        您可能正在寻找类似CompletionService 的东西:您向此提交任务,它会按完成顺序将它们返回给您。

        ExecutorService service = Executors.newFixedThreadPool(1);
        ExecutorCompletionService<Integer> comp = new ExecutorCompletionService<>(service);
        
        comp.submit(task);
        // ... I assume you want to submit N tasks.
        
        for (int i = 0; i < N; ++i) {
          Future<Integer> future = comp.take();
          Integer result = future.get();
        
          // ... do something with the result ...
        }
        

        这将在项目完成后立即处理它们。如果你能够等到拥有一切,然后一口气处理完,你可以使用 Guava 的Futures.allAsList

        ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
        List<Callable<Integer>> tasks = ...;
        
        List<ListenableFuture<Integer>> futures = new ArrayList<>();
        for (Callable<Integer> task : tasks) {
          futures.add(service.submit(task));
        }
        
        ListenableFuture<List<Integer>> resultsFuture = Futures.allAsList(futures);
        List<Integer> results = resultsFuture.get();
        
        // Process the results.
        

        【讨论】:

          【解决方案4】:

          使用 Guava 的 MoreExecutors.newDirectExecutorService()

          这将确保提交的代码将在 ThreadPool 的同一线程中执行。我知道,它不是主线程,但至少您不会像您想要的那样仅为侦听器创建其他新线程。

          import com.google.common.util.concurrent.*;
          import org.junit.jupiter.api.Test;
          import java.util.concurrent.*;
          
          class ExecutorTest {
            private static Callable<Integer> task = () -> {
              System.out.println("in call: " + Thread.currentThread().getName());
              TimeUnit.SECONDS.sleep(1);
              return 0;
            };
            @Test
            void test() throws InterruptedException {
              ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
              ExecutorService executor = MoreExecutors.newDirectExecutorService();
              service.submit(task).addListener(() -> {
                System.out.println("in listener: " + Thread.currentThread().getName());
              }, executor);
              executor.awaitTermination(2, TimeUnit.SECONDS);
            }
          }
          

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 2014-08-27
            • 2017-12-29
            • 2018-04-12
            • 1970-01-01
            • 1970-01-01
            • 2017-06-19
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多