一、Why Parallel Computing?
Advance performance to a new level previously out of reach 将性能提升到过去无法达到的水平。 核心思想: 随着单核 CPU 频率提升变慢(Dennard scaling 失效), 性能提升的主要方式已经从'提高频率'转向'增加并行度'。 也就是说:
介绍基于现代 C++ 的任务并行编程系统 Taskflow。首先分析并行计算背景,对比 CPU 与 GPU 性能差异及 Amdahl 定律限制。随后详细讲解 Taskflow 的核心概念,包括静态与动态任务图、控制流任务图(CTFG)以及工作窃取(Work-stealing)运行时机制。文中通过多个 C++ 代码示例展示了如何使用 Taskflow 实现并行归约、排序、流水线及异步任务调度,并探讨了其在异构架构下的可扩展性优势。
Advance performance to a new level previously out of reach 将性能提升到过去无法达到的水平。 核心思想: 随着单核 CPU 频率提升变慢(Dennard scaling 失效), 性能提升的主要方式已经从'提高频率'转向'增加并行度'。 也就是说:
图中表示:
Time (minutes) to speed up a machine learning algorithm 横轴:
纵轴:
趋势:
典型 CPU:
典型 GPU:
例如: CPU 并行度 ≈ 32 GPU 并行度 ≈ 5000
在高度可并行任务中(如矩阵乘法): 理论加速比 ≈ 5000 / 32 ≈ 156 当然实际不会达到理论值,但 10–100x 是常见的。
并行加速并不是无限的。 设:
Amdahl 定律: Speedup(N) = 1 / ((1-P) + P/N)
假设: P = 0.95 (95% 可并行) 若使用 1000 个 GPU 核: Speedup = 1 / (0.05 + 0.95/1000) = 1 / (0.05 + 0.00095) = 1 / 0.05095 ≈ 19.6
即使有 1000 核,最多也只能约 20 倍加速。 这说明:
串行部分是性能瓶颈
机器学习核心是:
这些都属于:
数据并行(Data Parallelism)
例如矩阵乘法: C_ij = ∑_k A_ik * B_kj 每个 C_ij 都可以独立计算: 这意味着: O(n^2) 个元素可以并行 这正是 GPU 擅长的结构。
1⃣ CPU 核心少 2⃣ 同步开销大 3⃣ 内存带宽限制 4⃣ NUMA 访问延迟 5⃣ 上下文切换开销
而 GPU:
理想线性加速: T(N) = T(1) / N
现实情况: T(N) = T(1) / (N * η) 其中: 0 < η < 1 η 是效率因子。 当通信开销大时: η → 0.5 或更低
#include <vector>
#include <omp.h>
#include <iostream>
// 模拟一个可并行计算任务
int main() {
const int N = 100000000;
std::vector<double> data(N, 1.0);
double sum = 0.0;
// 并行循环
#pragma omp parallel for reduction(+:sum)
for (int i = 0; i < N; ++i) {
sum += data[i];
}
std::cout << "Sum = " << sum << std::endl;
}
#pragma omp parallel for
表示:
如果使用 8 核: 理论时间: T ≈ T_single / 8
并行计算的目标是:
很多问题的时间复杂度是: O(N^2) 或 O(N^3) 如果不并行,计算时间会爆炸。
并行计算的重要性在于:
Modern parallel workloads are very complex and irregular 现代并行负载非常复杂且不规则
现实应用已经不再是:
而是:
这些计算具有:
这就需要:
任务并行模型
现代系统包括:
称为:
Heterogeneous Architecture(异构架构)
例如: 电路仿真算法可能包含:
典型形式: for i = 1...N: y_i = f(x_i) 每个元素独立。 适用于:
把算法分解成任务: T = {t_1, t_2, ..., t_n} 并定义依赖关系: t_i → t_j 表示: t_j 必须在 t_i 完成后执行 形成:
有向无环图(DAG)
设任务图为: G = (V, E) 其中:
W = ∑_{i=1}^{n} w_i 每个任务的计算量之和。
S = max_path ∑ w_i 最长依赖路径。
根据 Brent 定理: T_p ≥ max(W/P, S) 其中: P = 处理器数量
说明:
性能受限于关键路径
未来架构趋势:
数据并行适合:
任务并行适合:
因此:
Task parallelism is most scalable for future heterogeneous architectures
Capture your intention in decomposing an algorithm into a top-down task graph 意思是: 从算法逻辑结构出发,构建任务图。
例如:
形成: t_1 → t_2 → t_3 → t_4 → t_5 但中间可能有分叉: t_2 → {t_3, t_4}
下面列出常见任务并行框架(简要说明)
支持任务构造(task directive)
面向性能可移植性
Intel Threading Building Blocks
异构任务调度
Python 分布式任务图
分布式任务图调度
分布式 AI 任务系统
#include <iostream>
#include <omp.h>
// 模拟三个有依赖的任务
int main() {
#pragma omp parallel {
#pragma omp single {
// 任务 A
#pragma omp task {
std::cout << "Task A running\n";
}
// 任务 B
#pragma omp task {
std::cout << "Task B running\n";
}
// 等待 A 和 B 完成
#pragma omp taskwait
// 任务 C 依赖 A 和 B
#pragma omp task {
std::cout << "Task C running after A and B\n";
}
}
}
return 0;
}
#pragma omp task
表示创建一个独立任务。
#pragma omp taskwait
表示: 必须等待之前任务完成。 这构成一个简单 DAG:
A
/ \
B C
\ /
C
数学表达: A → C B → C
假设:
时间模型: T = T_cpu1 + T_gpu + T_cpu2 如果 GPU 并行度高: T_gpu ≪ T_cpu1 总时间显著下降。
| 特性 | 数据并行 | 任务并行 |
|---|---|---|
| 结构 | 规则 | 不规则 |
| 依赖 | 很少 | 明确依赖 |
| 扩展性 | 好 | 更好 |
| 异构适配 | 一般 | 极好 |
| 表达能力 | 低 | 高 |
未来趋势: cores → 10^3 ~ 10^6 而: 规则问题越来越少 复杂系统越来越多 只有任务图可以表达:
任务并行的核心优势:
关键数学模型: G = (V, E) T_p ≥ max(W/P, S)
Taskflow 是一个:
核心思想:
用任务图表达算法结构,而不是写线程代码。
静态任务图的意思是:
所有任务和依赖关系在运行前就构建完成。
数学模型: 定义任务图: G = (V, E) 其中:
执行时间满足: T_p ≥ max(W/P, S) 其中:
#include <iostream>
#include <taskflow/taskflow.hpp>
// 引入 Taskflow 头文件(header-only 库)
int main() {
// 执行器:负责调度线程池执行任务
tf::Executor executor;
// 任务图对象:用来构建 DAG
tf::Taskflow taskflow;
// 创建四个任务(lambda 表达式)
// emplace 会返回 task 句柄
auto [A, B, C, D] = taskflow.emplace(
// 任务 A
[](){ std::cout << "TaskA\n"; },
// 任务 B
[](){ std::cout << "TaskB\n"; },
// 任务 C
[](){ std::cout << "TaskC\n"; },
// 任务 D
[](){ std::cout << "TaskD\n"; });
// 定义依赖关系:
// A 必须在 B 和 C 之前执行
A.precede(B, C);
// D 必须在 B 和 C 之后执行
D.succeed(B, C);
// 提交任务图到执行器,并等待执行完成
executor.run(taskflow).wait();
return 0;
}
根据依赖:
A.precede(B, C); D.succeed(B, C);
等价关系: A → B A → C B → D C → D
A
/ \
B C
\ /
D
合法执行顺序:
并行度分析: 若每个任务耗时为 t: 总工作量: W = 4t 关键路径: S = 3t (A → B → D 或 A → C → D)
理论最优时间: T_p ≥ max(4t/P, 3t) 如果 P ≥ 2: T_p = 3t
因为:
对比: 动态任务图(Dynamic Task Graph):
可以表达任意 DAG。
内部使用:
Work-stealing 调度算法
线程空闲时:
OpenMP:
#pragma omp task
缺点:
Taskflow:
Executor 内部维护: P 个 worker threads
执行规则:
数学表达: 若任务 v 的入度: deg^-(v) = 0 则可调度。
对于 DAG: 总时间近似: T ≈ S + (W - S)/P 当: P → ∞ 有: T → S
说明:
关键路径是性能上限
auto [A,B,C,D] = taskflow.emplace(
[](){ std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::cout<<"A\n";},
[](){ std::this_thread::sleep_for(std::chrono::milliseconds(200)); std::cout<<"B\n";},
[](){ std::this_thread::sleep_for(std::chrono::milliseconds(200)); std::cout<<"C\n";},
[](){ std::cout<<"D\n";});
你会看到:
Taskflow 核心思想:
和静态任务图不同:
也就是说:
DAG 在运行时逐步生成,而不是提前构建完成。
设任务集合为: V = {t_1, t_2, ...} 依赖关系为: E = {(t_i → t_j)}
在动态模型中: V(t) 和 E(t) 会随着时间变化。 即: G(t) = (V(t), E(t))
#include <iostream>
#include <taskflow/taskflow.hpp>
// Taskflow 是 header-only
int main() {
// 创建执行器(内部是线程池)
tf::Executor executor;
// 创建任务 A(无依赖)
auto A = executor.silent_dependent_async([](){ std::cout << "TaskA\n";});
// 创建任务 B(依赖 A)
auto B = executor.silent_dependent_async([](){ std::cout << "TaskB\n";}, A);
// 创建任务 C(依赖 A)
auto C = executor.silent_dependent_async([](){ std::cout << "TaskC\n";}, A);
// 创建任务 D(依赖 B 和 C)
auto D = executor.silent_dependent_async([](){ std::cout << "TaskD\n";}, B, C);
// 等待所有任务完成
executor.wait_for_all();
return 0;
}
依赖关系: A → B A → C B → D C → D
图结构:
A
/ \
B C
\ /
D
| 静态 Taskflow | 动态 async |
|---|---|
| taskflow.emplace | silent_dependent_async |
| 先构建 DAG | 边创建边执行 |
| run(taskflow) | 直接提交到 executor |
| 适合结构固定 | 适合运行时动态生成 |
设:
总工作量: W = 4t 关键路径: S = 3t
理论执行时间: T_p ≥ max(W/P, S)
若线程数 P ≥ 2: T_p = 3t
函数原型逻辑:
silent_dependent_async(task, dependencies...)
意思:
内部机制: 每个任务维护: deg^-(v) 当: deg^-(v) = 0 任务进入 ready queue。
因为你可以:
例如:
auto A = executor.silent_dependent_async([&executor](){ std::cout<<"TaskA\n";
// 在 A 运行期间创建新任务
executor.silent_dependent_async([](){ std::cout<<"Dynamic task\n";});
});
这时: V(t) 在运行时增加
Executor 内部:
数学上: 若线程数为 P: 期望时间近似: T_p ≈ W/P + O(S)
Work-stealing 具有:
适合:
例如: 分治算法: T(n) = 2T(n/2) + O(n) 可以在运行中不断创建子任务。
std::async:
Taskflow:
动态任务图的本质: G(t) = (V(t), E(t)) 随着运行动态扩展。
优势:
CTFG = 控制流任务图 它的核心思想是:
不仅表达'计算任务依赖',还表达'控制流逻辑'(if / while / loop / branch) 普通 DAG 只能表达: A → B 但 CTFG 可以表达:
即:
把程序控制结构也变成任务图的一部分
原始结构类似:
auto [init, cond, yes, no] = taskflow.emplace(...);
我们写成一个完整、正确、带注释的版本:
#include <iostream>
#include <taskflow/taskflow.hpp>
int main() {
tf::Executor executor;
tf::Taskflow taskflow;
// 创建 4 个任务
auto [init, cond, yes, no] = taskflow.emplace(
// 初始化任务
[](){ std::cout << "initialize\n"; },
// 条件任务(Condition Task)
// 返回值决定走哪条分支
[]()->int{ std::cout << "checking condition\n"; return 0; /* 返回 0 走 yes,返回 1 走 no */ },
// yes 分支
[](){ std::cout << "yes branch\n"; },
// no 分支
[](){ std::cout << "no branch\n"; });
// 依赖关系定义
// cond 必须在 init 后执行
cond.succeed(init);
// cond 决定执行 yes 或 no
cond.precede(yes, no);
executor.run(taskflow).wait();
return 0;
}
定义: t_cond : N → {0,1,...,k} 若: t_cond() = i 则调度: t_i 这是一个动态边选择机制。
第二段代码表示:
迭代优化直到收敛
等价伪代码:
initialize();
while(!converged()){
optimize();
}
print("done");
#include <iostream>
#include <taskflow/taskflow.hpp>
bool converged(){
static int count = 0;
return ++count > 3; // 模拟 3 次后收敛
}
int main() {
tf::Executor executor;
tf::Taskflow taskflow;
auto [init, opt, cond, stop] = taskflow.emplace(
// 初始化数据结构
[](){ std::cout << "initialize data\n"; },
// 优化步骤
[](){ std::cout << "optimize step\n"; },
// 条件任务(返回分支编号)
[]()->int{return converged()?1:0; },
// 收敛后停止
[](){ std::cout << "done!\n"; });
// 执行顺序:opt.succeed(init).precede(cond)
// cond:
// 返回 0 → 回到 opt
// 返回 1 → 进入 stop
cond.precede(opt, stop);
executor.run(taskflow).wait();
return 0;
}
图结构:
init
|
v
opt
|
v
cond
/ \
opt stop
设:
总时间: T = k * t
任务图表示: opt_i → cond_i → opt_{i+1} 直到: cond_i = 1
普通任务图:
CTFG:
本质: Program → Control DAG
因为: 整个算法流程可以完全用任务图表达:
没有隐藏的串行控制逻辑。 这意味着: 整个程序 = G(V,E) 调度器可以:
设:
理论执行时间: T_p ≥ max(W/P, S)
CTFG 优势: 减少人为串行代码,缩短 S。
传统:
while(!converged()){
optimize();
}
问题:
CTFG:
| 特性 | 数据并行 | CTFG |
|---|---|---|
| 表达能力 | 低 | 高 |
| 控制流 | 无 | 有 |
| 适合复杂算法 | 否 | 是 |
| 异构友好 | 一般 | 极好 |
CTFG 是:
把程序的控制流结构变成可调度的任务图
它可以表达:
数学本质: Program = G(V,E) 执行时间受: critical path 控制。
Work-stealing(工作窃取)是一种并行调度算法:
空闲线程主动从其他线程'窃取'任务执行。 其目标是:
我们按照流程图逐步解释。
Start → CheckEmpty
每个 worker 线程执行:
while(true){
if(queue.empty()){ wait_or_steal(); }
else{ execute_task(); }
}
CheckEmpty -- Y --> Wait
线程会:
CheckEmpty -- N --> Dequeue
从本地双端队列取任务:
t = queue.pop_front();
判断:
IsCondition{Condition task?}
在 CTFG 中,任务可能是:
流程:
r = invoke(t)
enqueue r-th child
代码逻辑类似:
int r = t(); // 执行条件函数
enqueue(t.child(r));
数学表达: 设: t_cond : N → {0,1,...,k} 若: t_cond() = r 则只激活: child_r 这实现了:
流程:
invoke(t)
↓
DecDeps
↓
EnqueueSuccessors
对应逻辑:
invoke(t);
for(auto s : successors(t)){
s.remaining_deps--;
if(s.remaining_deps == 0) enqueue(s);
}
每个任务维护: deg^-(v) 即:
剩余强依赖数量
当: deg^-(v) = 0 任务进入 ready queue。
设:
根据理论: T_P ≤ W/P + O(S)
Work-stealing 被证明:
期望时间接近理论最优。
使用:
集中式调度:
Work-stealing:
复杂度对比: 集中式: O(P) 窃取式: O(1) 平均
CTFG 中:
Work-stealing 具有:
强平衡能力(Strong Balance)
即: Active workers ≈ Available parallelism 保持:
任务一旦依赖满足: deg^-(v) = 0 立即入队。
单位时间执行任务数量: Throughput = Tasks completed / Time
Work-stealing 减少 idle time。
void worker_loop(){
while(true){
Task* t = pop_local();
if(!t){
t = steal_from_others();
if(!t){ sleep(); continue; }
}
if(t->is_condition()){
int r = t->invoke();
enqueue(t->child(r));
} else {
t->invoke();
for(auto s : t->successors){
if(--s->remaining_deps == 0) enqueue(s);
}
}
}
}
任务生成速度: λ_g 任务执行速度: λ_e
理想情况: λ_g ≈ λ_e
Work-stealing 能自动调节:
即使 Work-stealing 非常优秀: 总时间仍受限于: S = critical path 当: P → ∞ 有: T → S
Taskflow 的 Work-stealing Runtime:
数学核心: T_P ≤ W/P + O(S)
工程效果:
#include <iostream>
#include <taskflow/taskflow.hpp>
// Taskflow 是 header-only 库
int main() {
// 创建执行器(内部维护线程池)
tf::Executor executor;
// 创建主任务图
tf::Taskflow taskflow;
// 创建主图中的任务 A
tf::Task A = taskflow.emplace([](){ std::cout << "Task A\n"; }).name("A");
// 创建主图中的任务 C
tf::Task C = taskflow.emplace([](){ std::cout << "Task C\n"; }).name("C");
// 创建主图中的任务 D
tf::Task D = taskflow.emplace([](){ std::cout << "Task D\n"; }).name("D");
// 创建任务 B(包含一个子任务图 Subflow)
tf::Task B = taskflow.emplace([](tf::Subflow& subflow){
std::cout << "Task B (start)\n";
// 在 B 内部创建子任务 B1
tf::Task B1 = subflow.emplace([](){ std::cout << "Task B1\n"; }).name("B1");
// 子任务 B2
tf::Task B2 = subflow.emplace([](){ std::cout << "Task B2\n"; }).name("B2");
// 子任务 B3
tf::Task B3 = subflow.emplace([](){ std::cout << "Task B3\n"; }).name("B3");
// 依赖关系:
// B3 必须在 B1 和 B2 执行完成后执行
B3.succeed(B1, B2);
std::cout << "Task B (end)\n";
}).name("B");
// 主任务图依赖关系:
// A 必须在 B 和 C 之前执行
A.precede(B, C);
// D 必须在 B 和 C 之后执行
D.succeed(B, C);
// 执行任务图并等待完成
executor.run(taskflow).wait();
return 0;
}
依赖关系: A → B A → C B → D C → D
图结构:
A
/ \
B C
\ /
D
B1 B2
\ /
B3
数学表示: B1 → B3 B2 → B3
可能的执行顺序:
设:
总工作量: W = 7t 关键路径(最长路径): A → B → B3 → D 长度: S = 4t
理论并行时间: T_P ≥ max(7t/P, 4t)
当线程数足够多: T ≈ 4t
默认行为:
如果调用:
subflow.detach();
则:
这个例子展示了:
数学模型仍为: T_P ≤ W/P + O(S) 其中:
B 子任务图
任务 A
任务 B
任务 C
任务 D
任务 B1
任务 B3
任务 B2
graph TD %% 主任务图
A["任务 A"] --> B["任务 B"]
A --> C["任务 C"]
B --> D["任务 D"]
C --> D
%% 子任务图 B 内部
subgraph B 子任务图
B1["任务 B1"] --> B3["任务 B3"]
B2["任务 B2"] --> B3
B3 --> B
end
%% 样式美化
style A fill:#ffcc00,stroke:#333,stroke-width:2px
style B fill:#66ccff,stroke:#333,stroke-width:2px
style C fill:#ff99cc,stroke:#333,stroke-width:2px
style D fill:#99ff99,stroke:#333,stroke-width:2px
style B1 fill:#ccff99,stroke:#333
style B2 fill:#ccff99,stroke:#333
style B3 fill:#ccff99,stroke:#333
#include <taskflow/taskflow.hpp>
// Taskflow 是头文件库
#include <iostream>
#include <cstdlib> // std::rand()
#include <ctime> // std::time()
int main() {
// 初始化随机数种子
std::srand(static_cast<unsigned>(std::time(nullptr)));
// 创建 Taskflow 执行器和任务流
tf::Executor executor;
tf::Taskflow taskflow;
// -------------------------
// Step 1: 初始化任务
// -------------------------
tf::Task init = taskflow.emplace([](){ std::cout << "初始化任务 init\n"; }).name("init");
// -------------------------
// Step 2: 停止任务
// -------------------------
tf::Task stop = taskflow.emplace([](){ std::cout << "结束任务 stop\n"; }).name("stop");
// -------------------------
// Step 3: 条件任务 (Condition Task)
// 返回随机 0 或 1,用于控制流程
// 0 -> 反馈回 cond(循环)
// 1 -> 结束 stop
// -------------------------
tf::Task cond = taskflow.emplace([](){
int r = std::rand()%2;
std::cout << "条件任务 cond 返回:" << r << "\n";
return r;
}).name("cond");
// -------------------------
// Step 4: 构建任务依赖关系
// -------------------------
init.precede(cond); // init 任务在 cond 前执行
// cond 返回 0 或 1 决定下一步:
// 0 -> 再次执行 cond(形成循环)
// 1 -> 执行 stop
cond.precede(cond, stop);
// -------------------------
// Step 5: 运行任务流
// -------------------------
executor.run(taskflow).wait();
return 0;
}
cond 之前执行cond(形成循环)stop,结束任务流init.precede(cond):init 在 cond 前执行cond.precede(cond, stop):cond 根据返回值决定下一步返回 0
返回 1
初始化任务 init
条件任务 cond?
再次执行 cond
结束任务 stop
#include <taskflow/taskflow.hpp>
// Taskflow header-only
#include <iostream>
int main() {
// ---------------------------
// 创建子任务流 f1
// ---------------------------
tf::Taskflow f1;
tf::Task f1A = f1.emplace([](){ std::cout << "Task f1A\n"; }).name("f1A");
tf::Task f1B = f1.emplace([](){ std::cout << "Task f1B\n"; }).name("f1B");
// f1A 先于 f1B
f1A.precede(f1B);
// ---------------------------
// 创建主任务流 f2
// ---------------------------
tf::Taskflow f2;
// 普通任务
tf::Task f2A = f2.emplace([](){ std::cout << "Task f2A\n"; }).name("f2A");
tf::Task f2B = f2.emplace([](){ std::cout << "Task f2B\n"; }).name("f2B");
tf::Task f2C = f2.emplace([](){ std::cout << "Task f2C\n"; }).name("f2C");
// 模块任务,将子任务流 f1 嵌入 f2
tf::Task f1_module_task = f2.composed_of(f1).name("module");
// 模块任务与其他任务的依赖关系
f1_module_task.succeed(f2A, f2B); // module 之后执行 f2A 和 f2B
f1_module_task.precede(f2C); // module 之前执行 f2C
// ---------------------------
// 创建执行器并运行 f2
// ---------------------------
tf::Executor executor;
executor.run(f2).wait();
return 0;
}
f1
f1A 和 f1B,并且 f1A 先于 f1B 执行。f2
f2A, f2B, f2C。composed_of(f1) 将子任务流 f1 嵌入为模块任务 module。f1_module_task.succeed(f2A, f2B) → 模块执行后再执行 f2A 和 f2B。f1_module_task.precede(f2C) → 模块任务必须在 f2C 执行前完成。tf::Executor
f2。f1A
f1B
module (f1)
f2A
f2B
f2C
#include <taskflow/taskflow.hpp>
// Taskflow header-only
#include <iostream>
#include <future>
int main() {
// 创建一个 Executor 对象,用于运行异步任务
tf::Executor executor;
// -------------------------------
// 1. 创建普通异步任务,返回值类型为 int
// executor.async 会返回 std::future<int>
std::future<int> future = executor.async([](){
std::cout << "async task returns 1\n";
return 1; // 返回值
});
// -------------------------------
// 2. 创建普通异步任务,不返回值
executor.silent_async([](){
std::cout << "async task does not return\n";
});
// -------------------------------
// 3. 创建带依赖关系的异步任务(动态任务图)
// silent_dependent_async 可以让任务依赖于其他任务
tf::AsyncTask A = executor.silent_dependent_async([](){ std::cout << "Task A\n"; });
tf::AsyncTask B = executor.silent_dependent_async([](){ std::cout << "Task B\n"; }, A); // B 在 A 完成后执行
tf::AsyncTask C = executor.silent_dependent_async([](){ std::cout << "Task C\n"; }, A); // C 在 A 完成后执行
tf::AsyncTask D = executor.silent_dependent_async([](){ std::cout << "Task D\n"; }, B, C); // D 在 B 和 C 完成后执行
// -------------------------------
// 等待所有异步任务完成
executor.wait_for_all();
// -------------------------------
// 获取 future 的返回值
int result = future.get();
std::cout << "Future result: " << result << "\n";
return 0;
}
tf::Executor 是 Taskflow 的运行时,管理线程池与任务调度。executor.async 返回 std::future,可以获取返回值。executor.silent_async 不返回值,也不阻塞。silent_dependent_async 可以让任务依赖其他任务完成后执行。D 依赖于 B 和 C,只有当 B 和 C 都完成后,D 才会运行。executor.wait_for_all() 阻塞当前线程直到所有异步任务完成。future.get() 获取执行结果。async task returns 1 (future)
async task does not return
Task A
Task B
Task C
Task D
#include <taskflow/taskflow.hpp>
// Taskflow header-only
#include <iostream>
int main() {
tf::Executor executor;
tf::Taskflow taskflow;
// 创建三个简单任务
auto A = taskflow.emplace([](){ std::cout << "Task A\n"; }).name("A");
auto B = taskflow.emplace([](){ std::cout << "Task B\n"; }).name("B");
auto C = taskflow.emplace([](){ std::cout << "Task C\n"; }).name("C");
// 设置依赖关系
A.precede(B, C); // A 先于 B 和 C 执行
// 1⃣ 运行一次 taskflow
tf::Future<void> run_once = executor.run(taskflow);
run_once.get(); // 等待完成
// 2⃣ 运行 4 次
executor.run_n(taskflow, 4);
// 3⃣ 运行直到计数器为 0
executor.run_until(taskflow, [counter=5]() mutable{return --counter == 0;});
// 阻塞等待所有提交的 taskflow 完成
executor.wait_for_all();
return 0;
}
tf::Executor executor;
tf::Taskflow taskflow;
taskflow.emplace(...)
.name("A") 给任务命名。A.precede(B, C)
executor.run(taskflow)
tf::Future<void> 可用于查询状态。executor.run_n(taskflow, 4)
executor.run_until(taskflow, [counter=5]{...})
executor.wait_for_all()
Task A
Task B
Task C
run_once, run_n, run_until),DAG 表示任务间的依赖关系,而非执行次数。#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/reduce.hpp>
#include <taskflow/algorithm/sort.hpp>
#include <taskflow/algorithm/for_each.hpp>
#include <vector>
#include <iostream>
int main() {
tf::Executor executor;
tf::Taskflow taskflow;
std::vector<int> data(10, 0); // 创建长度为 10 的数组,初始化为 0
int init_sum = 0;
// 1⃣ 并行赋值:将每个元素设置为 100
tf::Task task1 = taskflow.for_each(
data.begin(), data.end(),
[](auto& i){ i = 100; }
).name("Parallel ForEach");
// 2⃣ 并行归约:对所有元素求和
tf::Task task2 = taskflow.reduce(
data.begin(), data.end(), init_sum,
[](auto a, auto b){ return a + b; }
).name("Parallel Reduce");
// 3⃣ 并行排序:对数组排序
tf::Task task3 = taskflow.sort(
data.begin(), data.end(),
[](auto a, auto b){ return a < b; }
).name("Parallel Sort");
// 设置依赖关系
task1.precede(task2); // 先并行赋值,再归约
task2.precede(task3); // 再并行排序
// 执行任务流
executor.run(taskflow).wait();
// 输出最终数组
std::cout << "Sorted data: ";
for(auto v : data) std::cout << v << " ";
std::cout << std::endl;
return 0;
}
data,将每个元素赋值为 100。data,计算总和 init_sum。data。Parallel ForEach
Parallel Reduce
Parallel Sort
#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/pipeline.hpp>
#include <vector>
#include <cstdio>
int main() {
tf::Executor executor;
tf::Taskflow taskflow;
const size_t num_parallel_lines = 5; // 并行 pipeline 的 token 数量
std::vector<int> buffer(num_parallel_lines, 0);
// 创建 pipeline
tf::Pipeline pl(
num_parallel_lines,
// Stage 1: 初始化 token
tf::Pipe{
tf::PipeType::SERIAL,
[&buffer](tf::Pipeflow& pf){
buffer[pf.line()] = static_cast<int>(pf.token());
printf("stage 1: token %zu stored in buffer[%zu]\n", pf.token(), pf.line());
if(pf.token() == 5){
pf.stop(); // 当 token = 5 时停止 pipeline
}
}
},
// Stage 2: 处理 token
tf::Pipe{
tf::PipeType::SERIAL,
[&buffer](tf::Pipeflow& pf){
printf("stage 2: input buffer[%zu] = %d\n", pf.line(), buffer[pf.line()]);
buffer[pf.line()] += 10; // 模拟处理
}
},
// Stage 3: 输出 token
tf::Pipe{
tf::PipeType::SERIAL,
[&buffer](tf::Pipeflow& pf){
printf("stage 3: input buffer[%zu] = %d\n", pf.line(), buffer[pf.line()]);
}
}
);
// 将 pipeline 添加到任务流
taskflow.composed_of(pl);
// 执行任务流
executor.run(taskflow).wait();
return 0;
}
num_parallel_lines 个 token 并行流。Stage 1: 初始化 token
Stage 2: 处理 token
Stage 3: 输出 token

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online