一. 项目中架构要求:
外围客户端系统 - 交易中心微服务(场景微服务) - 交易集市微服务/能力中心 - ESB - 后方系统(理财/基金等)。
交易中心需要同时调用交易集市十几个组件/接口。由于通讯时间太长和接口请求太多,考虑使用多线程。
考虑使用非阻塞的多线程类 Future。Future表示一个可能还没有完成的异步任务的结果,针对这个结果可以添加Callback以便在任务执行成功或失败后作出相应的操作。
二. 项目使用是银行内部代码,不便于展示,当时的案例demo如下 ,亲测。
2.1 配置类。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
class ThreadPoolConfig {
// @Bean 指定类标识,默认为 类的首字母小写
@Bean("taskExecutor")
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(5);
// 设置最大线程数
executor.setMaxPoolSize(10);
// 设置队列容量
executor.setQueueCapacity(20);
// 设置线程活跃时间(秒)
executor.setKeepAliveSeconds(60);
// 设置默认线程名称
executor.setThreadNamePrefix("mynah886-test-async-threads-");
// 设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
2.2 模拟出入参数类
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TaskInfo {
private String taskId;
private String taskName;
}
2.3 模拟实现逻辑
2.3.1 控制层类
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Slf4j
@RestController
@RequestMapping("/async")
public class AsyncController {
private Logger logger = LoggerFactory.getLogger(AsyncController.class);
@Autowired
private TaskLogService taskLogService;
@GetMapping(value = "/task" )
public String taskExecute(){
TaskInfo taskInfo1 = new TaskInfo("task1", "taskName001");
TaskInfo taskInfo2 = new TaskInfo("task2", "taskName001");
long startTime = System.currentTimeMillis();
Future<TaskInfo> future1 = null;
Future<TaskInfo> future2 = null;
try {
future1 = taskLogService.insertTaskLog(taskInfo1);
future2 = taskLogService.updateTaskLog(taskInfo2);
// 异步线程池执行 等待执行完成 isDone() 进行判断
/*while (true) {
if (future1.isDone() && future2.isDone()) {
System.out.println("异步任务一、二已完成");
break;
}
}*/
} catch (Exception e) {
log.debug("执行异步任务异常 {}" + e.getMessage());
}
/* V get(long timeout, TimeUnit unit) 设置取结果超时时间 */
TaskInfo result1 = null;
TaskInfo result2 = null;
try {
result1 = future1.get(3, TimeUnit.SECONDS);
result2 = future2.get();
log.debug("任务一result1 == " + result1 + " 任务二result2 == " + result2);
} catch (InterruptedException e) {
log.debug("InterruptedException: {}", e.getMessage());
} catch (ExecutionException e) {
log.debug("ExecutionException: {}", e.getMessage());
} catch (TimeoutException e) {
log.debug("接口get取值超时: {}", e.getMessage());
}
long endTime = System.currentTimeMillis();
log.debug("异步任务总耗时: " + (endTime - startTime));
return result1 + " --- " + result2;
}
}
2.3.2 接口类
import java.util.concurrent.Future;
public interface TaskLogService {
Future<TaskInfo> insertTaskLog(TaskInfo taskInfo) throws InterruptedException;
Future<TaskInfo> updateTaskLog(TaskInfo taskInfo) throws InterruptedException;
}
2.3.3 实现类
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.Future;
@Service
public class TaskLogServiceImpl implements TaskLogService {
/**
* 需要进行多线程任务的方法
* 1. 方法注解 @Async("taskExecutor") ,括号指定 线程池 名称
* 2. 方法返回值类型为 AsyncResult<T>
* 注意: 方法返回值类型由 Future 进行包装,即 Future<T>, 返回对象为 AsyncResult<T>
* @param taskInfo
* @return Future<String>
* @throws InterruptedException
*/
@Override
@Async("taskExecutor")
public Future<TaskInfo> insertTaskLog(TaskInfo taskInfo) throws InterruptedException {
System.out.println("1---------currentThread: " + Thread.currentThread() );
System.out.println("任务一 Thread Sleep 2s Start, " + taskInfo.getTaskId());
Thread.sleep(2000);
taskInfo.setTaskId("001-testId");
taskInfo.setTaskName("001-teatName");
System.out.println("任务一 Thread Sleep 2s End, " + taskInfo.getTaskId());
return new AsyncResult<>( taskInfo );
}
@Override
@Async("taskExecutor")
public Future<TaskInfo> updateTaskLog(TaskInfo taskInfo) throws InterruptedException {
System.out.println("2---------currentThread: " + Thread.currentThread() );
System.out.println("任务二 Thread Sleep 5s Start, " + taskInfo.getTaskId());
Thread.sleep(5000);
taskInfo.setTaskId("002-testId");
taskInfo.setTaskName("002-teatName");
System.out.println("任务二 Thread Sleep 5s End, " + taskInfo.getTaskId());
return new AsyncResult<>( taskInfo );
}
}
2.4 测试结果