【问题标题】:Fairness issue in ScheduledExecutorServiceScheduledExecutorService 中的公平问题
【发布时间】:2014-02-12 10:00:07
【问题描述】:

以下示例显示了 ScheduledExecutorService 中的问题。我正在安排两个任务“1”和“2”的运行时间比计划间隔长。任务“2”提交另一个任务只执行一次。

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TestExecutorFairness {
  public static void main(final String[] args) {

    final int interval = 200;
    final int sleeptime = 600;

    final ScheduledExecutorService executor = Executors
        .newSingleThreadScheduledExecutor();

    // schedule task 1
    executor.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        try {
          Thread.sleep(sleeptime);
        } catch (final InterruptedException e) {
          e.printStackTrace();
        }

        System.out.println("1");
      }
    }, interval, interval, TimeUnit.MILLISECONDS);

    // schedule task 2
    executor.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        try {
          Thread.sleep(sleeptime);
        } catch (final InterruptedException e) {
          e.printStackTrace();
        }

        System.out.println("2");

        // submit task 3
        executor.submit(new Runnable() {

          @Override
          public void run() {
            System.out.println("3");
          }
        });
      }
    }, interval, interval, TimeUnit.MILLISECONDS);

  }
}

我期望的输出类似于

1
2
1
2
3

但它不是那样执行的。任务“3”延迟了很长时间,但我需要尽快执行。

有什么办法可以让这种行为更公平?或者有人有更好的解决方案?

【问题讨论】:

    标签: java scheduledexecutorservice


    【解决方案1】:

    有趣。这似乎违反直觉,因为ScheduledExecutorService 的 JvaDoc 明确提到了

    使用 Executor.execute(java.lang.Runnable) 和 ExecutorService 提交方法提交的命令被调度,请求延迟为零

    因此可以假设提交这样的命令应该是可行的。但在这种情况下,有一些特殊性。我无法指出这种行为的确切原因,但它显然与

    • 任务耗时超过计划间隔
    • 正在提交的新任务来自已执行的任务
    • ScheduledExecutorService 内部使用DelayedWorkQueue 的事实
    • 最重要的是:您使用的是单线程ScheduledExecutorService

    一个相当大的问题也可能是这会填满工作队列,迟早会导致 OutOfMemoryError。这也可以在这个(略微调整的)示例中看到:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public class TestExecutorFairness {
      public static void main(final String[] args) {
    
        final int interval = 200;
        final int sleeptime = 600;
    
        final ScheduledExecutorService executor = 
            Executors.newScheduledThreadPool(1);
    
        final long start = System.currentTimeMillis();
    
        // schedule task 1
        executor.scheduleAtFixedRate(new Runnable() {
          @Override
          public void run() {
            try {
              Thread.sleep(sleeptime);
            } catch (final InterruptedException e) {
              e.printStackTrace();
            }
    
            System.out.println("1 at "+(System.currentTimeMillis()-start));
          }
        }, interval, interval, TimeUnit.MILLISECONDS);
    
        // schedule task 2
        executor.scheduleAtFixedRate(new Runnable() {
          @Override
          public void run() {
            try {
              Thread.sleep(sleeptime);
            } catch (final InterruptedException e) {
              e.printStackTrace();
            }
    
            System.out.println("2 at "+(System.currentTimeMillis()-start));
    
    
            System.out.println("Submitting 3 to "+executor);
            // submit task 3
            executor.submit(new Runnable() {
    
              @Override
              public void run() {
                  System.out.println("3 at "+(System.currentTimeMillis()-start));
              }
            });
          }
        }, interval, interval, TimeUnit.MILLISECONDS);
    
      }
    }
    

    Executor 中“排队任务”的数量不断增加。

    这种情况下的解决方案相当简单:而不是一个

    Executors.newScheduledThreadPool(1)

    你可以创建一个

    Executors.newScheduledThreadPool(3)

    当然,这改变了本例中的“计时行为”。我必须假设此示例中的 Thread.sleep() 仅用于模拟不适合此示例代码的复杂计算。但也许只是确保线程数至少为numberOfPeriodicTasks+1 也可以应用于您的实际应用程序中。

    【讨论】:

    • 我还考虑了更多线程,但 Thread.sleep() 是 Modbus TCP 通信的占位符。该库不是为多线程设计的。我也可以拆分为一个执行器用于调度和一个执行器,但是如果任务花费太长时间,这将填满正在执行的队列。
    • 因此有 2 个周期性任务可能需要比调度间隔更长的时间,并且它们正在阻塞当前线程 - 我无法想象如何执行第三个任务 立即 在这种情况下。根据当前的描述,我现在看到的唯一解决方案是不 调度 Task3,而只是直接在 Task2 中执行相应的 Runnable - 但由于这太明显了,我假设你不这样做是有原因的......
    • 我在任务 2 中执行了任务 3,只是为了模拟以后的调用。但我想给直接提交比计划任务更高的优先级。
    • 是的,我看了一下ScheduledExecutorService等的源码,觉得应该可以把DelayedWorkQueue换成优先队列。但是围绕它的基础设施(在内部将所有任务包装到 ScheduledFutureTasks 等)使这变得困难。根据具体要求,可以考虑创建一个具有所需行为的自己的 ScheduledExecutorService 实现,但这可能是一项相当大的工作。也许有人会找到更简单优雅的解决方案。
    • @Stephan 您接受了它 - 但它是否解决了问题(或至少 help 来解决它)?否则,也许一个关于“在单线程 ScheduledExecutorService 中为任务分配优先级”之类的专门问题可能会引起一些关注并带来更多帮助(特别是当您提到它必须是单一的约束时 -已在问题中讨论)
    猜你喜欢
    • 2020-02-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-12-04
    • 2014-07-07
    • 2013-08-09
    • 2021-11-06
    相关资源
    最近更新 更多