Can CompletableFuture replace your Future tasks?

Background

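In internet services, multithreading and high concurrency have traditionally been used for fast-responding APIs. Over the past two years, with the rise of AI, people have become more tolerant of long API response times, and multithreading is increasingly used to process long-running tasks. This has exposed some overlooked details that cause thread-pool problems. Today let's talk about thread cancellation with CompletableFuture.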

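Because some of our production business is sensitive, I have extracted and desensitized part of the code to illustrate the problem we ran into.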

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ConcurrentThreadServiceImpl {

    private ThreadPoolTaskExecutor threadPool;

    {
        // Fixed pool of 200 threads backed by a 10,000-slot queue
        threadPool = new ThreadPoolTaskExecutor();
        threadPool.setCorePoolSize(200);
        threadPool.setMaxPoolSize(200);
        threadPool.setKeepAliveSeconds(10);
        threadPool.setThreadNamePrefix("thread-");
        threadPool.setQueueCapacity(10000);
        threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        threadPool.initialize();
    }

    public String queryListCommandFunction(List<String> list) {
        CompletableFuture<String> userRetrieveFuture = CompletableFuture.supplyAsync(() -> {
            // TODO: some time-consuming operation (simulated with a 3 s sleep)
            try {
                Thread.sleep(3_000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            log.info("simulated time-consuming operation");
            return "success";
        }, threadPool);

        CompletableFuture<String> infoRetrieveFuture = CompletableFuture.supplyAsync(() -> {
            // TODO: some time-consuming operation (simulated with a 3 s sleep)
            try {
                Thread.sleep(3_000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            log.info("simulated time-consuming operation");
            return "success";
        }, threadPool);

        CompletableFuture<Void> allFuture = CompletableFuture.allOf(
                userRetrieveFuture,
                infoRetrieveFuture
        );
        try {
            // Wait at most 3 s for both tasks
            allFuture.get(3_000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.error("timeout ! thread cancel", e);
            // Intended to stop the work on timeout, but CompletableFuture.cancel does not
            // interrupt the worker threads (analysed below), so both tasks keep their threads
            allFuture.cancel(true);
            return "error";
        }

        // TODO: some combination operation
        String userJoin = userRetrieveFuture.join();
        String infoJoin = infoRetrieveFuture.join();

        return userJoin + "_" + infoJoin;
    }
}
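
The catch block above calls `allFuture.cancel(true)`. Setting aside the interrupt question for now, there is a second detail: cancelling the future returned by `allOf` does not cancel the futures it was built from. A minimal sketch to check this, assuming a hypothetical class `AllOfCancelDemo` and 500 ms sleeps standing in for the real calls:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AllOfCancelDemo {

    // Hypothetical stand-in for the slow calls in the service above
    static String sleepThen(String value, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    /** Returns {allCancelled, componentCancelled, componentResult}. */
    static Object[] run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> user = CompletableFuture.supplyAsync(() -> sleepThen("user", 500), pool);
            CompletableFuture<String> info = CompletableFuture.supplyAsync(() -> sleepThen("info", 500), pool);
            CompletableFuture<Void> all = CompletableFuture.allOf(user, info);

            // Same call as in the catch block of the service
            all.cancel(true);

            // Only the aggregate future is cancelled; cancellation does not flow back
            // to its inputs, which keep running and later complete normally
            return new Object[] { all.isCancelled(), user.isCancelled(), user.join() };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Object[] r = run();
        System.out.println("allOf cancelled:     " + r[0]); // true
        System.out.println("component cancelled: " + r[1]); // false
        System.out.println("component result:    " + r[2]); // user
    }
}
```

To stop the inputs, each of them would have to be cancelled individually. As the rest of this article shows, even that only marks them as cancelled and does not interrupt their threads.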

At first glance the code above looks fine, yet in production it really did exhaust the thread pool:

org.springframework.core.task.TaskRejectedException: Executor [org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor$1@36711c85[Running, pool size = 200, active threads = 200, queued tasks = 10000, completed tasks = 15692]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@2dc9cfe0
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:317)
    at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1618)
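
A rough estimate helps explain why the pool saturates. Assume each request submits k = 2 tasks, and each task occupies a pool thread for W ≈ 3 s. The 3 s timeout does not shorten this, because the threads are never interrupted. Little's law with N = 200 threads then gives the highest request rate the pool can sustain:

```latex
N = \lambda W
\;\Longrightarrow\;
\lambda_{\max} = \frac{N}{k\,W} = \frac{200}{2 \times 3\ \mathrm{s}} \approx 33\ \text{requests/s}
```

Above roughly 33 requests per second, the surplus piles up in the 10,000-slot queue. Once the queue is full, new submissions are rejected, which is consistent with the log above. These figures come from the sample configuration and are illustrative only.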

This points to a little-noticed detail of how CompletableFuture behaves.

Reproducing the problem

I have prepared two test cases that you can run yourself:

import org.junit.jupiter.api.Test;
import java.util.concurrent.*;

public class CompletableFutureFunctionTest {

    @Test
    public void asyncTaskCanInterrupted() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(100);

        // submit() wraps the task in a FutureTask
        Future<String> future = executorService.submit(() -> interruptableTask());

        try {
            String result = future.get(500, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);
            System.out.println("Task timed out and was cancelled");
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        // Keep the test alive long enough (task: 100 x 100 ms) to watch the worker's output
        TimeUnit.SECONDS.sleep(12);
        executorService.shutdown();
    }

    @Test
    public void asyncTaskNoInterrupted() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(100);
        // supplyAsync() hands the task to executor.execute()
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> interruptableTask(), executorService);

        try {
            String result = future.get(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            System.out.println("Task failed or timed out, cancelling task");
            future.cancel(true);
        }

        // Keep the test alive long enough (task: 100 x 100 ms) to watch the worker's output
        TimeUnit.SECONDS.sleep(12);
        executorService.shutdown();
    }

    // Cooperative task: checks the interrupt flag and reacts to InterruptedException
    public String interruptableTask() {
        try {
            for (int i = 0; i < 100; i++) {
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println("Task interrupted");
                    return "Interrupted";
                }
                System.out.println("Executing task " + i);
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            System.out.println("Caught InterruptedException");
            Thread.currentThread().interrupt();
            return "Interrupted";
        }
        return "Completed";
    }
}

Output of the Future version:

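Executing task 0
Executing task 1
Executing task 2
Executing task 3
Executing task 4
Caught InterruptedException
Task timed out and was cancelled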

Output of the CompletableFuture version:

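Executing task 0
Executing task 1
Executing task 2
Executing task 3
Executing task 4
Task failed or timed out, cancelling task
Executing task 5
Executing task 6
Executing task 7
Executing task 8
Executing task 9
Executing task 10
Executing task 11
Executing task 12
Executing task 13
Executing task 14
Executing task 15
Executing task 16
Executing task 17
Executing task 18
Executing task 19
Executing task 20
Executing task 21
Executing task 22
Executing task 23
Executing task 24
Executing task 25
Executing task 26
...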

Does the CompletableFuture output match what you expected? Why is the thread used by CompletableFuture never actually interrupted, so that it keeps printing logs?

Asking the LLMs

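I put the question to today's mainstream large language models, but none of them solved my problem, so I had to dig into the source code myself.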

Analyzing the problem: digging into the source

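The two logs differ, so we need to answer three questions. How are Future tasks and CompletableFuture tasks submitted? How are they cancelled? How does the thread pool manage the tasks each of them submits?
First, submission. Both Future and CompletableFuture hand their tasks to a thread pool.
The Future example above submits with executorService.submit.
CompletableFuture.supplyAsync, as its source shows, submits with executor.execute: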

static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                 Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<U> d = new CompletableFuture<U>();
    e.execute(new AsyncSupply<U>(d, f));
    return d;
}
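
One way to confirm this without a debugger is to pass supplyAsync an Executor that records every Runnable it receives before forwarding it to a real pool. The sketch below assumes a hypothetical class `SupplyAsyncExecuteDemo`. The printed class name is a JDK implementation detail, matching the `new AsyncSupply<U>(d, f)` call in the source above:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class SupplyAsyncExecuteDemo {

    /** Returns a description of every Runnable that supplyAsync passes to Executor.execute. */
    static List<String> recordSubmittedTasks() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<String> seen = new CopyOnWriteArrayList<>();
        // Executor has a single method, execute(Runnable): this wrapper records
        // what it is given and forwards it unchanged to the real pool
        Executor recording = command -> {
            seen.add(command.getClass().getName()
                    + (command instanceof FutureTask ? " (FutureTask)" : " (not a FutureTask)"));
            pool.execute(command);
        };
        try {
            CompletableFuture.supplyAsync(() -> "ok", recording).join();
            return seen;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        recordSubmittedTasks().forEach(System.out::println);
    }
}
```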

Next, how do Future and CompletableFuture handle cancel?
CompletableFuture's cancel method:

/**
 * If not already completed, completes this CompletableFuture with
 * a {@link CancellationException}. Dependent CompletableFutures
 * that have not already completed will also complete
 * exceptionally, with a {@link CompletionException} caused by
 * this {@code CancellationException}.
 *
 * @param mayInterruptIfRunning this value has no effect in this
 * implementation because interrupts are not used to control
 * processing.
 *
 * @return {@code true} if this task is now cancelled
 */
public boolean cancel(boolean mayInterruptIfRunning) {
    boolean cancelled = (result == null) &&
        internalComplete(new AltResult(new CancellationException()));
    postComplete();
    return cancelled || isCancelled();
}
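
The method body above never reads `mayInterruptIfRunning`. A minimal sketch to observe the consequence directly, assuming a hypothetical class `CompletableFutureCancelDemo` and a 300 ms sleep standing in for real work. The sketch expects `cancel(true)` to succeed and the future to report itself cancelled, while the task still sleeps to the end without ever seeing an interrupt:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CompletableFutureCancelDemo {

    /** Returns {cancelReturned, isCancelled, taskRanToEnd, taskSawInterrupt}. */
    static boolean[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        try {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                started.countDown();
                try {
                    Thread.sleep(300); // would end early if the worker were interrupted
                } catch (InterruptedException e) {
                    sawInterrupt.set(true);
                    Thread.currentThread().interrupt();
                }
                finished.countDown();
                return "done";
            }, pool);

            started.await(); // make sure the task is already running
            boolean cancelReturned = future.cancel(true);
            boolean ranToEnd = finished.await(2, TimeUnit.SECONDS);
            return new boolean[] { cancelReturned, future.isCancelled(), ranToEnd, sawInterrupt.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.printf("cancel()=%b isCancelled()=%b ranToEnd=%b sawInterrupt=%b%n",
                r[0], r[1], r[2], r[3]);
    }
}
```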

And FutureTask's cancel method:

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW && STATE.compareAndSet
          (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                STATE.setRelease(this, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
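
Here `runner` is the thread currently executing the task, and `cancel(true)` interrupts it. The same experiment as for CompletableFuture, now through `submit`, should show the worker receiving the interrupt. This is a sketch; the class name `FutureTaskCancelDemo` is hypothetical:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class FutureTaskCancelDemo {

    /** Returns {isCancelled, taskSawInterrupt}. */
    static boolean[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        try {
            // submit() wraps this Callable in a FutureTask
            Future<String> future = pool.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(5_000);
                } catch (InterruptedException e) {
                    sawInterrupt.set(true); // cancel(true) lands here
                }
                finished.countDown();
                return "done";
            });

            started.await();
            future.cancel(true); // interrupts the runner thread
            finished.await(2, TimeUnit.SECONDS);
            return new boolean[] { future.isCancelled(), sawInterrupt.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("isCancelled=" + r[0] + " sawInterrupt=" + r[1]);
    }
}
```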

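At this point, the Javadoc of CompletableFuture.cancel has in fact already given the answer:
@param mayInterruptIfRunning this value has no effect in this implementation because interrupts are not used to control processing.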

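But let's keep going with the third question: how does the thread pool manage the tasks each of them submits? As established above, the Future example goes through the executor's submit method, while CompletableFuture goes through execute. Comparing the two:
execute(Runnable command)
  • Accepts only a Runnable and returns nothing
  • Hands the Runnable itself to the pool: ThreadPoolExecutor either starts a new worker with it or calls workQueue.offer(command)
  • Does not wrap the task in a Future or FutureTask
  • So the caller gets no handle to the task → nothing is available to interrupt or cancel it
submit(Callable/Runnable)
  • Wraps the submitted task in a FutureTask
  • FutureTask is a RunnableFuture, i.e. it implements both:
    • Runnable (so the pool can run it)
    • Future (so it can be cancelled and queried for its result)
  • So you get:
    • future.get() to obtain the result
    • future.cancel(true) to interrupt the task (which stops it only if the task reacts to interruption)
    • future.isDone() / isCancelled()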
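
The comparison above suggests the deciding factor is whether someone holds the FutureTask, not the name of the method. A small experiment supports this. If you build the FutureTask yourself and hand it to plain `execute()`, `cancel(true)` still interrupts the worker. This is a sketch; the class name `ExecuteFutureTaskDemo` is hypothetical:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ExecuteFutureTaskDemo {

    /** Returns {isCancelled, taskSawInterrupt}. */
    static boolean[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        // Wrap the work ourselves instead of letting submit() do it
        FutureTask<String> task = new FutureTask<>(() -> {
            started.countDown();
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            }
            finished.countDown();
            return "done";
        });
        try {
            pool.execute(task);  // plain execute(), no submit()
            started.await();
            task.cancel(true);   // we hold the FutureTask, so we can interrupt its runner
            finished.await(2, TimeUnit.SECONDS);
            return new boolean[] { task.isCancelled(), sawInterrupt.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("isCancelled=" + r[0] + " sawInterrupt=" + r[1]);
    }
}
```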

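That more or less explains why CompletableFuture cannot cancel a task already running in the thread pool, even when the task is written to respond to interrupts. CompletableFuture.cancel only completes the future with a CancellationException and never interrupts, as its Javadoc states. And because supplyAsync submits through executor.execute(), no task reference (no FutureTask) exists whose cancel(true) could interrupt the worker thread. So are you still blindly replacing Future with CompletableFuture in your code?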
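
If interruption matters, one possible workaround is to keep CompletableFuture for composition but run the work through `submit()`, then propagate cancellation to the resulting FutureTask. This is a sketch of that idea, not something from the JDK or the original article; the helper `InterruptibleFutures.supplyInterruptibly` is hypothetical. Note that `cancel` must be called on the returned future itself, and that the task must still react to interruption, as `interruptableTask` does:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public final class InterruptibleFutures {

    private InterruptibleFutures() {}

    /** Hypothetical helper: like supplyAsync, but cancelling the result interrupts the worker. */
    static <T> CompletableFuture<T> supplyInterruptibly(Supplier<T> supplier, ExecutorService pool) {
        CompletableFuture<T> result = new CompletableFuture<>();
        // submit() wraps the work in a FutureTask, which we keep a handle to
        Future<?> task = pool.submit(() -> {
            try {
                result.complete(supplier.get());
            } catch (Throwable t) {
                result.completeExceptionally(t);
            }
        });
        // Runs on completion (or immediately if already complete); only a
        // cancellation is forwarded, as FutureTask.cancel(true)
        result.whenComplete((value, error) -> {
            if (result.isCancelled()) {
                task.cancel(true);
            }
        });
        return result;
    }

    /** Returns {normalResultIs42, slowIsCancelled, slowTaskSawInterrupt}. */
    static boolean[] demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        try {
            int normal = supplyInterruptibly(() -> 42, pool).join();

            CompletableFuture<String> slow = supplyInterruptibly(() -> {
                started.countDown();
                try {
                    Thread.sleep(5_000);
                } catch (InterruptedException e) {
                    sawInterrupt.set(true);
                }
                finished.countDown();
                return "late";
            }, pool);

            started.await();
            slow.cancel(true); // e.g. after a timeout; now reaches the worker thread
            finished.await(2, TimeUnit.SECONDS);
            return new boolean[] { normal == 42, slow.isCancelled(), sawInterrupt.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("normal=" + r[0] + " cancelled=" + r[1] + " sawInterrupt=" + r[2]);
    }
}
```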

A deeper question I'll leave for you to think about: why wasn't CompletableFuture designed to submit its tasks with submit?
