【问题标题】:Task execution in Java web applicationJava Web 应用程序中的任务执行
【发布时间】:2016-02-03 13:18:27
【问题描述】:

我正在开发 Spring MVC Web 应用程序。它的功能之一是文件转换(上传文件 -> 转换 -> 存储在服务器上)。 有些文件可能太大而无法即时转换,所以我决定在上传后将它们放在共享队列中。 文件将根据上传时间优先转换,即FIFO。

我的想法是在上传后将任务添加到控制器中的队列中。 还会有服务执行队列中的所有任务,如果为空,则等待添加新任务。我不需要调度 - 任务应该总是在队列不为空时执行。

我已阅读有关 ExecutorService 的信息,但没有找到适合我的案例的任何示例。

如果有任何建议,我将不胜感激。

编辑 感谢您的回答,我需要澄清我的问题: 基本上,我知道如何执行任务,我需要通过处理任务队列来管理。用户应该能够查看队列并暂停、恢复或从队列中删除任务。 我的任务类:

public class ConvertTask implements Callable<String> {

    private Converter converter;
    private File source;
    private File target;
    private State state;
    private User user;

    public ConvertTask(Converter converter, File source, File target, User user) {
        this.converter = converter;
        this.source = source;
        this.target = target;
        this.user = user;
        this.state = State.READY;
    }

    @Override
    public String call() throws Exception {
        if (this.state == State.READY) {
            BaseConverterService converterService = ConverterUtils.getConverterService(this.converter);
            converterService.convert(this.source, this.target);
            MailSendServiceUtil.send(user.getEmail(), target.getName());
            return "success";
        }
        return "task not ready";
    }
}

我还根据您的建议创建了负责管理队列/任务的类:

@Component
public class MyExecutorService {
    private LinkedBlockingQueue<ConvertTask> converterQueue = new LinkedBlockingQueue<>();

    private ExecutorService executorService = Executors.newSingleThreadExecutor();

    public void add(ConvertTask task) throws InterruptedException {
        converterQueue.put(task);
    }

    public void execute() throws InterruptedException, ExecutionException {
        while (!converterQueue.isEmpty()) {
            ConvertTask task = converterQueue.peek();
            Future<String> statusFuture = executorService.submit(task);
            String status = statusFuture.get();
            converterQueue.take();
        }
    }
}

我的观点是,如果队列不为空,如何执行任务,并在添加新任务且队列先前为空时恢复。我想到了一些适合add(ConvertTask task) 方法的代码。

【问题讨论】:

    标签: java spring queue task


    【解决方案1】:

    问题更新后编辑

    您不需要为任务创建任何队列,因为ThreadPoolExecutor 实现有自己的队列。下面是Oracle的Java 8实现newSingleThreadExecutor()方法的源代码:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    

    所以你只需直接提交一个新任务,它就会被ThreadPoolExecutor排队

    @Component
    public class MyExecutorService {
        private ExecutorService executorService = Executors.newSingleThreadExecutor();
    
        public void add(ConvertTask task) throws InterruptedException {
            Future<String> statusFuture = executorService.submit(task);
        }
    }
    

    如果您担心队列的边界,可以显式创建队列实例并将其提供给 ThreadPoolExecutor 构造函数。

    private executorService = new ThreadPoolExecutor(1, 1, 
                     0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(MAX_SIZE));
    

    请注意,我已删除该行

    String status = statusFuture.get();
    

    因为get() 呼叫被阻塞。如果您在提交的同一线程中有此行,则您的代码不再是异步的。您应该存储 Future 对象并在不同的线程中异步检查它们。或者你可以考虑使用Java 8中引入的CompletableFuture。查看this post

    【讨论】:

    • 我能以某种方式获得ExecutorService 添加任务的队列吗?我需要具有查看队列和修改其元素的功能。
    • 接口ExecutorService 不知道队列的任何事情,但实现ThreadPoolExecutor 知道。它有一个方法getQueue()。您还可以提供您创建的队列实例。这是我的答案。
    • 谢谢,我想应该没问题的。不幸的是,getQueue() 返回了FutureTask,所以我需要使用反射从callable 字段中获取ConvertTask,如图here
    • 为什么需要操作提交的队列?这将是一个糟糕的关注点分离。它有难闻的气味。如果您需要预处理您的任务,您可以在将它们提交给您的转换器之前执行此操作。与其让不同的组件相互干涉,不如将处理链化。如果您有多个异步步骤 - 使用多个执行器。
    • 正如我之前写的,我需要能够在视图中显示队列,并可以删除任务。示例:用户 A 将 5 个任务添加到队列中。用户 B 还添加了 5 个任务。现在管理员可以查看整个队列并删除他想要的任何任务。
    【解决方案2】:

    上传后您应该立即返回响应。客户端不能等待资源太久。但是,您可以在客户端设置中更改它。无论如何,如果您正在运行后台任务,您可以在不与客户端交互或在执行过程中通知客户端的情况下执行它。这是执行器服务使用的可调用演示示例

    /**
     * Created by Roma on 17.02.2015.
     */
    class SumTask implements Callable<Integer> {
      private int num = 0;
      public SumTask(int num){
        this.num = num;
      }
      @Override
      public Integer call() throws Exception {
        int result = 0;
        for(int i=1;i<=num;i++){
          result+=i;
        }
        return result;
      }
    }
    
    public class CallableDemo {
    
      Integer result;
      Integer num;
    
      public Integer getNumValue() {
        return 123;
      }
      public Integer getNum() {
        return num;
      }
    
      public void setNum(Integer num) {
        this.num = num;
      }
    
      public Integer getResult() {
        return result;
      }
    
      public void setResult(Integer result) {
        this.result = result;
      }
    
      ExecutorService service =  Executors.newSingleThreadExecutor();
    
      public String  execute() {
        try{
    
          Future<Integer> future = service.submit(new SumTask(num));
          result = future.get();
          //System.out.println(result);
          service.shutdown();
        }
        catch(Exception e)
        {
          e.printStackTrace();
        }
    
        return "showlinks";
      }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-03-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-07-27
      • 2012-08-03
      相关资源
      最近更新 更多