【问题标题】:Java set a callback from ExecutorServiceJava 从 ExecutorService 设置回调
【发布时间】:2012-10-18 10:43:07
【问题描述】:

我有一个固定的线程池,我用它来运行一堆工作线程,以实现具有许多组件的任务的并行执行。

当所有线程都完成后,我使用方法 (getResult) 检索它们的结果(非常大)并将它们写入文件。

最终,为了节省内存并能够查看中间结果,我希望每个线程在完成执行后立即将其结果写入文件,然后释放其内存。

通常,我会在 run() 方法的末尾添加实现该效果的代码。但是,该类中的某些其他对象也调用这些线程,但不希望它们将结果写入文件 - 相反,它们使用其结果执行其他计算,这些计算最终会写入文件。

所以,我想知道是否可以将回调函数附加到使用 ExecutorService 完成的线程事件。这样,我可以立即检索其结果并在该场景中释放内存,但在其他场景中使用这些线程时不会破坏代码。

这样的事情可能吗?

【问题讨论】:

    标签: java multithreading callback threadpool executorservice


    【解决方案1】:

    如果可以选择使用 Google Guava,您可以通过以下方式使用ListenableFuture 接口:

    1. 通过MoreExecutors.listeningDecorator(existingExecutorService)ExecutorService 转换为ListeningExecutorService
    2. ListeningExecutorServicesubmit(Callable<V>) 方法已缩小为返回ListenableFuture,它是Future 的子接口。
    3. ListenableFuture 有一个 addListener() 方法,因此您可以注册回调以在未来完成时运行。

    【讨论】:

      【解决方案2】:

      您可以使用CompletableFuture 为Java 8+ 中的线程返回时添加回调,如下所示,其中t 是您长时间运行的计算的结果,

      CompletableFuture.supplyAsync(() -> {
          T t = new T();
          // do something
          return t;
      }).thenApply(t -> {
          // process t
      });
      

      如果你只想在 Java 7 中使用回调,你可以这样做,

      int x = 10;
      ExecutorService fixedThreadPool = Executors.newFixedThreadPool(x);
      Future<T> result = fixedThreadPool.submit(() -> {
          // do calculation
          return T;
      });
      fixedThreadPool.submit(() -> {
          long minutesToWait = 5;
          T t = null;
          try {
              t = result.get(minutesToWait, TimeUnit.MINUTES);
          } catch (InterruptedException | ExecutionException | TimeoutException e) {
              LOGGER.error(e);
          }
          if (t != null) {
              // process t
          }
      });
      

      【讨论】:

      • 在第一个例子中你忘了给supplyAsync()method 提供fixedThreadPool。
      【解决方案3】:

      ExecutorService#submit return FutureTask&lt;T&gt; 帮助您检索结果,ExecutorService#get 方法将阻止执行,直到计算未完成。示例 -

      ExecutorService executor = Executors.newFixedThreadPool(10);
      Future<Long> future = executor.submit(new Callable<Long>(){
             @Override
             public Long call() throws Exception {
                 long sum = 0;
                 for (long i = 0; i <= 10000000l; i++) {
                     sum += i;
                 }
                 return sum;
             }
      });
      Long result = future.get();
      System.out.println(result);
      

      【讨论】:

      • 他特别不想改变他的问题中所述的run() 方法。
      • 您将任务提交到单独的线程的原因是您可以避免长时间阻塞当前线程进行长时间计算,并且您可以将当前线程用于其他事情。由于future.get() 阻塞,它应该在一个也提交给ExecutorService 的任务中调用。
      【解决方案4】:

      所以,我想知道是否可以使用 ExecutorService 将回调函数附加到线程完成的事件。

      不直接,不,但有几种方法可以实现这一点。想到的最简单的方法是将您的 Runnable 包装在另一个 Runnable 中,这会收获结果。

      所以你会做这样的事情:

      threadPool.submit(new ResultPrinter(myRunnable));
      ...
      
      private static class ResultPrinter implements Runnable {
          private final MyRunnable myRunnable;
          public ResultPrinter(MyRunnable myRunnable) {
              this.myRunnable = myRunnable;
          }
          public void run() {
              myRunnable.run();
              Results results = myRunnable.getResults();
              // print results;
          }
      }
      

      【讨论】:

      • myRunnable.getReulst 仍然会阻塞执行线程,它不是带有“回调”的纯异步模式。
      • 我假设getResult() 调用没有阻塞,并且run() 方法将所有结果存储在您的对象中。拥有get... 方法块不是@JasonMing 的最佳模式。
      【解决方案5】:

      织机项目

      Project Loom 有望为 Java 的并发设施带来新功能。实验性构建 available now,基于早期访问 Java 17。Loom 团队正在征求反馈。有关详细信息,请参阅团队成员(例如 Ron Pressler 或 Alan Bateman)的任何最新视频和文章。 Loom 已经发展,因此请研究最新资源。

      Project Loom 的一个便捷功能是将ExecutorService 设为AutoCloseable。这意味着我们可以使用 try-with-resources 语法来自动关闭执行器服务。 try 块末尾的控制流阻塞,直到所有提交的任务都完成/失败/取消。之后,执行器服务会自动关闭。简化我们的代码,并通过可视化代码结构使我们等待任务完成的意图变得明显。

      Project Loom 的另一个导入功能是虚拟线程(又名fibers)。虚拟线程在内存和 CPU 方面都是轻量级的。

      • 关于内存,每个虚拟线程都会获得一个堆栈,该堆栈可根据需要增长和缩小。
      • 关于 CPU,许多虚拟线程中的每一个都位于多个平台/内核线程中的任何一个之上。这使得阻塞非常便宜。当一个虚拟线程阻塞时,它会被“停放”(搁置),以便另一个虚拟线程可以继续在“真实”平台/内核线程上执行。

      轻量级意味着我们可以同时拥有许多个虚拟线程,甚至数百万。

      ➥ 您的问题的挑战是在提交的任务准备好返回其结果时立即做出反应,而无需等待所有其他任务完成。这使用 Project Loom 技术要简单得多。

      只需在另一个线程上的每个 Future 上调用 get

      因为我们有几乎无限数量的线程,而且阻塞非常便宜,我们可以提交一个任务,该任务只需调用 Future#get 来等待每个 Callable 返回的每个 Future 的结果。执行人服务。对get 的调用会阻塞,一直等到来自其的Callable 完成工作并返回结果。

      通常,我们希望避免将Future#get 调用分配给常规后台线程。该线程将停止所有进一步的工作,直到阻塞的get 方法返回。但是使用 Project Loom,会检测到该阻塞调用,并且它的线程被“停放”,因此其他线程可能会继续。当阻塞调用最终返回时,Loom 也会检测到,导致不再阻塞任务的虚拟线程很快被安排在“真实”线程上进一步执行。所有这些停放和重新调度都快速而自动地发生,我们作为 Java 程序员无需付出任何努力。

      为了演示,我的任务结果被填充到并发映射中。为了表明在结果可用后立即发生这种情况,我重写了 ConcurrentSkipListMap 类上的 put 方法以执行 System.out.println 消息。

      完整的示例应用程序如下所示。但 3 条关键线如下。请注意我们如何实例化一个休眠几秒钟的Callable,然后将当前时刻作为Instant 对象返回。当我们提交每个Callable 对象时,我们会返回一个Future 对象。对于每个返回的 Future,我们将另一个任务 Runnable 提交给我们的同一个执行服务,该服务仅调用 Future#get,等待结果,并最终将该结果发布到我们的结果映射中。

      final Callable < Instant > callable = new TimeTeller( nth );
      final Future < Instant > future = executorService.submit( callable ); // Submit first task: a `Callable`, an instance of our `TimeTeller` class.
      executorService.submit( ( ) -> results.put( nth , future.get() ) );   // Submit second task: a `Runnable` that merely waits for our first task to finish, and put its result into a map.
      

      警告:我不是并发方面的专家。但我相信我的方法是合理的。

      警告:Project Loom 仍处于试验阶段,其 API 和行为可能会发生变化。

      package work.basil.example.callbacks;
      
      import java.time.Duration;
      import java.time.Instant;
      import java.util.concurrent.*;
      
      public class App
      {
          public static void main ( String[] args )
          {
              App app = new App();
              app.demo();
          }
      
          private void demo ( )
          {
              System.out.println( "INFO - Starting `demo` method. " + Instant.now() );
              int limit = 10;
              ConcurrentNavigableMap < Integer, Instant > results = new ConcurrentSkipListMap <>()
              {
                  @Override
                  public Instant put ( Integer key , Instant value )
                  {
                      System.out.println( "INFO - Putting key=" + key + " value=" + value + " at " + Instant.now() );
                      return super.put( key , value );
                  }
              };
              try (
                      ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
              )
              {
                  for ( int i = 0 ; i < limit ; i++ )
                  {
                      final Integer nth = Integer.valueOf( i );
                      final Callable < Instant > callable = new TimeTeller( nth );
                      final Future < Instant > future = executorService.submit( callable ); // Submit first task: a `Callable`, an instance of our `TimeTeller` class.
                      executorService.submit( ( ) -> results.put( nth , future.get() ) );   // Submit second task: a `Runnable` that merely waits for our first task to finish, and put its result into a map.
                  }
              }
              // At this point flow-of-control blocks until:
              // (a) all submitted tasks are done/failed/canceled, and
              // (b) the executor service is automatically closed.
              System.out.println( "INFO - Ending `demo` method. " + Instant.now() );
              System.out.println( "limit = " + limit + " | count of results: " + results.size() );
              System.out.println( "results = " + results );
          }
      
          record TimeTeller(Integer id) implements Callable
          {
              @Override
              public Instant call ( ) throws Exception
              {
                  // To simulate work that involves blocking, sleep a random number of seconds.
                  Duration duration = Duration.ofSeconds( ThreadLocalRandom.current().nextInt( 1 , 55 ) );
                  System.out.println( "id = " + id + " ➠ duration = " + duration );
                  Thread.sleep( duration );
                  return Instant.now();
              }
          }
      }
      

      运行时。

      INFO - Starting `demo` method. 2021-03-07T07:51:03.406847Z
      id = 1 ➠ duration = PT27S
      id = 2 ➠ duration = PT4S
      id = 4 ➠ duration = PT6S
      id = 5 ➠ duration = PT16S
      id = 6 ➠ duration = PT34S
      id = 7 ➠ duration = PT33S
      id = 8 ➠ duration = PT52S
      id = 9 ➠ duration = PT17S
      id = 0 ➠ duration = PT4S
      id = 3 ➠ duration = PT41S
      INFO - Putting key=2 value=2021-03-07T07:51:07.443580Z at 2021-03-07T07:51:07.444137Z
      INFO - Putting key=0 value=2021-03-07T07:51:07.445898Z at 2021-03-07T07:51:07.446173Z
      INFO - Putting key=4 value=2021-03-07T07:51:09.446220Z at 2021-03-07T07:51:09.446623Z
      INFO - Putting key=5 value=2021-03-07T07:51:19.443060Z at 2021-03-07T07:51:19.443554Z
      INFO - Putting key=9 value=2021-03-07T07:51:20.444723Z at 2021-03-07T07:51:20.445132Z
      INFO - Putting key=1 value=2021-03-07T07:51:30.443793Z at 2021-03-07T07:51:30.444254Z
      INFO - Putting key=7 value=2021-03-07T07:51:36.445371Z at 2021-03-07T07:51:36.445865Z
      INFO - Putting key=6 value=2021-03-07T07:51:37.442659Z at 2021-03-07T07:51:37.443087Z
      INFO - Putting key=3 value=2021-03-07T07:51:44.449661Z at 2021-03-07T07:51:44.450056Z
      INFO - Putting key=8 value=2021-03-07T07:51:55.447298Z at 2021-03-07T07:51:55.447717Z
      INFO - Ending `demo` method. 2021-03-07T07:51:55.448194Z
      limit = 10 | count of results: 10
      results = {0=2021-03-07T07:51:07.445898Z, 1=2021-03-07T07:51:30.443793Z, 2=2021-03-07T07:51:07.443580Z, 3=2021-03-07T07:51:44.449661Z, 4=2021-03-07T07:51:09.446220Z, 5=2021-03-07T07:51:19.443060Z, 6=2021-03-07T07:51:37.442659Z, 7=2021-03-07T07:51:36.445371Z, 8=2021-03-07T07:51:55.447298Z, 9=2021-03-07T07:51:20.444723Z}
      

      【讨论】:

        猜你喜欢
        • 2011-08-17
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-05-07
        • 1970-01-01
        • 2020-08-27
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多