【问题标题】:How to implement a synchronous job queue in Spring?如何在 Spring 中实现同步作业队列?
【发布时间】:2021-05-27 12:35:19
【问题描述】:

我正在尝试了解如何使用 Spring 实现作业队列。

我已经启动并运行了一个服务器,我计划让用户向它提交 POST 请求。这将接收一些数据,然后将作业排队以处理这些数据。

处理这些数据是一个昂贵的过程,有时可能需要 5 到 20 分钟(取决于需要完成的工作量)。因此,它需要同步运行。 IE。一项工作完成,然后可以开始下一项工作。

例如

  • 用户提交作业 A
  • 由于队列为空,作业 A 已启动
  • 另一个用户提交了第二个作业,作业 B
  • 作业 A 仍在运行,因此作业 B 被放入队列中
  • 另一个用户出现并提交作业 C,作业 A 仍在运行,因此它与作业 B 一起放入队列中。

我最近才开始学习 Spring,所以我正在寻找一些关于如何实现这一目标的想法。

我的想法是创建一个工厂类来接收可以安排的工作。

我的一个端点如下所示:

@RequestMapping(value = "/submitjob", method = RequestMethod.POST)
    public void queueJob(
            @RequestPart("file") MultipartFile file
    ) {

        if (file != null) {
           // queue job
        }
        // else return bad response.
    }

非常感谢任何建议。

【问题讨论】:

    标签: java spring spring-boot queue


    【解决方案1】:

    您可以将java.util.concurrent.ExecutorService 与单个线程一起使用来实现此行为。

    注意:此实现可以轻松演变为多线程服务,因此您可以并行运行处理

    您必须面对的第一个问题是您不想阻止客户的请求。

    如果您将MultipartFile 直接传递给服务,它必须等到文件处理完毕,可能会超时,因为输入流在请求中。

    首先,您必须复制多部分的文件才能上传。在您的控制器中:

    private final FileProcessingService fileProcessingService;
    
    public StackOverFlowController(FileProcessingService fileProcessingService) {
        this.fileProcessingService = fileProcessingService;
    }
    
    @PostMapping(value = "/submitjob")
    public void queueJob(@RequestPart("file") MultipartFile multipartFile) throws IOException, ExecutionException, InterruptedException {
    
        File tempFile = copyInputStreamToTempFile(multipartFile);
    
        fileProcessingService.queueFile(tempFile);
    
    }
    
    private File copyInputStreamToTempFile(MultipartFile multipartFile) throws IOException {
        File tempFile = File.createTempFile("queued-file-", ".tmp");
        try (OutputStream os = new FileOutputStream(tempFile)) {
            IOUtils.copy(multipartFile.getInputStream(), os);
        }
        return tempFile;
    }
    
            
    

    这里 MultipartFile 被复制到一个临时文件中,但你可以将它保存在一个目录中 然后将文件传递给必须是非阻塞的 FileProcessingService

    然后,要创建一个将按顺序处理文件的非阻塞队列,您可以使用单线程ExecutorService。调用execute 会将任务添加到队列中。该方法接受Runnable的实现参数

    服务框架可能如下所示:

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    import java.io.File;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    @Component
    @Slf4j
    public class FileProcessingService {
    
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
    
        public void queueFile(File fileToProcess) {
            executor.execute(new FileProcessRunnable(fileToProcess));
            log.info("Queued file " + fileToProcess);
        }
    }
    

    带有Thread.sleep 的 Runnable 的简单存根来模拟处理:

    @Slf4j
    public class FileProcessRunnable implements Runnable {
    
        private final File fileToProcess;
    
        public FileProcessRunnable(File fileToProcess) {
            this.fileToProcess = fileToProcess;
        }
    
        @Override
        public void run() {
            process();
            log.info("Processed file " + fileToProcess.getName());
        }
    
        private void process() {
            try {
                Thread.sleep(1000); //simulating process
            } catch (InterruptedException e) {
                log.error("Error during process", e);
            }
        }
    }
    

    模拟行为的不太真实的测试:

    @Test
    @SneakyThrows
    void should_queue_file_processing() {
        FileProcessingService fileProcessingService = new FileProcessingService();
    
        File file1 = File.createTempFile("temp-", ".tmp");
        File file2 = File.createTempFile("temp-", ".tmp");
        File file3 = File.createTempFile("temp-", ".tmp");
        File file4 = File.createTempFile("temp-", ".tmp");
    
        fileProcessingService.queueFile(file1);
        fileProcessingService.queueFile(file2);
        fileProcessingService.queueFile(file3);
        fileProcessingService.queueFile(file4);
    
        Thread.sleep(1000 * 5);//await until tasks are completed
    }
    

    上面的测试将记录:

    如您所见,文件在处理之前已排队

    有关执行者的更多信息,请参阅:https://www.baeldung.com/java-executor-service-tutorial

    【讨论】:

    • 快速跟进 ``` FileProcessingService fileProcessingService = new FileProcessingService();``` 。如果多次实例化,是否允许创建多个线程。还是会重用同一个线程?
    • 如果实例化多次,它将创建多个线程,因为 FileProcessingService 负责创建 ExecutorService
    • 啊,好吧。这就是我认为的情况,但想确定一下。我只需要将该部分包装在一个 Singleton 类中,以确保只创建一个线程。
    • @Component of spring 确保你的 bean 是一个单例
    • 优秀。道歉。仍然习惯了 Spring,所以这对我来说是全新的术语。我会把你标记为答案。
    【解决方案2】:

    您可能还想查看https://github.com/jobrunr/jobrunr。它是一个分布式作业队列。您可以仅使用 Java 8 lambda 创建作业。

    【讨论】:

      猜你喜欢
      • 2012-12-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-01-05
      • 2017-04-17
      • 2014-08-19
      • 1970-01-01
      • 2012-01-23
      相关资源
      最近更新 更多