java CompletableFuture可以取代你的future任务吗? elkins 2025-05-24 2025-05-24 背景 历来互联网多线程、高并发场景多用于快速响应的服务接口,但近两年AI兴起之际,大家对于接口服务的响应时间的容忍度逐渐升高,也更多得使用多线程去处理一些长耗时的任务。进而导致了一些被忽略的细节而引发了线程池的问题。今天就来聊一聊CompletableFuture的线程取消问题。
鉴于公司线上业务有些敏感性,故抽样脱敏了部分代码,将所遇问题做个介绍。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Slf4j public class ConcurrentTreadServiceImpl { private ThreadPoolTaskExecutor threadPool; { threadPool = new ThreadPoolTaskExecutor (); threadPool.setCorePoolSize(200 ); threadPool.setMaxPoolSize(200 ); threadPool.setKeepAliveSeconds(10 ); threadPool.setThreadNamePrefix("thread-" ); threadPool.setQueueCapacity(10000 ); threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor .CallerRunsPolicy()); threadPool.initialize(); } public String queryListCommandFuction (List<String> list) { CompletableFuture<String> userReTrievefuture = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(3_000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } log.info("模拟耗时操作" ); return "success" ; },threadPool); CompletableFuture<String> infoReTrievefuture = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(3_000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } log.info("模拟耗时操作" ); return "success" ; },threadPool); CompletableFuture<Void> allFuture = CompletableFuture.allOf( userReTrievefuture, infoReTrievefuture ); try { allFuture.get(3_000 , TimeUnit.MILLISECONDS); } catch (Exception e) { log.error("timeout ! thread cancel" ,e); allFuture.cancel(true ); return "error" ; } String userJoin = userReTrievefuture.join(); String infoJoin = infoReTrievefuture.join(); return userJoin + "_" + infoJoin; } }
代码如上,乍看之下,觉得并无问题,但是线上确实发生了线程池耗尽的问题。
1 2 3 4 5 org.springframework.core.task.TaskRejectedException: Executor [org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor$1@36711c85[Running, pool size = 200, active threads = 200, queued tasks = 10000, completed tasks = 15692]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@2dc9cfe0 at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:317) at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1618)
而这引出了一个不为人注意的CompletableFuture的细节特性问题。
问题还原 这边我准备了两个测试case,大家可以试跑一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import org.junit.jupiter.api.Test; import java.util.concurrent.*; public class CompletableFutureFunctionTest { @Test public void asyncTaskCanInterrupted () throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(100 ); Future<String> future = executorService.submit(() -> interruptableTask()); try { String result = future.get(500 , TimeUnit.MILLISECONDS); } catch (TimeoutException e) { future.cancel(true ); System.out.println("任务超时已取消" ); } catch (ExecutionException e){ e.printStackTrace(); } while (true ) { Integer res = (100 /10 ); } } @Test public void asyncTaskNoInterrupted () throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(100 ); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> interruptableTask(), executorService); try { String result = future.get(500 ,TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { System.out.println("任务执行异常,取消任务" ); future.cancel(true ); } while (true ) { Integer res = (100 /10 ); } } public String interruptableTask () { try { for (int i = 0 ; i < 100 ; i++) { if (Thread.currentThread().isInterrupted()) { System.out.println("任务被中断" ); return "Interrupted" ; } System.out.println("执行任务" +i); Thread.sleep(100 ); } } catch (InterruptedException e) { System.out.println("捕获到中断异常" ); Thread.currentThread().interrupt(); return "Interrupted" ; } return "Completed" ; } }
其中使用future的方法执行日志为
1 2 3 4 5 6 7 执行任务0 执行任务1 执行任务2 执行任务3 执行任务4 捕获到中断异常 任务超时已取消
使用CompletableFuture的方法执行日志为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 执行任务0 执行任务1 执行任务2 执行任务3 执行任务4 任务执行异常,取消任务 执行任务5 执行任务6 执行任务7 执行任务8 执行任务9 执行任务10 执行任务11 执行任务12 执行任务13 执行任务14 执行任务15 执行任务16 执行任务17 执行任务18 执行任务19 执行任务20 执行任务21 执行任务22 执行任务23 执行任务24 执行任务25 执行任务26 ……
CompletableFuture的日志是否符合你的预期?为什么CompletableFuture中使用的线程并没有真正被中断,而是仍然不断地在打印日志。
大模型答疑 带着问题提供给了现在主流的大模型,显然大模型并没有解决我的问题,只能再扒一扒源码了。
分析问题、扒一扒源码 从Future任务和CompletableFuture任务的不同日志分析,我们得看看Future的任务和CompletableFuture的任务都是如何提交的?如何取消的?线程池对于他们提交的任务是如何管理的? 首先第一个问题,可以看到Future和CompletableFuture都是使用的线程池提交任务。 Future提交的的方式在上面示例代码中即可见,采用的是 executorService.submit方法 CompletableFuture 的supplyAsync方法提交任务的源码可见,采用的是executorService.execute方法
1 2 3 4 5 6 7 static <U> CompletableFuture<U> asyncSupplyStage (Executor e, Supplier<U> f) { if (f == null ) throw new NullPointerException (); CompletableFuture<U> d = new CompletableFuture <U>(); e.execute(new AsyncSupply <U>(d, f)); return d; }
那么future和CompletableFuture的cancel是怎么处理的呢? CompletableFuture的cancel方法
1 2 3 4 5 6 7 8 9 10 11 12 13 public boolean cancel (boolean mayInterruptIfRunning) { boolean cancelled = (result == null ) && internalComplete(new AltResult (new CancellationException ())); postComplete(); return cancelled || isCancelled(); }
FutureTask的cancel方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public boolean cancel (boolean mayInterruptIfRunning) { if (!(state == NEW && STATE.compareAndSet (this , NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false ; try { if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null ) t.interrupt(); } finally { STATE.setRelease(this , INTERRUPTED); } } } finally { finishCompletion(); } return true ; }
到这其实在CompletableFuture的注释上其实已经做了说明 @param mayInterruptIfRunning this value has no effect in this
但是咱们继续看,针对以上以上我们引入第三个问题,线程池是如何管理他们提交的任务的。由上文第一个问题可知,FutureTask采用的是executor提供的submit方法,CompletableFuture采用的是execute方法 对比一下两者 execute(Runnable command)
只接受 Runnable,没有返回值
只是调用线程池的 workQueue.offer(new Task(command))
不会封装为 Future 或 FutureTask
所以你无法拿到任务引用 → 自然也无法中断或取消 submit(Callable/Runnable)
会把你提交的任务封装成 FutureTask
FutureTask 是 RunnableFuture,本身实现了:
Runnable(可提交给线程池)
Future(可以取消、获取结果)
所以你能得到:
future.get() 获取结果
future.cancel(true) 中断任务
future.isDone() / isCancelled()
综上,基本讲清楚了为什么CompletableFuture无法取消线程池中的任务(即使任务设置了接受中断),因为CompletableFuture底层采用的executor.execute()方法,它无法拿到任务引用,所以自然也就无法中断或取消。所以你还在你的代码里盲目地使用CompletableFuture替代Future吗?
继续有个更深的问题,其实可以留给大家继续思考,为什么CompletableFuture没有设计成用submit方法来提交任务?
参考文献 http://ifeve.com/completablefuture/