使用@Async实现异步调用

什么是“异步调用”?“异步调用”对应的是“同步调用”,同步调用指程序按照定义顺序依次执行,每一行程序都必须等待上一行程序执行完成之后才能执行;异步调用指程序在顺序执行时,不等待异步调用的语句返回结果就执行后面的程序。

同步调用

下面通过一个简单示例来直观的理解什么是同步调用:

定义Task类,创建三个处理函数分别模拟三个执行任务的操作,操作消耗时间随机取(10秒内)

@Slf4j
@Component
public class AsyncTasks {

    public static Random random = new Random();

    public void doTaskOne() throws Exception {
        log.info("开始做任务一");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("完成任务一,耗时:" + (end - start) + "毫秒");
    }

    public void doTaskTwo() throws Exception {
        log.info("开始做任务二");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("完成任务二,耗时:" + (end - start) + "毫秒");
    }

    public void doTaskThree() throws Exception {
        log.info("开始做任务三");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("完成任务三,耗时:" + (end - start) + "毫秒");
    }

}

在单元测试用例中,注入Task对象,并在测试用例中执行doTaskOne、doTaskTwo、doTaskThree三个函数。

@Slf4j
@SpringBootTest
public class ApplicationTests {

    @Autowired
    private AsyncTasks asyncTasks;

    @Test
    public void test() throws Exception {
        asyncTasks.doTaskOne();
        asyncTasks.doTaskTwo();
        asyncTasks.doTaskThree();
    }

}

执行单元测试,可以看到类似如下输出:

2021-09-14 23:19:12.922  INFO 92539 --- [           main]        : 开始做任务一
2021-09-14 23:19:17.788  INFO 92539 --- [           main]        : 完成任务一,耗时:4865毫秒
2021-09-14 23:19:17.788  INFO 92539 --- [           main]        : 开始做任务二
2021-09-14 23:19:24.851  INFO 92539 --- [           main]        : 完成任务二,耗时:7063毫秒
2021-09-14 23:19:24.851  INFO 92539 --- [           main]        : 开始做任务三
2021-09-14 23:19:26.928  INFO 92539 --- [           main]        : 完成任务三,耗时:2076毫秒

任务一、任务二、任务三顺序的执行完了,换言之doTaskOne、doTaskTwo、doTaskThree三个函数顺序的执行完成。

异步调用

上述的同步调用虽然顺利的执行完了三个任务,但是可以看到执行时间比较长,若这三个任务本身之间不存在依赖关系,可以并发执行的话,同步调用在执行效率方面就比较差,可以考虑通过异步调用的方式来并发执行。
在Spring Boot中,我们只需要通过使用@Async注解就能简单的将原来的同步函数变为异步函数,Task类改在为如下模式:

@Slf4j
@Component
public class AsyncTasks {

    public static Random random = new Random();

    @Async
    public void doTaskOne() throws Exception {
        log.info("开始做任务一");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("完成任务一,耗时:" + (end - start) + "毫秒");
    }

    @Async
    public void doTaskTwo() throws Exception {
        log.info("开始做任务二");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("完成任务二,耗时:" + (end - start) + "毫秒");
    }

    @Async
    public void doTaskThree() throws Exception {
        log.info("开始做任务三");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("完成任务三,耗时:" + (end - start) + "毫秒");
    }

}

为了让@Async注解能够生效,还需要在Spring Boot的主程序中配置@EnableAsync,如下所示:

@EnableAsync
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

此时可以反复执行单元测试,您可能会遇到各种不同的结果,比如:

  • 没有任何任务相关的输出
  • 有部分任务相关的输出
  • 乱序的任务相关的输出
  • 原因是目前doTaskOne、doTaskTwo、doTaskThree三个函数的时候已经是异步执行了。主程序在异步调用之后,主程序并不会理会这三个函数是否执行完成了,由于没有其他需要执行的内容,所以程序就自动结束了,导致了不完整或是没有输出任务相关内容的情况。

注:@Async所修饰的函数不要定义为static类型,这样异步调用不会生效

异步回调

为了让doTaskOne、doTaskTwo、doTaskThree能正常结束,假设我们需要统计一下三个任务并发执行共耗时多少,这就需要等到上述三个函数都完成调动之后记录时间,并计算结果。

那么我们如何判断上述三个异步调用是否已经执行完成呢?我们需要使用CompletableFuture来返回异步调用的结果,就像如下方式改造doTaskOne函数:

    @Async
public CompletableFuture<String> doTaskOne() throws Exception {
    log.info("开始做任务一");
    long start = System.currentTimeMillis();
    Thread.sleep(random.nextInt(10000));
    long end = System.currentTimeMillis();
    log.info("完成任务一,耗时:" + (end - start) + "毫秒");
    return CompletableFuture.completedFuture("任务一完成");
}

按照如上方式改造一下其他两个异步函数之后,下面我们改造一下测试用例,让测试在等待完成三个异步调用之后来做一些其他事情。

@Test
public void test() throws Exception {
    long start = System.currentTimeMillis();

    CompletableFuture<String> task1 = asyncTasks.doTaskOne();
    CompletableFuture<String> task2 = asyncTasks.doTaskTwo();
    CompletableFuture<String> task3 = asyncTasks.doTaskThree();

    CompletableFuture.allOf(task1, task2, task3).join();

    long end = System.currentTimeMillis();

    log.info("任务全部完成,总耗时:" + (end - start) + "毫秒");
}

看看我们做了哪些改变:

  • 在测试用例一开始记录开始时间
  • 在调用三个异步函数的时候,返回CompletableFuture类型的结果对象
  • 通过CompletableFuture.allOf(task1, task2, task3).join()实现三个异步任务都结束之前的阻塞效果
  • 三个任务都完成之后,根据结束时间 - 开始时间,计算出三个任务并发执行的总耗时。
    执行一下上述的单元测试,可以看到如下结果:
2021-09-14 23:33:38.842  INFO 95891 --- [         task-3]        : 开始做任务三
2021-09-14 23:33:38.842  INFO 95891 --- [         task-2]        : 开始做任务二
2021-09-14 23:33:38.842  INFO 95891 --- [         task-1]        : 开始做任务一
2021-09-14 23:33:45.155  INFO 95891 --- [         task-2]        : 完成任务二,耗时:6312毫秒
2021-09-14 23:33:47.308  INFO 95891 --- [         task-3]        : 完成任务三,耗时:8465毫秒
2021-09-14 23:33:47.403  INFO 95891 --- [         task-1]        : 完成任务一,耗时:8560毫秒
2021-09-14 23:33:47.404  INFO 95891 --- [           main]        : 任务全部完成,总耗时:8590毫秒

可以看到,通过异步调用,让任务一、二、三并发执行,有效的减少了程序的总运行时间。

配置@Async异步任务的线程池

虽然,从单次接口调用来说,是没有问题的。但当接口被客户端频繁调用的时候,异步任务的数量就会大量增长:3 x n(n为请求数量),如果任务处理不够快,就很可能会出现内存溢出的情况。那么为什么会内存溢出呢?根本原因是由于Spring Boot默认用于异步任务的线程池是这样配置的:

image.png
图中我标出的两个重要参数是需要关注的:

  • queueCapacity:缓冲队列的容量,默认为INT的最大值(2的31次方-1)。
  • maxSize:允许的最大线程数,默认为INT的最大值(2的31次方-1)。

所以,默认情况下,一般任务队列就可能把内存给堆满了。所以,我们真正使用的时候,还需要对异步任务的执行线程池做一些基础配置,以防止出现内存溢出导致服务不可用的问题。

配置默认线程池

默认线程池的配置很简单,只需要在配置文件中完成即可,主要有以下这些参数:

//线程池创建时的初始化线程数,默认为8
spring.task.execution.pool.core-size=2
//线程池的最大线程数,默认为int最大值
spring.task.execution.pool.max-size=5
//用来缓冲执行任务的队列,默认为int最大值
spring.task.execution.pool.queue-capacity=10
//线程终止前允许保持空闲的时间
spring.task.execution.pool.keep-alive=60s
//是否允许核心线程超时
spring.task.execution.pool.allow-core-thread-timeout=true
//是否等待剩余任务完成后才关闭应用
spring.task.execution.shutdown.await-termination=false
//等待剩余任务完成的最大时间
spring.task.execution.shutdown.await-termination-period=
//线程名的前缀,设置好了之后可以方便我们在日志中查看处理任务所在的线程池
spring.task.execution.thread-name-prefix=task-

增加线程池配置后的日志输出的顺序会变成如下的顺序:

2021-09-15 16:31:50.013  INFO 77985 --- [         task-1]        : 开始做任务一
2021-09-15 16:31:50.013  INFO 77985 --- [         task-2]        : 开始做任务二
2021-09-15 16:31:52.452  INFO 77985 --- [         task-1]        : 完成任务一,耗时:2439毫秒
2021-09-15 16:31:52.452  INFO 77985 --- [         task-1]        : 开始做任务三
2021-09-15 16:31:55.880  INFO 77985 --- [         task-2]        : 完成任务二,耗时:5867毫秒
2021-09-15 16:32:00.346  INFO 77985 --- [         task-1]        : 完成任务三,耗时:7894毫秒
2021-09-15 16:32:00.347  INFO 77985 --- [           main]        : 任务全部完成,总耗时:10363毫秒

任务一和任务二会马上占用核心线程,任务三进入队列等待
任务一完成,释放出一个核心线程,任务三从队列中移出,并占用核心线程开始处理

注意:这里可能有的小伙伴会问,最大线程不是5么,为什么任务三是进缓冲队列,不是创建新线程来处理吗?这里要理解缓冲队列与最大线程间的关系:只有在缓冲队列满了之后才会申请超过核心线程数的线程来进行处理。所以,这里只有缓冲队列中10个任务满了,再来第11个任务的时候,才会在线程池中创建第三个线程来处理。这个这里就不具体写列子了,读者可以自己调整下参数,或者调整下单元测试来验证这个逻辑。

Q.E.D.


知道的越多,不知道的越多