Java并发编程:CompletableFuture实战
Java并发编程CompletableFuture实战引言Java 8引入的CompletableFuture是现代异步编程的重要工具它不仅解决了Future的局限性还提供了丰富的API用于组合、转换和处理异步结果。相比传统的FutureCompletableFuture支持流式调用、回调函数和声明式的异常处理大大简化了异步代码的编写。Spring Boot和各类Web框架都广泛使用CompletableFuture来处理异步任务。本文将全面介绍CompletableFuture的使用方法、实战技巧和最佳实践。一、Future的局限性1.1 传统Future的问题JDK 5引入的Future提供了一种获取异步任务结果的方式但它存在明显的局限性无法手动完成当异步任务完成后无法主动通知等待的线程不支持链式调用多个依赖的异步任务无法优雅地串联执行不支持组合操作对多个Future进行合并操作需要复杂的编码异常处理困难Future.get()会将异常封装在ExecutionException中处理起来不够直观。1.2 CompletableFuture的优势CompletableFuture完美解决了这些问题。它支持函数式编程风格可以链式调用thenApply、thenCompose等方法内置丰富的组合操作如thenCombine、allOf、anyOf等支持回调模式无需阻塞等待结果提供完善的异常处理机制通过exceptionally、handle等方法优雅处理异常。二、创建CompletableFuture2.1 基本创建方式public class CompletableFutureCreation { // 1. 使用completedFuture创建已完成的Future public CompletableFutureString createCompleted() { return CompletableFuture.completedFuture(预定义结果); } // 2. 使用supplyAsync创建异步任务 public CompletableFutureString createAsync() { return CompletableFuture.supplyAsync(() - { // 模拟耗时操作 try { Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return 异步计算结果; }); } // 3. 使用runAsync执行不需要返回值的任务 public CompletableFutureVoid createVoidAsync() { return CompletableFuture.runAsync(() - { System.out.println(执行异步任务线程: Thread.currentThread().getName()); }); } // 4. 指定线程池 public CompletableFutureString createWithExecutor(Executor executor) { return CompletableFuture.supplyAsync(() - { return 使用自定义线程池; }, executor); } }2.2 手动完成Futurepublic class ManualCompletion { public CompletableFutureString processWithTimeout() { CompletableFutureString future new CompletableFuture(); // 模拟异步处理 new Thread(() - { try { String result doProcess(); future.complete(result); } catch (Exception e) { future.completeExceptionally(e); } }).start(); // 设置超时 CompletableFutureString timeoutFuture future .orTimeout(5, TimeUnit.SECONDS) .whenComplete((result, ex) - { if (ex ! null) { System.out.println(处理超时或失败: ex.getMessage()); } else { System.out.println(处理完成: result); } }); return timeoutFuture; } private String doProcess() throws InterruptedException { Thread.sleep(3000); return 处理结果; } }三、转换和消费结果3.1 thenApply系列方法thenApply用于对结果进行转换处理类似Stream的map操作。public class TransformationDemo { public CompletableFutureInteger applyDemo() { return CompletableFuture.supplyAsync(() - 123) .thenApply(Integer::parseInt) // String - Integer .thenApply(n - n * 2) // Integer - Integer .thenApply(n - 结果: n); // 最终转为String } public CompletableFutureString applyWithAsync() { return CompletableFuture.supplyAsync(() - hello) .thenApplyAsync(s - { System.out.println(异步转换线程: Thread.currentThread().getName()); return s.toUpperCase(); }); } // thenApply接收Function支持返回值 // thenAccept接收Consumer只消费不返回 // thenRun接收Runnable不关心结果 public void consumeDemo() { CompletableFuture.supplyAsync(() - data) .thenAccept(result - System.out.println(消费结果: result)) .thenRun(() - System.out.println(任务完成)); } }3.2 thenCompose组合Future当异步任务返回另一个CompletableFuture时使用thenCompose进行扁平化处理。public class CompositionDemo { public CompletableFutureString composeDemo(String orderId) { return getOrderAsync(orderId) .thenCompose(order - getUserAsync(order.getUserId())) .thenApply(user - user.getName() 的订单); } private CompletableFutureOrder getOrderAsync(String orderId) { return CompletableFuture.supplyAsync(() - { // 模拟查询订单 Order order new Order(); order.setId(orderId); order.setUserId(user-001); return order; }); } private CompletableFutureUser getUserAsync(String userId) { return CompletableFuture.supplyAsync(() - { // 模拟查询用户 User user new User(); user.setId(userId); user.setName(张三); return user; }); } }3.3 thenCombine合并Future当需要合并两个独立异步任务的结果时使用thenCombine。public class CombinationDemo { public CompletableFutureString combineDemo() { CompletableFutureString future1 CompletableFuture .supplyAsync(() - Hello); CompletableFutureString future2 CompletableFuture .supplyAsync(() - World); return future1.thenCombine(future2, (s1, s2) - s1 s2); } public CompletableFutureUserOrderInfo getUserOrderInfo(String userId) { CompletableFutureUser userFuture getUserAsync(userId); CompletableFutureListOrder ordersFuture getOrdersAsync(userId); return userFuture.thenCombine(ordersFuture, (user, orders) - new UserOrderInfo(user, orders)); } private CompletableFutureUser getUserAsync(String userId) { return CompletableFuture.supplyAsync(() - { User user new User(); user.setId(userId); user.setName(张三); return user; }); } private CompletableFutureListOrder getOrdersAsync(String userId) { return CompletableFuture.supplyAsync(() - { return Arrays.asList(new Order(), new Order()); }); } }四、异常处理4.1 exceptionly处理异常public class ExceptionHandlingDemo { public CompletableFutureString handleWithExceptionally() { return CompletableFuture.supplyAsync(() - { if (true) throw new RuntimeException(测试异常); return 正常结果; }) .exceptionally(ex - { System.out.println(捕获异常: ex.getMessage()); return 默认值; }); } // 无论是否异常都会执行 public CompletableFutureString handleWithHandle(String input) { return CompletableFuture.supplyAsync(() - { if (error.equals(input)) { throw new RuntimeException(业务异常); } return 成功: input; }) .handle((result, ex) - { if (ex ! null) { System.out.println(发生异常已恢复); return 恢复后的值; } return result; }); } // 只处理异常不影响结果传递 public CompletableFutureString handleWithWhenComplete(String input) { return CompletableFuture.supplyAsync(() - { if (error.equals(input)) { throw new RuntimeException(测试异常); } return 成功: input; }) .whenComplete((result, ex) - { if (ex ! null) { System.out.println(记录异常日志: ex.getMessage()); } else { System.out.println(记录成功日志: result); } }); } }4.2 异常传播和抑制public class ExceptionPropagationDemo { public CompletableFutureString exceptionPropagationDemo() { return CompletableFuture .supplyAsync(() - { throw new BusinessException(初始异常); }) .thenApply(result - result.toUpperCase()) // 不会执行 .thenCompose(result - CompletableFuture .supplyAsync(() - { throw new AnotherException(后续异常); }) ) .exceptionally(ex - { // 这里可以访问所有异常 System.out.println(异常链: ex.getCause()); return 默认值; }); } }五、并行处理5.1 allOf等待所有任务public class ParallelProcessingDemo { public CompletableFutureString processAll(ListString ids) { ListCompletableFutureString futures ids.stream() .map(this::processOne) .collect(Collectors.toList()); return CompletableFuture.allOf( futures.toArray(new CompletableFuture[0]) ) .thenApply(v - futures.stream() .map(CompletableFuture::join) .collect(Collectors.joining(, ))); } public CompletableFutureListString processAllWithResults( ListString ids) { ListCompletableFutureString futures ids.stream() .map(this::processOne) .collect(Collectors.toList()); return CompletableFuture.allOf( futures.toArray(new CompletableFuture[0]) ) .thenApply(v - futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList())); } private CompletableFutureString processOne(String id) { return CompletableFuture.supplyAsync(() - { System.out.println(处理 id 线程: Thread.currentThread().getName()); return 结果- id; }); } }5.2 anyOf获取最先完成的结果public class FirstCompleteDemo { public CompletableFutureObject getFirstAvailable( String... services) { ListCompletableFutureString futures Arrays.stream(services) .map(this::queryService) .collect(Collectors.toList()); CompletableFutureObject anyFuture CompletableFuture .anyOf(futures.toArray(new CompletableFuture[0])); return anyFuture; } private CompletableFutureString queryService(String serviceName) { return CompletableFuture.supplyAsync(() - { // 模拟服务查询不同服务响应时间不同 int delay new Random().nextInt(3000); try { Thread.sleep(delay); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return serviceName -响应- delay ms; }); } }六、实战应用6.1 异步HTTP调用Service public class AsyncHttpService { private final RestTemplate restTemplate; private final ExecutorService executor; public AsyncHttpService(RestTemplate restTemplate) { this.restTemplate restTemplate; this.executor Executors.newFixedThreadPool(10); } public CompletableFutureUser getUser(String userId) { return CompletableFuture.supplyAsync(() - { String url http://user-service/api/users/ userId; return restTemplate.getForObject(url, User.class); }, executor); } public CompletableFutureOrderDetail getOrderWithUser(String orderId) { return getOrderAsync(orderId) .thenCompose(order - getUser(order.getUserId()) .thenApply(user - new OrderDetail(order, user))); } public CompletableFutureListProduct getRecommendedProducts( ListString productIds) { ListCompletableFutureProduct futures productIds.stream() .map(id - CompletableFuture.supplyAsync( () - getProductFromService(id), executor)) .collect(Collectors.toList()); return CompletableFuture.allOf( futures.toArray(new CompletableFuture[0]) ) .thenApply(v - futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList())); } private Order getOrderAsync(String orderId) { return restTemplate.getForObject( http://order-service/api/orders/ orderId, Order.class); } private Product getProductFromService(String productId) { return restTemplate.getForObject( http://product-service/api/products/ productId, Product.class); } }6.2 异步数据处理管道Service public class DataProcessingPipeline { public CompletableFutureProcessedData processDataPipeline( RawData raw) { return CompletableFuture .supplyAsync(() - validate(raw)) .thenApply(this::clean) .thenApply(this::transform) .thenApplyAsync(this::enrich) // 异步Enrichment可能较慢 .thenApply(this::aggregate) .exceptionally(ex - { log.error(数据处理失败: {}, ex.getMessage()); return createDefaultResult(raw); }); } private RawData validate(RawData raw) { if (raw null) { throw new IllegalArgumentException(原始数据不能为空); } return raw; } private CleanedData clean(RawData raw) { return new CleanedData(raw.getValue().trim()); } private TransformedData transform(CleanedData cleaned) { return new TransformedData(cleaned.getValue().toUpperCase()); } private EnrichedData enrich(TransformedData transformed) { // 添加额外的 enrichment 数据 return new EnrichedData(transformed); } private ProcessedData aggregate(EnrichedData enriched) { return new ProcessedData(enriched.getValue()); } private ProcessedData createDefaultResult(RawData raw) { return new ProcessedData(默认值); } }6.3 超时控制和重试public class TimeoutAndRetryDemo { public CompletableFutureString withTimeout( CompletableFutureString future, long timeoutSeconds) { return future.orTimeout(timeoutSeconds, TimeUnit.SECONDS) .exceptionally(ex - { System.out.println(任务超时: ex.getMessage()); return 超时默认值; }); } public CompletableFutureString withRetry( SupplierString operation, int maxRetries) { return CompletableFuture.supplyAsync(operation) .exceptionallyCompose(ex - { if (maxRetries 0) { System.out.println(重试中剩余次数: (maxRetries - 1)); return withRetry(operation, maxRetries - 1); } throw new CompletionException(ex); }); } public CompletableFutureString retryWithBackoff( SupplierString operation, int maxRetries) { return CompletableFuture.supplyAsync(operation) .exceptionallyCompose(ex - { if (maxRetries 0) { long delay (long) Math.pow(2, 5 - maxRetries) * 1000; System.out.println(等待 delay ms 后重试...); return CompletableFuture .delayedFuture(delay, TimeUnit.MILLISECONDS) .thenCompose(v - retryWithBackoff(operation, maxRetries - 1)); } throw new CompletionException(ex); }); } }七、性能优化建议7.1 使用自定义线程池默认的ForkJoinPool.commonPool()适合计算密集型任务但对于IO密集型任务建议使用自定义线程池以获得更好的性能。Configuration public class CompletableFutureConfig { Bean(completableFutureExecutor) public ExecutorService completableFutureExecutor() { return new ThreadPoolExecutor( 20, // corePoolSize 100, // maxPoolSize 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(500), new ThreadFactoryBuilder() .setNameFormat(cf-pool-%d) .build(), new ThreadPoolExecutor.CallerRunsPolicy() ); } }7.2 避免过度嵌套使用thenCompose扁平化嵌套的CompletableFuture避免回调地狱。7.3 合理使用同步方法thenApply等方法的Async版本会创建新任务开销如果操作很快完成使用同步版本更高效。总结CompletableFuture是Java异步编程的核心工具它提供了丰富的API来支持各种异步场景。通过supplyAsync、thenApply、thenCompose、thenCombine等方法可以优雅地处理异步任务的创建、转换、组合和异常。掌握CompletableFuture的使用对于构建高性能的Java应用至关重要。在实际开发中需要根据业务场景合理设计异步流程并注意线程池配置和异常处理才能真正发挥异步编程的优势。