我们是如何编写异步代码的
异步执行,相信很多同学都用过,比如向下面这么使用
// 最原始的方式,效率比较低
new Thread(() -> {
// 执行任务
}).start();
// 线程池放到成员属性中,并管理了其生命周期
private ExecutorService executorService = Executors.newCachedThreadPool();
// 使用线程池的方式
executorService.execute(() -> {
// 执行任务
});
第一种方式就不用说了,不建议这样使用。第二种方式可以用,但是还是比较麻烦:
- 我们要去管理其生命周期
- 使用的方式也不是很优雅(业务代码中包含了非业务代码)
Spring很好的帮我们解决了这些问题
Spring中何编写异步方法
在spring-context中给我们提供了两个注解
@EnableAsync
:启用异步功能@Async
:加在方法上,这个方法就会异步执行
那么我们使用异步将会是很简单的2步:
- 告诉Spring开启异步,
@EnableAsync
注解(在启动类,或者某一个spring的组件上加上这个注解) - 在我们需要异步执行的方法上加上
@Async
做完这2步我们就编写好了一个异步执行的方法,是不很简单。
使用异步要注意的一些问题
-
ThreadLocal
问题ThreadLocal
我们都知道是绑定在当前线程的,而我们异步是在不同线程中执行的,那我们如何将当前线程ThreadLocal
中的数据传递到任务下个执行线程中去呢。Spring给我们提供了一个TaskDecorator
,我们只要提供一个TaskDecorator
实现类就行,如下:/** * 线程之间传递数据,使用TaskDecorator * * @return TaskDecorator */ @Bean public TaskDecorator createTaskDecorator() { return runnable -> { // 当前线程,获取User User user = UserContext.getUser(); return () -> { // 下一个线程开始执行之前,设置User UserContext.setUser(user); runnable.run(); }; }; }
-
使用this调用而未开启异步执行的陷阱
我们加一个注解
@Async
就能实现一个异步方法,这其实是基于Spring的AOP来实现的,最终Spring会生成一个代理对象,在代理对象中Spring会帮我们使用线程池去执行@Async
方法。我们可以注入自己来替换this的调用// 在配置文件中开启允许循环依赖 // spring.main.allow-circular-references=true /** * 注入自己(代理对象),这里不能直接使用注解的方式来注入, * 需要在初始化方法中注入(晚于BeanPostProcessor#postProcessAfterInitialization方法的执行) */ private ObjectProvider<CalcServiceImpl> selfProvider; private CalcServiceImpl self; public CalcServiceImpl(ObjectProvider<CalcServiceImpl> selfProvider) { this.selfProvider = selfProvider; } /** * 这里不能通过self属性上加注解来注入自己,因为@Async的包装方式不是提前暴露的 * {@link org.springframework.beans.factory.config.SmartInstantiationAwareBeanPostProcessor#getEarlyBeanReference}, * 而是使用{@link org.springframework.beans.factory.config.BeanPostProcessor#postProcessAfterInitialization}来包装代理对象的。 * 所以我们只能在初始化方法中来获取自己的引用(Spring在保证单实例唯一性做的校验) * * @see org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor */ @PostConstruct public void init() { self = selfProvider.getIfUnique(); } /** * 保存数据(异步) * * @param data data */ @Async public void saveDataAsync(long data) { log.info("save data data={}", data); // 保存数据的逻辑... } @Override public long sumSequence(long start, long diff, long number) { // 通过本类的代理对象调用自己的方法 self.saveDataAsync(start); // 执行其它逻辑..., Sn=n*a1+n(n-1)d/2 return number * start + ((number - 1) * number * diff) / 2; }
当然我们还可以获取当前代理对象来调用
((CalcServiceImpl)AopContext.currentProxy()).saveDataAsync(start)
-
返回值问题
当我们加上
@Async
时方法的返回值必须是Future
类型,如果不是Future
类型则返回为null
。看下Spring的源码// org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit @Nullable protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) { if (CompletableFuture.class.isAssignableFrom(returnType)) { return CompletableFuture.supplyAsync(() -> { try { return task.call(); } catch (Throwable ex) { throw new CompletionException(ex); } }, executor); } else if (ListenableFuture.class.isAssignableFrom(returnType)) { return ((AsyncListenableTaskExecutor) executor).submitListenable(task); } else if (Future.class.isAssignableFrom(returnType)) { return executor.submit(task); } else { executor.submit(task); return null; } }
可以看到是
CompletableFuture
、ListenableFuture
、Future
类型的就可以拿到返回值(都是Future
的子类)其它情况是无法获取返回值的,即使我们的方法定义了返回类型也是返回null
。建议返回值使用
CompletableFuture
,这个在异步编程中比较好用。ListenableFuture
是Spring提供的接口有可以设置回调。Future
是顶级接口个人觉得使用起来还是比较麻烦 -
异常处理
在线程中执行过程中如果出现异;无返回值的方法则无法感知到异常,有返回值的方法在通过
Future
获取结果时会抛出内部的异常。Spring给我们提供了一个处理方式:AsyncUncaughtExceptionHandler
默认实现是SimpleAsyncUncaughtExceptionHandler
它对异常的处理方式就是打印日志。我们可以自己提供实现(AsyncConfigurer
)来替换Spring的默认实现,如下:/** * 异步未处理的异常 * * @param exception 异常 * @param method method * @param params params */ private void asyncUncaughtException(Throwable exception, Method method, Object... params) { if (log.isErrorEnabled()) { ReflectionUtils.invokeMethod(method, new Object()); log.error("Unexpected exception occurred invoking async method:{}", method, exception); } if (Future.class.isAssignableFrom(method.getReturnType())) { ReflectionUtils.rethrowRuntimeException(exception); } } /** * 提供AsyncConfigurer * * @param executorProvider executor provider * @return AsyncConfigurer */ @Bean public AsyncConfigurer createAsyncConfigurer(ObjectProvider<Executor> executorProvider) { return new AsyncConfigurer() { @Override public Executor getAsyncExecutor() { return executorProvider.getIfUnique(); } /** * 替换Spring默认的异步执行异常处理方式 * @see org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler * @return AsyncUncaughtExceptionHandler */ @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (exception, method, params) -> asyncUncaughtException(exception, method, params); } }; }
到此处Spring中使用异步算是比较完整的了,但是我们使用过线程池的同学都知道线程池是有很多参数的,怎么给@Async
的线程池设置这些参数呢
定制线程池
Spring在我们未配置Executor
时会个我们配置一个默认的ThreadPoolTaskExecutor
可以参考Spring源代码:
// TaskExecutionAutoConfiguration 源码
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
return builder.build();
}
根据 TaskExecutionProperties
我们可以配置如下参数:
spring:
task:
execution:
thread-name-prefix: application-task- # 线程名称前缀
shutdown:
await-termination: true # 是否应等待已经提交的任务在关闭时完成
await-termination-period: 1m # 等待剩余任务完成的最长时间
pool:
core-size: 10 # 核心线程数
allow-core-thread-timeout: true # 核心线程在保持活动时间内没有任务到达时是否超时和终止的策略
queue-capacity: 1000 # 队列容量。无限容量不会增加池,因此会忽略"max-size"属性
max-size: 10 # 允许的最大线程数。如果任务正在填满队列,则池可以扩展到该大小以适应负载。如果队列没有边界,则忽略。
keep-alive: 1m # 线程在终止前可能保持空闲的时间限制。
可以看到我们可以设置线程池的各种参数。但是Spring给我们提供的线程池按照声明式的就只有一个,实现方式参考其源代码:TaskExecutionAutoConfiguration
。
在我们实际的开发场景中我们经常会根据不同的业务场景定制不同的线程池,这样做的好处是防止这个业务场景下把线程奔溃了而不影响其它业务场景,而这样还能根据不同的业务场景对线程池进行调优,因此Spring默认提供的这一个线程池配置往往是不够的。为了定制不同的线程池我们之前经常会这样做:
@Configuration
@EnableAsync
public class ExecutorConfiguration {
/**
* 业务场景xxx线程池
*
* @param environment environment
* @return Executor
*/
@Bean
public Executor xxxxExecutor(Environment environment) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 定制线程池的各种东西
// ...
return executor;
}
/**
* 业务场景xxx2线程池
*
* @param environment environment
* @return Executor
*/
@Bean
public Executor xxxx2Executor(Environment environment) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 定制线程池的各种东西
// ...
return executor;
}
}
就如上面这样,我们有一种场景的线程池就定制一次。
由于我最近看别人的一些产品中看到过声明式创建对象(就是在配置文件中写好相关的配置,yml文件),然后把这个文件上传,他们就能给我们创建相关的实例,而不用去写代码。这不由的让我想起了我们这边是不是也可以这样,读取配置文件然后把配置文件的值设置到线程池对象中就可以了吗,这样还省去了我们去写这些高度类似的代码。相关实现代码:https://github.com/myszh/java-code-samples/tree/main/spring-async
- ExecutorFactory:主要的核心类
- AsyncConfiguration:主配置类
原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/267783.html