Java 线程池(第十篇):(收官篇)CompletableFuture 异步编排实战 —— 多任务并行、结果汇总、超时控制与线程池协作

completableFuture 异步编排实战 —— 多任务并行、结果汇总、超时控制与线程池协作

如果说前 1–9 篇解决的是 “线程池如何安全、稳定地跑”
那么这一篇解决的是:

如何把多个异步任务“编排”成一个可读、可控、可维护的并发流程。

这正是现代 Java 并发从 ThreadPoolExecutor → CompletableFuture 的进化方向。

一、为什么需要 CompletableFuture?

先看一个你一定写过的代码:

Future<User> f1 = pool.submit(() -> this.loadUser()); Future<User> f2 = pool.submit(() -> this.loadUser()); User user = f1.get(); Order order = f2.get(); 

问题很明显:

  • get() 阻塞
  • 顺序代码读起来像同步
  • 异常处理零散
  • 任务依赖一多,代码迅速失控

CompletableFuture 的核心价值只有一句话:

用“声明式”的方式,描述异步任务之间的关系,而不是用 get() 等结果。

二、CompletableFuture 和线程池的关系(先搞清楚)

1️⃣ CompletableFuture ≠ 线程池

  • CompletableFuture 是 异步任务编排工具
  • 线程池是 执行引擎

2️⃣ 默认线程池是 ForkJoinPool(不推荐直接用)

CompletableFuture.supplyAsync(() -> work()); 

默认使用:

ForkJoinPool.commonPool() 

👉 生产环境强烈建议:显式传入你自己的线程池(第五篇)

CompletableFuture.supplyAsync(() -> work(), ioPool); 

三、最核心的三种编排模式(80% 场景)

1️⃣ thenApply —— 单任务链式变换

CompletableFuture .supplyAsync(() -> 1, pool) .thenApply(x -> x + 1) .thenApply(x -> x * 2) .thenAccept(System.out::println); 
  • 同一条任务链
  • 上一步完成 → 执行下一步
  • 适合 数据转换

2️⃣ thenCompose —— 依赖型异步(避免嵌套)

CompletableFuture<User> f = loadUserAsync(id) .thenCompose(user -> loadProfileAsync(user)); 

等价于(但比它优雅得多):

CompletableFuture<User> f = loadUserAsync(id) .thenApply(user -> loadProfileAsync(user)) .get(); // ❌ 

一句话:

thenCompose = “异步版的 flatMap”

3️⃣ thenCombine —— 并行任务结果合并(非常常用)

CompletableFuture<User> userFuture = loadUserAsync(id); CompletableFuture<Order> orderFuture = loadOrderAsync(id); CompletableFuture<UserInfo> result = userFuture.thenCombine(orderFuture, (user, order) -> new UserInfo(user, order)); 

✔ 并行执行
✔ 都完成后才合并
✔ 没有 get()

四、allOf / anyOf:真正的“并行编排”

allOf:全部完成

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3); all.thenRun(() -> System.out.println("all done")); 

⚠ 注意:allOf 不帮你收集结果,你需要自己 get(或 join)

List<Result> results = List.of(f1, f2, f3) .stream() .map(CompletableFuture::join) .toList(); 

anyOf:任意一个完成

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3); all.thenRun(() -> System.out.println("all done")); 

⚠ 注意:allOf 不帮你收集结果,你需要自己 get(或 join)

List<Result> results = List.of(f1, f2, f3) .stream() .map(CompletableFuture::join) .toList(); 

anyOf:任意一个完成

CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2); any.thenAccept(r -> System.out.println("first = " + r)); 

常用于:

  • 多数据源兜底
  • 多节点竞速

五、异常处理:这是 CompletableFuture 的强项

1️⃣ exceptionally —— 兜底恢复

CompletableFuture .supplyAsync(() -> risky(), pool) .exceptionally(e -> { log.error("error", e); return defaultValue; }); 

2️⃣ handle —— 成功 / 失败都处理

future.handle((r, e) -> { if (e != null) { return fallback; } return r; }); 

一句工程经验:

CompletableFuture 的异常是“流的一部分”,不是打断流程。

六、超时控制(非常关键)

Java 9+ 推荐方式

future .orTimeout(2, TimeUnit.SECONDS) .exceptionally(e -> fallback); 

或者:

future .completeOnTimeout(fallback, 2, TimeUnit.SECONDS); 

比 Future.get(timeout) 的优势:

  • 不阻塞线程
  • 超时是异步语义的一部分

七、CompletableFuture + 线程池的最佳实践

✔ 1)明确线程池职责

  • IO 任务 → ioPool
  • CPU 任务 → cpuPool
  • 定时 → scheduledPool

CompletableFuture.supplyAsync(this::loadData, ioPool)

✔ 2)不要在 CompletableFuture 里 get()

// ❌ 反模式 future.thenApply(r -> anotherFuture.get()); 

✔ 3)异常必须收敛在链路末端

future .thenApply(...) .thenApply(...) .exceptionally(this::fallback); 

✔ 4)避免在 commonPool 跑重任务

ForkJoinPool 是共享资源,容易拖垮 JVM。

八、一个完整实战 Demo

补充知识点:

Java 的“高阶函数”到底是什么:Runnable / Callable 就是函数参数

CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(this::loadUser, ioPool); CompletableFuture<Order> orderFuture = CompletableFuture.supplyAsync(this::loadOrder, ioPool); CompletableFuture<UserInfo> result = userFuture .thenCombine(orderFuture, UserInfo::new) .orTimeout(2, TimeUnit.SECONDS) .exceptionally(e -> { log.error("timeout or error", e); return UserInfo.empty(); }); result.thenAccept(info -> render(info)); 

这个 Demo 覆盖了:

  • 并行
  • 合并
  • 超时
  • 异常
  • 自定义线程池

九、和前 9 篇的“闭环关系”

你现在拥有的是一套完整体系:

  • 线程池(1–5)
  • 任务提交与异常(6–7)
  • 可观测性(8)
  • 背压(9)
  • 异步编排(10) ← 收官

一句总结:

ThreadPoolExecutor 决定“系统能不能跑”,
CompletableFuture 决定“并发代码能不能写得优雅、可维护”。

十、全专栏终极总结

线程池是并发执行的基础设施背压决定系统是否稳定监控决定问题是否可见CompletableFuture 决定异步逻辑是否可维护并发不是“多开线程”,而是“正确组织任务关系”

到这里,已经是一个完整、工程级、的体系。

Read more

【无人机避障算法核心技术】:揭秘五种主流算法原理与实战应用场景

第一章:无人机避障算法概述 无人机避障算法是实现自主飞行的核心技术之一,其目标是在复杂环境中实时感知障碍物,并规划安全路径以避免碰撞。随着传感器技术和计算能力的提升,避障系统已从简单的距离检测发展为融合多源信息的智能决策体系。 避障系统的基本组成 典型的无人机避障系统包含以下关键模块: * 感知模块:利用激光雷达、超声波、立体视觉或RGB-D相机获取环境数据 * 数据处理模块:对原始传感器数据进行滤波、特征提取和障碍物识别 * 决策与规划模块:基于环境模型生成避障轨迹,常用算法包括A*、Dijkstra、RRT和动态窗口法(DWA) 常见避障算法对比 算法优点缺点适用场景A*路径最优,搜索效率高高维空间计算开销大静态环境全局规划DWA实时性强,适合动态避障局部最优风险室内低速飞行RRT*渐进最优,适应复杂空间收敛速度慢三维未知环境 基于深度学习的避障方法示例 近年来,端到端神经网络被用于直接从图像生成控制指令。以下是一个简化的行为克隆模型推理代码片段: import torch import torchvision.transforms as tran

By Ne0inhk
医疗连续体机器人模块化控制界面设计与Python库应用研究(下)

医疗连续体机器人模块化控制界面设计与Python库应用研究(下)

软件环境部署 系统软件架构以实时性与兼容性为核心设计目标,具体配置如下表所示: 类别配置详情操作系统Ubuntu 20.04 LTS,集成RT_PREEMPT实时内核补丁(调度延迟<1 ms)开发环境Python 3.8核心库组件PyQt5 5.15.4(图形界面)、OpenCV 4.5.5(图像处理)、NumPy 1.21.6(数值计算) 该环境支持模块化控制界面开发与传感器数据的实时融合处理,为连续体机器人的逆运动学求解(如FB CCD算法测试)提供稳定运行基础[16]。 手眼协调校准 为实现视觉引导的精确控制,需完成相机与机器人基坐标系的空间映射校准,具体流程如下: 1. 标识点布置:在机器人末端及各段首尾、中间位置共固定7个反光标识点,构建臂型跟踪特征集[29]; 2. 数据采集:采用NOKOV度量光学动作捕捉系统(8台相机,

By Ne0inhk

简单易学的分离式部署小米智能家居Miloco方法

一、安装环境 * Windows用户:安装WSL2以及Docker * macOS/Linux用户:安装Docker 此处不再赘述,网上随便找个教程即可。特别地,对于Windows用户来说,你需要将 WSL2 的网络模式设置为 Mirrored。 二、使用Docker部署Miloco后端 以下均为bash命令。请Windows用户进入WSL2 / Linux、macOS用户进入终端操作: mkdir miloco cd milico vi docker-compose.yml 以下是compose的内容(不会使用vi的同学可以傻瓜式操作:先按i,再使用粘贴功能,然后按冒号,输入wq然后回车,记得关闭输入法): services:backend:container_name: miloco-backend image: ghcr.nju.edu.cn/xiaomi/miloco-backend:latest network_mode:

By Ne0inhk

EtherCAT在机器人多轴控制中的实战应用与性能优化

EtherCAT在工业机器人多轴控制中的实战优化与性能突破 工业机器人正经历从单轴独立控制向多轴协同作业的演进,而EtherCAT凭借其微秒级同步精度和灵活的拓扑结构,已成为高端装备制造领域的通信标准。在半导体晶圆搬运、包装机械高速分拣等场景中,传统脉冲控制方案正被基于EtherCAT的分布式时钟体系所替代。本文将深入解析如何通过协议优化、硬件选型和网络设计,实现128轴以上系统的抖动控制在±100ns以内。 1. EtherCAT核心技术解析与性能优势 EtherCAT的革新性在于其"On-the-fly"数据处理机制。与常规工业以太网不同,EtherCAT从站设备采用专用ASIC芯片(如ET1100、ET1200)进行帧处理,数据延迟仅纳秒级。在汽车焊接机器人案例中,采用分布式时钟同步的6轴系统可实现循环周期250μs,位置控制精度达±1μm。 关键性能指标对比: 参数脉冲控制CANopenEtherCAT同步精度±1ms±500μs±100ns单周期最大轴数8轴32轴256轴拓扑灵活性星型总线型任意拓扑电缆最大长度20m100m100m(铜缆)/2km(光纤)单帧

By Ne0inhk