【问题标题】:Stopping Callable in MvcAsyncTask在 MvcAsyncTask 中停止 Callable
【发布时间】:2015-05-21 02:50:32
【问题描述】:

我有一个带有 WebAsyncTask 的控制器。此外,我正在使用超时回调。 正如 here 所写,我可以选择通知 Callable 取消处理。但是,我看不到任何这样做的选择。

@Controller
public class UserDataProviderController {

    private static final Logger log = LoggerFactory.getLogger(UserDataProviderController.class.getName());

    @Autowired
    private Collection<UserDataService> dataServices;

       @RequestMapping(value = "/client/{socialSecurityNumber}", method = RequestMethod.GET)
        public @ResponseBody
        WebAsyncTask<ResponseEntity<CustomDataResponse>> process(@PathVariable final String socialSecurityNumber) {

            final Callable<ResponseEntity<CustomDataResponse>> callable = new Callable<ResponseEntity<CustomDataResponse>>() {

                @Override
                public ResponseEntity<CustomDataResponse> call() throws Exception {

                    CustomDataResponse CustomDataResponse = CustomDataResponse.newInstance();

                    // Find user data
                    for(UserDataService dataService:dataServices)
                    {
                        List<? extends DataClient> clients = dataService.findBySsn(socialSecurityNumber);
                        CustomDataResponse.put(dataService.getDataSource(), UserDataConverter.convert(clients));
                    }

                    // test long execution
                    Thread.sleep(4000);

                    log.info("Execution thread continued and shall be terminated:"+Thread.currentThread().getName());


                    HttpHeaders responseHeaders = new HttpHeaders();
                    responseHeaders.setContentType(new MediaType("application", "json", Charset.forName("UTF-8")));
                    return new ResponseEntity(CustomDataResponse,responseHeaders,HttpStatus.OK);
                }

            };

            final Callable<ResponseEntity<CustomDataResponse>> callableTimeout = new Callable<ResponseEntity<CustomDataResponse>>() {
                @Override
                public ResponseEntity<CustomDataResponse> call() throws Exception {

                    // Error response
                    HttpHeaders responseHeaders = new HttpHeaders();
                    responseHeaders.setContentType(new MediaType("application", "json", Charset.forName("UTF-8")));
                    return new ResponseEntity("Request has timed out!",responseHeaders,HttpStatus.INTERNAL_SERVER_ERROR);
                }
            };

            WebAsyncTask<ResponseEntity<CustomDataResponse>> task = new WebAsyncTask<>(3000,callable);
            task.onTimeout(callableTimeout);
            return task;
        }
}

我的@WebConfig

@Configuration
@EnableWebMvc
class WebAppConfig  extends WebMvcConfigurerAdapter {

    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setKeepAliveSeconds(60 * 60);
        executor.afterPropertiesSet();

        configurer.registerCallableInterceptors(new TimeoutCallableProcessingInterceptor());
        configurer.setTaskExecutor(executor);
    } 
}

而且相当标准的拦截器:

public class TimeoutCallableProcessingInterceptor extends CallableProcessingInterceptorAdapter {

    @Override
    public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) {

        throw new IllegalStateException("[" + task.getClass().getName() + "] timed out");

    }
}

一切正常,但来自控制器的 Callable 总是完成,这很明显,但是如何停止那里的处理?

【问题讨论】:

  • 你的意思是需要 4 秒的 callable 不会在 3 秒后停止?
  • 是的。基本上我不希望在睡眠后执行任何代码。
  • @Premek 就是这样。您必须手动处理中断,即从数据服务返回结果的可调用对象应检查它是否收到中​​断信号,并在您的情况下通过发送另一个响应来对其采取行动。
  • @Magnamag 你的意思是像 call: Thread.currentThread().interrupt(); in callableTimout 并在控制器中检查它,比如` if (Thread.currentThread().isInterrupted()){` 我不确定这是正确的方法...

标签: java spring spring-mvc timeout asynccallback


【解决方案1】:

您可以使用WebAsyncTask 来实现超时控制和Thread 管理来优雅地停止新的异步线程。

  1. 实现Callable 来运行进程
  2. 在这个方法中(在不同的线程中运行)将当前的Thread 存储在控制器的局部变量中
  3. 实现另一个Callable 来处理超时事件
  4. 在此方法中检索先前存储的Thread 并调用interrupt() 方法中断它。
  5. 同时抛出TimeoutException 来停止控制器进程
  6. 在运行过程中,检查线程是否以Thread.currentThread().isInterrupted()中断,如果是,则回滚抛出异常的事务。

控制器:

public WebAsyncTask<ResponseEntity<BookingFileDTO>> confirm(@RequestBody final BookingConfirmationRQDTO bookingConfirmationRQDTO)
        throws AppException,
        ProductException,
        ConfirmationException,
        BeanValidationException {

    final Long startTimestamp = System.currentTimeMillis();
    // The compiler obligates to define the local variable shared with the callable as final array
    final Thread[] asyncTaskThread = new Thread[1];

    /**
     *  Asynchronous execution of the service's task
     *  Implemented without ThreadPool, we're using Tomcat's ThreadPool
     *  To implement an specific ThreadPool take a look at http://docs.spring.io/spring/docs/current/spring-framework-reference/htmlsingle/#mvc-ann-async-configuration-spring-mvc
     */
    Callable<ResponseEntity<BookingFileDTO>> callableTask = () -> {

        //Stores the thread of the newly started asynchronous task
        asyncTaskThread[0] = Thread.currentThread();

        log.debug("Running saveBookingFile task at `{}`thread", asyncTaskThread[0].getName());
        BookingFileDTO bookingFileDTO = bookingFileService.saveBookingFile(
                bookingConfirmationRQDTO,
                MDC.get(HttpHeader.XB3_TRACE_ID))
                .getValue();
        if (log.isDebugEnabled()) {
            log.debug("The saveBookingFile task took {} ms",
                    System.currentTimeMillis() - startTimestamp);
        }
        return new ResponseEntity<>(bookingFileDTO, HttpStatus.OK);
    };

    /**
     * This method is executed if a timeout occurs
     */
    Callable<ResponseEntity<BookingFileDTO>> callableTimeout = () -> {

        String msg = String.format("Timeout detected at %d ms during confirm operation",
            System.currentTimeMillis() - startTimestamp);
        log.error("Timeout detected at {} ms during confirm operation: informing BookingFileService.", msg);

        // Informs the service that the time has ran out
        asyncTaskThread[0].interrupt();

        // Interrupts the controller call
        throw new TimeoutException(msg);
    };

    WebAsyncTask<ResponseEntity<BookingFileDTO>> webAsyncTask = new WebAsyncTask<>(timeoutMillis, callableTask);
    webAsyncTask.onTimeout(callableTimeout);
    log.debug("Timeout set to {} ms", timeoutMillis);
    return webAsyncTask;
}

服务实现:

/**
 * If the service has been informed that the time has ran out
 * throws an AsyncRequestTimeoutException to roll-back transactions
 */
private void rollbackOnTimeout() throws TimeoutException {
    if(Thread.currentThread().isInterrupted()) {
        log.error(TIMEOUT_DETECTED_MSG);
        throw new TimeoutException(TIMEOUT_DETECTED_MSG);
    }
}

@Transactional(rollbackFor = TimeoutException.class, propagation = Propagation.REQUIRES_NEW)
DTOSimpleWrapper<BookingFileDTO> saveBookingFile(BookingConfirmationRQDTO bookingConfirmationRQDTO, String traceId) {

    // Database operations
    // ...

    return retValue;
}

【讨论】:

    猜你喜欢
    • 2013-12-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-07
    • 2013-03-26
    • 2011-01-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多