高性能 C++ 单机任务调度器设计与实现
一个基于 C++20 实现的单机任务调度器,涵盖任务生命周期管理、资源控制及进程隔离。支持优先级调度、超时控制、cgroup 隔离、PSI 背压监测及 SQLite 持久化。提供 Prometheus 指标导出与 HTTP 健康检查接口。适用于 AI 训练、CI/CD、Serverless 及边缘计算等场景,强调低延迟与高可靠性。

一个基于 C++20 实现的单机任务调度器,涵盖任务生命周期管理、资源控制及进程隔离。支持优先级调度、超时控制、cgroup 隔离、PSI 背压监测及 SQLite 持久化。提供 Prometheus 指标导出与 HTTP 健康检查接口。适用于 AI 训练、CI/CD、Serverless 及边缘计算等场景,强调低延迟与高可靠性。

TaskScheduler 是一个 C++20 实现的单机任务调度器,用于管理和执行本地任务。它提供了完整的任务生命周期管理能力,包括任务提交、资源管理、进程执行、超时控制和状态跟踪。
1. AI/机器学习训练与推理(最热门场景)
场景:一台服务器上有多个 GPU,需要同时运行多个训练或推理任务。
需求:
例子:
这类系统往往用 C++ 写调度器 + Python 做用户接口。
2. CI/CD 与自动化测试平台
场景:GitLab Runner、Jenkins Agent、内部构建系统。
需求:
为什么不用 Docker?
腾讯、阿里、美团都有自研的 高性能 CI 执行引擎,底层就是单机调度器。
3. Serverless / FaaS(函数即服务)的本地运行时
场景:用户提交一个函数(如 Python lambda),平台在本地执行。
需求:
例子:
技术栈:C++ 调度器 + gVisor/firecracker(可选) + 快照恢复
关键指标:冷启动延迟 < 50ms,这正是 C++ 单机调度器的优势。
4. 边缘计算(Edge Computing)
场景:摄像头、IoT 网关、车载设备等资源受限设备。
需求:
例子:
华为、百度 Apollo、大疆等公司在边缘端大量使用 C++ 调度框架。
5. 游戏服务器(Game Server)
场景:一个物理机部署多个游戏房间(Room)实例。
需求:
腾讯天美、米哈游、网易雷火都有类似架构。
| 公司 | 应用场景 | 是否招人 |
|---|---|---|
| 华为 | AI 训练调度、芯片任务分发 | 大量 C++ 基础软件岗 |
| 阿里 | 函数计算 Worker、ODPS 本地执行器 | 云智能事业群常招 |
| 腾讯 | 游戏服务器调度、TEG 自动化测试平台 | TEG 后台开发(C++) |
| 字节 | 推荐模型训练、A/B 实验平台 | Infra 部门偏好系统人才 |
| 美团/快手 | CI/CD 执行引擎、离线批处理 | 基础架构部有相关需求 |
招聘关键词搜索建议:
struct JobSpec {
std::string cmd; // 要执行的命令字符串
int cpu_cores{1}; // 需要的 CPU 核数
size_t memory_mb{256}; // 需要的内存 MB
int timeout_sec{0}; // 超时秒数,0 表示不限制
int priority{0}; // 优先级,数值越大优先级越高
};
struct ResourceQuota {
int total_cpu{4}; // 可用 CPU 总核数
size_t total_mem_mb{2048}; // 可用内存总量 MB
};
enum class JobStatus {
Pending, // 已提交但尚未调度
Running, // 正在运行
Succeeded, // 成功结束(exit 0)
Failed, // 失败结束(非零退出码)
Timeout, // 超时被终止
Cancelled // 被取消
};


线程职责:
提交路径:
submit() → 校验白/黑名单 → 检查队列上限 → pending_.push_back() → inc_submitted() → cv_.notify_all()
调度路径:
dispatcher_loop() → pick_next_job() → rm_.reserve() → launch_job() → fork/exec → running_[id] = job
回收路径:
reaper_loop() → waitpid(WNOHANG) → 更新 exit_code/status → rm_.release() → cleanup_cgroup() → running_.erase(id)
职责:编排整个调度流程,管理任务生命周期
关键接口:
class Scheduler {
public:
explicit Scheduler(SchedulerOptions opts);
// 提交任务,返回 job id 或 -1
int submit(const JobSpec& spec);
// 启动后台线程
void start();
// 停止调度器并等待线程退出
void stop();
// 判断是否空闲(无待处理和运行中任务)
bool idle() const;
// 获取指标快照
Metrics::Snapshot metrics() const;
private:
void dispatcher_loop(); // 调度循环
void reaper_loop(); // 回收循环
void psi_loop(); // PSI 监测循环
void cron_loop(); // Cron 触发循环
bool launch_job(Job& job); // 启动任务进程
bool pick_next_job(Job& out); // 出队任务
void restore_from_store(); // 持久化恢复
};
关键数据结构:
private:
SchedulerOptions opts_; // 配置
ResourceManager rm_; // 资源管理器
std::vector<Job> pending_; // 待调度队列
std::unordered_map<int, Job> running_; // 运行中任务表
mutable std::mutex mu_; // 互斥锁
std::condition_variable cv_; // 条件变量
std::atomic<bool> shutting_down_{false}; // 关闭标志
std::atomic<bool> psi_backpressure_{false}; // 背压标志
Metrics metrics_; // 指标收集器
std::unique_ptr<JobStore> store_; // 持久化存储
std::unique_ptr<CronScheduler> cron_sched_; // Cron 调度器
std::unique_ptr<MetricsHttpServer> metrics_server_; // HTTP 服务
职责:管理 CPU 和内存配额,提供预留/释放接口
实现要点:
class ResourceManager {
public:
explicit ResourceManager(ResourceQuota quota);
// 尝试预留资源,成功返回 true
bool reserve(int cpu, size_t mem_mb);
// 释放资源(必须与 reserve 配对调用)
void release(int cpu, size_t mem_mb);
// 查询当前使用情况
std::pair<int, size_t> used() const;
private:
ResourceQuota quota_;
int used_cpu_{0};
size_t used_mem_mb_{0};
mutable std::mutex mu_;
};
核心逻辑(src/resource_manager.cpp):
bool ResourceManager::reserve(int cpu, size_t mem_mb) {
std::lock_guard lk(mu_);
// 不做部分分配:要么全部满足要么拒绝
if (used_cpu_ + cpu > quota_.total_cpu || used_mem_mb_ + mem_mb > quota_.total_mem_mb) {
return false;
}
used_cpu_ += cpu;
used_mem_mb_ += mem_mb;
return true;
}
职责:提供原子计数器,生成快照和 Prometheus 文本
指标项:
| 指标名 | 类型 | 说明 |
|---|---|---|
| submitted_ | counter | 累计提交任务数 |
| rejected_ | counter | 因队列满/策略拒绝的次数 |
| running_ | gauge | 当前运行中任务数 |
| succeeded_ | counter | 成功完成任务数 |
| failed_ | counter | 失败任务数 |
| timeout_ | counter | 超时被终止任务数 |
| launch_failed_ | counter | 启动失败次数 |
| pressure_blocked_ | counter | 因背压暂停的累计次数 |
| pressure_active_ | gauge | 背压是否激活(1/0) |
| queue_wait_ms_total_ | counter | 队列等待时长总和(毫秒) |
| queue_wait_count_ | counter | 统计样本数 |
| queue_wait_ms_max_ | gauge | 最大等待时长(毫秒) |
Prometheus 导出示例:
# TYPE tasks_total counter
tasks_total{status="submitted"} 100
tasks_total{status="rejected"} 5
tasks_total{status="succeeded"} 80
tasks_total{status="failed"} 10
tasks_total{status="timeout"} 5
# TYPE tasks_running_current gauge
tasks_running_current 3
# TYPE tasks_pending_current gauge
tasks_pending_current 5
职责:创建、绑定和清理任务专属 cgroup
接口:
// 创建任务 cgroup 并设置 CPU/内存限制
std::string create_cgroup_for_job(int job_id, int cpu_cores, size_t mem_mb, const CgroupConfig& cfg);
// 将 pid 加入 cgroup
bool attach_pid_to_cgroup(pid_t pid, const std::string& cg_path);
// 清理 cgroup 目录
void cleanup_cgroup(const std::string& cg_path);
实现细节(src/cgroup_helper.cpp):
职责:通过 SQLite 持久化任务状态,支持重启恢复
数据模型:
CREATE TABLE jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cmd TEXT NOT NULL,
cpu_cores INTEGER,
memory_mb INTEGER,
timeout_sec INTEGER,
priority INTEGER,
status TEXT, -- queued/running/succeeded/failed/timeout/launch_failed
submit_ms INTEGER,
start_ms INTEGER,
end_ms INTEGER,
exit_code INTEGER
);
关键接口:
class JobStore {
public:
bool init(const std::string& path);
// 插入任务
int insert_job(const JobSpec& spec, PersistStatus status, int64_t submit_ms, ...);
// 更新状态
void update_status(int id, PersistStatus status, ...);
// 加载未完成任务
std::vector<PersistedJob> load_unfinished();
};
恢复策略(Scheduler::restore_from_store()):
void Scheduler::restore_from_store() {
if (!store_) return;
auto jobs = store_->load_unfinished();
for (auto& pj : jobs) {
Job job;
job.id = next_id_++;
job.spec = pj.spec;
job.status = JobStatus::Pending;
job.enqueue_time = std::chrono::steady_clock::now();
pending_.push_back(job);
}
cv_.notify_all();
}
职责:管理 cron 模板,定时生成任务实例
当前实现:
核心逻辑:
void CronScheduler::tick(SubmitCallback submit_cb) {
auto now = std::chrono::system_clock::now();
for (auto& tpl : templates_) {
if (!tpl.enabled) continue;
if (now >= tpl.next_run) {
submit_cb(tpl.spec); // 提交任务实例
tpl.next_run = tpl.cron.next_run(now); // 计算下次触发时间
}
}
}
职责:提供轻量级 HTTP 服务,导出指标和健康检查
路由:
并发模型:
实现细节(src/metrics_http_server.cpp):
class MetricsHttpServer {
public:
using MetricsHandler = std::function<std::string()>;
bool start(int port, MetricsHandler handler);
void stop();
private:
void accept_loop();
void worker_loop(); // ...
};


关键代码(Scheduler::launch_job()):
bool Scheduler::launch_job(Job& job) {
// 创建 cgroup
std::string cg_path;
if (opts_.cgroup.enabled) {
cg_path = create_cgroup_for_job(job.id, job.spec.cpu_cores, job.spec.memory_mb, opts_.cgroup);
}
pid_t pid = fork();
if (pid == 0) {
// 子进程
setpgid(0, 0); // 建立独立进程组
if (!cg_path.empty()) {
attach_pid_to_cgroup(getpid(), cg_path);
}
// 设置 rlimit
if (opts_.rlimit_nofile >= 0) {
struct rlimit rl;
rl.rlim_cur = rl.rlim_max = opts_.rlimit_nofile;
setrlimit(RLIMIT_NOFILE, &rl);
}
if (opts_.disable_core_dump) {
struct rlimit rl;
rl.rlim_cur = rl.rlim_max = 0;
setrlimit(RLIMIT_CORE, &rl);
}
// 切换工作目录
if (!opts_.workdir.empty()) {
chdir(opts_.workdir.c_str());
}
// 执行命令
execl("/bin/sh", "sh", "-c", job.spec.cmd.c_str(), nullptr);
_exit(127);
}
// 父进程
job.pid = pid;
job.pgid = pid;
job.start_time = std::chrono::steady_clock::now();
job.status = JobStatus::Running;
return true;
}

关键代码(Scheduler::reaper_loop()):
void Scheduler::reaper_loop() {
while (!shutting_down_.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::lock_guard lk(mu_);
auto now = std::chrono::steady_clock::now();
for (auto it = running_.begin(); it != running_.end(); ) {
Job& job = it->second;
// 检查超时
if (job.spec.timeout_sec > 0) {
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
now - job.start_time).count();
if (elapsed >= job.spec.timeout_sec) {
if (!job.sigterm_sent) {
// 发送 SIGTERM
kill(-job.pgid, SIGTERM);
job.sigterm_sent = true;
job.kill_deadline = now + std::chrono::seconds(opts_.kill_grace_sec);
} else if (now >= *job.kill_deadline) {
// 发送 SIGKILL
kill(-job.pgid, SIGKILL);
}
}
}
// 尝试回收
int status;
pid_t ret = waitpid(job.pid, &status, WNOHANG);
if (ret > 0) {
// 进程已退出
job.exit_code = status;
job.end_time = now;
if (job.sigterm_sent) {
job.status = JobStatus::Timeout;
metrics_.inc_timeout();
} else if (WIFEXITED(status) && WEXITSTATUS(status) == 0) {
job.status = JobStatus::Succeeded;
metrics_.inc_succeeded();
} else {
job.status = JobStatus::Failed;
metrics_.inc_failed();
}
// 释放资源
rm_.release(job.spec.cpu_cores, job.spec.memory_mb);
metrics_.dec_running();
// 清理 cgroup
if (opts_.cgroup.enabled) {
std::string cg_path = opts_.cgroup.base_path + "/job_" + std::to_string(job.id);
cleanup_cgroup(cg_path);
}
it = running_.erase(it);
} else {
++it;
}
}
}
}
目标:根据系统压力暂停新任务启动,避免雪崩
实现原理:
关键代码:
void Scheduler::psi_loop() {
const double threshold = 50.0; // 背压阈值
while (!shutting_down_.load()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
// 读取 memory.pressure
std::string mem_pressure_file = opts_.cgroup.base_path + "/memory.pressure";
std::ifstream ifs(mem_pressure_file); // 格式:some avg10=12.34 avg60=... total=...
double avg10 = parse_psi_avg10(ifs);
bool pressure = (avg10 > threshold);
if (pressure != psi_backpressure_.load()) {
psi_backpressure_.store(pressure);
metrics_.set_pressure_active(pressure);
Logger::instance().log(Logger::Level::Info, pressure ? "PSI backpressure activated" : "PSI backpressure cleared");
}
}
}
struct SchedulerOptions {
ResourceQuota quota; // 资源配额
CgroupConfig cgroup; // cgroup 配置
int max_queue_size{1000}; // 最大队列长度
int kill_grace_sec{2}; // SIGTERM 宽限期
bool enable_priority{false}; // 是否启用优先级调度
bool enable_psi_monitor{false}; // 是否启用 PSI 背压监测
std::vector<std::string> cmd_whitelist; // 命令白名单
std::vector<std::string> cmd_blacklist; // 命令黑名单
std::string workdir; // 工作目录
int metrics_http_port{-1}; // HTTP 指标端口
int rlimit_nofile{-1}; // 文件句柄限制
bool disable_core_dump{true}; // 禁用 core dump
bool enable_persistence{false}; // 启用持久化
std::string db_path{"state/tasks.db"}; // 持久化路径
bool enable_cron{false}; // 启用 cron
int cron_tick_ms{1000}; // cron 检查间隔
};
./scheduler \
--cmd "echo hello" \ # 任务命令
--cpu 1 \ # CPU 核数
--mem 256 \ # 内存 MB
--timeout 5 \ # 超时秒数
--priority 10 \ # 优先级
--total-cpu 4 \ # 总 CPU
--total-mem 2048 \ # 总内存
--cgroup \ # 启用 cgroup
--enable-priority \ # 启用优先级
--metrics-port 8080 \ # HTTP 端口
--whitelist ls,echo \ # 白名单
--blacklist rm,shutdown \ # 黑名单
--workdir /tmp # 工作目录
健康检查:
$ curl http://localhost:8080/health ok
指标导出:
$ curl http://localhost:8080/metrics # TYPE tasks_total counter
tasks_total{status="submitted"} 100
tasks_total{status="rejected"} 5
tasks_total{status="succeeded"} 80 ...

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online