|
| 1 | +springboot-异步 |
| 2 | +=== |
| 3 | + |
| 4 | +## 背景 |
| 5 | +在平常应用中,绝大多数情况下都是通过同步的方式来实现交互处理的;但是在处理与第三方系统交互的时候,容易造成响应迟缓的情况,之前大部分都是使用多线程来完成此类任务,其实,在spring 3.x之后,就已经内置了@Async来完美解决这个问题,本文将完成介绍@Async的用法 |
| 6 | + |
| 7 | +## 使用场景案例 |
| 8 | + 例如,我们需要在一个方法中完成方法内的逻辑后需要通知其他接口,但是不需要等其他接口返回结果就直接在当前方法中返回(比如用户支付完订单后需要及时给用户返回支付结果,同时需要通知商户:用户已完成支付),这时就需要异步调用(不用等待通知完商户才给用户返回支付结果)。 |
| 9 | +## 实现 |
| 10 | +1.在启动类上添加@EnableAsync注解,表示项目支持异步方法调用; |
| 11 | + |
| 12 | +``` |
| 13 | +@SpringBootApplication |
| 14 | +@EnableAsync |
| 15 | +public class SpringbootAsyncMethodsApplication { |
| 16 | +
|
| 17 | + public static void main(String[] args) { |
| 18 | + SpringApplication.run(SpringbootAsyncMethodsApplication.class, args); |
| 19 | + } |
| 20 | +
|
| 21 | +
|
| 22 | +} |
| 23 | +``` |
| 24 | + |
| 25 | +2.在需要异步调用的方法上添加@Async注解,表示该方法为异步方法,即该方法和调用者不在一个线程中进行。 |
| 26 | + |
| 27 | +示例代码: |
| 28 | + |
| 29 | +``` |
| 30 | + @Async |
| 31 | + public Future<String> findByUrl(String url) throws InterruptedException { |
| 32 | + log.info("Looking up " + url); |
| 33 | + String results = netPageDownService.findByUrl(url); |
| 34 | + // Artificial delay of 1s for demonstration purposes |
| 35 | + Thread.sleep(1000L); |
| 36 | + log.info("当前线程{}", Thread.currentThread().getName()); |
| 37 | + return new AsyncResult<>(results); |
| 38 | + } |
| 39 | +``` |
| 40 | + |
| 41 | + |
| 42 | +## 配置详解 |
| 43 | +### 配置并开启@Async扫描支持 |
| 44 | + |
| 45 | +让我们开始使用JAVA的注解配置开启异步处理机制,只需要简单的加上@EnableAsync注解到配置类上即可。 |
| 46 | +* **annotation** - 默认情况下, @EnableAsync 会扫描使用了Spring @Async与EJB 3.1 javax.ejb.Asynchronous的方法;此选项也可以用来扫描其他的,如用户自定义的注解类型; |
| 47 | +* **mode** - 指定应该使用哪种AOP进行切面处理 - JAVA代理或AspectJ; |
| 48 | +* **proxyTargetClass** - 指定应该使用哪种代理类 - CGLIB或JDK;此属性只有当**mode**设置成**AdviceMode.PROXY**才会产生效果。 |
| 49 | +* **order** - 设置AsyncAnnotationBeanPostProcessor执行顺序(生命周期有关);默认情况下会最后一个执行,所以这样就能顾及到所有已存在的代理。 |
| 50 | + |
| 51 | +### 异步方法加@Async |
| 52 | + |
| 53 | +首先 - 让我们来了解一些规则 - @Async有两点局限性(无法正常工作)。 |
| 54 | +* 方法名必须是public进行修饰的 |
| 55 | +* 必须不能在同一个类中调用异步方法 |
| 56 | +原因很简单 - 方法名必须用public修饰才能被进行代理;而同一个类中调用方法的话会略过代理进行直接调用。 |
| 57 | + |
| 58 | +spring通过@Async定义异步任务 |
| 59 | + |
| 60 | +异步的方法有2种 |
| 61 | +1. 最简单的异步调用,返回值为void |
| 62 | + |
| 63 | +``` |
| 64 | +@Async |
| 65 | +public void asyncMethodWithVoidReturnType() { |
| 66 | + System.out.println("Execute method asynchronously. " |
| 67 | + + Thread.currentThread().getName()); |
| 68 | +} |
| 69 | +``` |
| 70 | + |
| 71 | +2. 异常调用返回Future |
| 72 | + |
| 73 | +``` |
| 74 | +@Async |
| 75 | +public Future<String> asyncMethodWithReturnType() { |
| 76 | + System.out.println("Execute method asynchronously - " |
| 77 | + + Thread.currentThread().getName()); |
| 78 | + try { |
| 79 | + Thread.sleep(5000); |
| 80 | + return new AsyncResult<String>("hello world !!!!"); |
| 81 | + } catch (InterruptedException e) { |
| 82 | + // |
| 83 | + } |
| 84 | +
|
| 85 | + return null; |
| 86 | +} |
| 87 | +``` |
| 88 | +Spring 同样也提供了一个Future的实现类叫AsyncResult,此类可以用来跟踪异步方法调用结果。 |
| 89 | +现在,让我们来调用上面的方法并通过Future进行获取到异步处理的结果。 |
| 90 | + |
| 91 | +``` |
| 92 | +public void testAsyncAnnotationForMethodsWithReturnType() |
| 93 | + throws InterruptedException, ExecutionException { |
| 94 | + System.out.println("Invoking an asynchronous method. " |
| 95 | + + Thread.currentThread().getName()); |
| 96 | + Future<String> future = asyncAnnotationExample.asyncMethodWithReturnType(); |
| 97 | +
|
| 98 | + while (true) { |
| 99 | + if (future.isDone()) { |
| 100 | + System.out.println("Result from asynchronous process - " + future.get()); |
| 101 | + break; |
| 102 | + } |
| 103 | + System.out.println("Continue doing something else. "); |
| 104 | + Thread.sleep(1000); |
| 105 | + } |
| 106 | +} |
| 107 | +``` |
| 108 | + |
| 109 | + |
| 110 | +## 自定义配置 |
| 111 | + |
| 112 | +Spring异步线程池的接口类,其实质是java.util.concurrent.Executor |
| 113 | + |
| 114 | +Executor 的实现非常多,Spring 已经实现的异常线程池: |
| 115 | +1. SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。 |
| 116 | +2. SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方 |
| 117 | +3. ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类 |
| 118 | +4. SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类 |
| 119 | +5. ThreadPoolTaskExecutor :最常使用,推荐。 其实质是对java.util.concurrent.ThreadPoolExecutor的包装 |
| 120 | + |
| 121 | +默认情况下,Spring使用SimpleAsyncTaskExecutor来运行这些异步方法,默认的设置方式可以在两个层级上面进行覆盖 - 在应用全局配置上或在单独的方法上。 |
| 122 | +在配置类中配置所需的executor: |
| 123 | + |
| 124 | +``` |
| 125 | +@Configuration |
| 126 | +@EnableAsync |
| 127 | +public class SpringAsyncConfig { |
| 128 | +
|
| 129 | + @Bean(name = "threadPoolTaskExecutor") |
| 130 | + public Executor threadPoolTaskExecutor() { |
| 131 | + return new ThreadPoolTaskExecutor(); |
| 132 | + } |
| 133 | +} |
| 134 | +``` |
| 135 | +然后,在@Async注解属性中使用executor的名称: |
| 136 | + |
| 137 | +``` |
| 138 | +@Async("threadPoolTaskExecutor") |
| 139 | +public void asyncMethodWithConfiguredExecutor() { |
| 140 | + System.out.println("Execute method with configured executor - " |
| 141 | + + Thread.currentThread().getName()); |
| 142 | +} |
| 143 | +``` |
| 144 | + |
| 145 | +### 应用全局配置上覆盖Executor |
| 146 | + |
| 147 | +自定义配置可以实现AsyncConfigurer 接口或继承AsyncConfigurerSupport,覆盖默认配置。 |
| 148 | + |
| 149 | +实现AsyncConfigurer接口 - 意思是getAsyncExecutor方法需要我们自己来进行实现,会返回Executor给整个应用实例使用 - 意味着现在充当默认的Executor去运行加了@Async注解的方法。 |
| 150 | + |
| 151 | +``` |
| 152 | +@Configuration |
| 153 | +@EnableAsync |
| 154 | +public class SpringAsyncConfig implements AsyncConfigurer { |
| 155 | +
|
| 156 | + @Override |
| 157 | + public Executor getAsyncExecutor() { |
| 158 | + return new ThreadPoolTaskExecutor(); |
| 159 | + } |
| 160 | +
|
| 161 | +} |
| 162 | +``` |
| 163 | + |
| 164 | + |
| 165 | + |
| 166 | + |
| 167 | +# 异常处理 |
| 168 | + |
| 169 | +由于返回值类型是Future,异常处理就简单了 - Future.get()会抛出异常。 |
| 170 | +但是,如果返回类型是void,异常将无法正常传送到调用的线程. 因此,我们需要添加一些额外的配置来处理异常。 |
| 171 | +我们创建一个实现了AsyncUncaughtExceptionHandler接口的自定义异步异常处理类.一旦任意未捕获的异常产生后都会调用handleUncaughtException()方法。 |
| 172 | + |
| 173 | +``` |
| 174 | +public class CustomAsyncExceptionHandler |
| 175 | + implements AsyncUncaughtExceptionHandler { |
| 176 | +
|
| 177 | + @Override |
| 178 | + public void handleUncaughtException( |
| 179 | + Throwable throwable, Method method, Object... obj) { |
| 180 | +
|
| 181 | + System.out.println("Exception message - " + throwable.getMessage()); |
| 182 | + System.out.println("Method name - " + method.getName()); |
| 183 | + for (Object param : obj) { |
| 184 | + System.out.println("Parameter value - " + param); |
| 185 | + } |
| 186 | + } |
| 187 | +
|
| 188 | +} |
| 189 | +``` |
| 190 | +在上一个代码段中,我们看到配置类实现了AsyncConfigurer接口.根据其中的部分,我们同样也需要实现getAsyncUncaughtExceptionHandler()方法来自定义我们的异步异常处理类: |
| 191 | + |
| 192 | +``` |
| 193 | +@Override |
| 194 | +public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { |
| 195 | + return new CustomAsyncExceptionHandler(); |
| 196 | +} |
| 197 | +``` |
| 198 | +## 多线程池 |
| 199 | + |
| 200 | +``` |
| 201 | + @Bean(name = "threadPoolTaskExecutor1") |
| 202 | + public Executor threadPoolTaskExecutor1() { |
| 203 | + log.info("加载{}配置;MultThreadPoolConfig.threadPoolTaskExecutor1"); |
| 204 | + ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); |
| 205 | + threadPoolTaskExecutor.setThreadNamePrefix("threadPoolTaskExecutor1"); |
| 206 | + return threadPoolTaskExecutor; |
| 207 | + } |
| 208 | +
|
| 209 | + @Bean(name = "threadPoolTaskExecutor2") |
| 210 | + public Executor threadPoolTaskExecutor2() { |
| 211 | + log.info("加载{}配置;MultThreadPoolConfig.threadPoolTaskExecutor2"); |
| 212 | + ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); |
| 213 | + threadPoolTaskExecutor.setThreadNamePrefix("threadPoolTaskExecutor2"); |
| 214 | + return threadPoolTaskExecutor; |
| 215 | + } |
| 216 | +``` |
| 217 | +使用的时候表明使用哪个线程池 |
| 218 | + |
| 219 | +``` |
| 220 | +@Async("threadPoolTaskExecutor1") |
| 221 | + public Future<String> findByUrl(String url) throws InterruptedException { |
| 222 | + log.info("Looking up " + url); |
| 223 | + String results = netPageDownService.findByUrl(url); |
| 224 | + // Artificial delay of 1s for demonstration purposes |
| 225 | + Thread.sleep(1000L); |
| 226 | + log.info("当前线程{}", Thread.currentThread().getName()); |
| 227 | + return new AsyncResult<>(results); |
| 228 | + } |
| 229 | +``` |
| 230 | + |
| 231 | +代码地址参见:https://github.com/mongoding/spring-boot-side |
| 232 | + |
| 233 | + |
| 234 | + |
0 commit comments