手搓简易 Linux 进程池:从 0 到 1 实现基于管道的任务分发系统

手搓简易 Linux 进程池:从 0 到 1 实现基于管道的任务分发系统
在这里插入图片描述

🔥草莓熊Lotso:个人主页
❄️个人专栏: 《C++知识分享》《Linux 入门到实践:零基础也能懂》
✨生活是默默的坚持,毅力是永久的享受!


🎬 博主简介:

在这里插入图片描述

文章目录


前言:

在 Linux 环境下,进程池是一种高效的并发编程模型,它通过预先创建一组子进程来处理任务,避免了频繁创建 / 销毁进程的开销。本文将拆解一个基于管道通信的进程池实现代码,带你理解进程池的核心设计思路、管道通信原理和任务分发机制。

一. 核心设计思路

本进程池实现的核心逻辑:

  • 父进程创建指定数量的子进程,通过匿名管道与每个子进程建立单向通信(父写子读);
  • 父进程采用轮询策略将任务分发给不同子进程,实现简单的负载均衡;
  • 子进程循环读取管道中的任务码,执行对应任务;
  • 父进程通过关闭管道写端通知子进程退出,并回收所有子进程资源。
在这里插入图片描述

二. 代码模块拆解

2.1 任务定义与随机任务生成

这部分是测试用的任务层,定义了进程池要执行的具体任务,以及随机生成任务的工具函数。

#include<iostream>#include<string>#include<vector>#include<memory>#include<functional>#include<ctime>#include<cstdlib>#include<unistd.h>#include<sys/wait.h>#define__MAIN__/////////////////////////////任务测试代码///////////////////////////////////////// 定义任务类型:无参数无返回值的函数对象using task_t = std::function<void()>;// 具体任务1:打印日志(带进程ID标识)voidPrintLog(){ std::cout <<"我是一个打印日志的任务, pid"<<getpid()<< std::endl;}// 具体任务2:模拟下载voidDownLoad(){ std::cout <<"我是一个下载任务, pid"<<getpid()<< std::endl;}// 具体任务3:模拟访问MySQLvoidReadMysql(){ std::cout <<"我是一个访问数据库的任务, pid"<<getpid()<< std::endl;}// 具体任务4:模拟访问RedisvoidWriteRedies(){ std::cout <<"我是一个访问redies的任务, pid"<<getpid()<< std::endl;}// 全局任务列表:存储所有可执行的任务 std::vector<task_t> gtasks;// 加载所有任务到全局列表voidLoadTask(){ gtasks.push_back(PrintLog); gtasks.push_back(DownLoad); gtasks.push_back(ReadMysql); gtasks.push_back(WriteRedies);}// 随机生成50个任务码(输出型参数out存储结果)// 作用:模拟业务中随机产生的任务请求// *: 输出型参数// const &: 输入型参数// &: 输入输出型voidRandomTask(std::vector<int>* out){for(int i =0; i <50; i++){// 随机选择任务(0~3)int code =rand()% gtasks.size();usleep(23223);// 模拟任务产生的时间间隔 out->push_back(code);}}// 任务码枚举(增强可读性)#defineLOG_TASK0#defineDOWNLOAD_TASK1#defineMYSQL_TASK2#defineREDIES_TASK3// 任务码转字符串:方便日志打印 std::string TaskToString(int code){switch(code){case LOG_TASK:return"PrintLog";case DOWNLOAD_TASK:return"DownLoad";case MYSQL_TASK:return"ReadMysql";case REDIES_TASK:return"WriteRedies";default:return"Unknown";}}

2.2 子进程任务处理逻辑

Work函数是子进程的核心执行逻辑,负责从管道读取任务码并执行对应任务:

/////////////////////////进程池核心代码////////////////////////// 子进程工作函数:循环读取管道中的任务码并执行// rfd:管道读端文件描述符voidWork(int rfd){while(true){int code =0;// 从管道读取任务码(阻塞读) ssize_t n =read(rfd,&code,sizeof(code));// 读取成功且长度正确:执行任务if(n >0&& n ==sizeof(int)){if(code >=0&& code < gtasks.size()){ gtasks[code]();// 执行对应任务}}// 读取到0:表示管道写端关闭(父进程通知退出)elseif(n ==0){break;// 子进程退出循环}// 读取错误:直接退出else{break;}}}

2.3 通道(Channel)类:封装父子进程通信

Channel类封装了 “管道写端 + 子进程 ID” 的关联关系,简化父进程对单个子进程的管理(发任务、关管道、回收进程):

// 通道类:管理单个子进程的通信管道和进程IDclassChannel{public:// 构造函数:初始化管道写端、子进程ID,生成通道名称Channel(int wfd, pid_t who):_wfd(wfd),_sub_process_id(who){ _name ="Channel-"+ std::to_string(_sub_process_id)+"-"+ std::to_string(_wfd);}intFd(){return _wfd;}// 获取管道写端 pid_t SubId(){return _sub_process_id;}// 获取子进程ID std::string Name(){return _name;}// 获取通道名称(调试用)// 关闭管道写端voidClose(){if(_wfd >=0)close(_wfd);}// 等待子进程退出(回收资源)voidWait(){ pid_t rid =waitpid(_sub_process_id,nullptr,0);(void)rid;// 屏蔽未使用变量警告}// 向子进程发送任务码(写管道)voidSendTask(int taskcode){ ssize_t n =write(_wfd,&taskcode,sizeof(taskcode));(void)n;// 屏蔽未使用变量警告(实际场景应检查写操作是否成功)}~Channel(){}private:int _wfd;// 管道写端文件描述符 pid_t _sub_process_id;// 对应子进程ID std::string _name;// 通道名称(调试用)};

2.4 进程池(ProcesspPool)类:核心管理逻辑

ProcesspPool类是进程池的核心,负责创建子进程、管理通道、分发任务、停止进程池:

classProcesspPool{private:// 轮询选择下一个子进程(负载均衡策略)intNext(){int choice = _next_choice; _next_choice++; _next_choice %= _channels.size();// 取模实现循环return choice;}public:// 构造函数:初始化进程池大小、轮询索引ProcesspPool(int number):_number(number),_next_choice(0){ std::cout <<"number: "<< _number << std::endl;}// 启动进程池(父进程执行):创建指定数量的子进程和管道voidStart(){for(int i =0; i < _number; i++){// 1. 创建匿名管道int pipefd[2];int n =pipe(pipefd);if(n <0){perror("pipe");exit(2);}// 2. 创建子进程 pid_t id =fork();if(id <0){perror("fork");exit(3);}elseif(id ==0)// 子进程逻辑{// 这里后面还有些变化,为了解决下面那个version2close(pipefd[1]);// 子进程关闭写端(只读)Work(pipefd[0]);// 执行工作函数close(pipefd[0]);// 任务完成后关闭读端exit(0);// 子进程退出}else// 父进程逻辑{close(pipefd[0]);// 父进程关闭读端(只写)// 创建通道对象并加入管理列表 _channels.emplace_back(pipefd[1], id);}}}// 推送任务:选择子进程并发送任务码voidPushTask(int taskcode){// 轮询选择一个子进程int who =Next(); _channels[who].SendTask(taskcode);// 打印任务分发日志(调试用) std::cout <<"发送任务: "<<TaskToString(taskcode)<<"["<< taskcode <<"]"<<"给: "<< _channels[who].Name()<< std::endl;}// 停止进程池:关闭所有管道,回收子进程voidStop(){// version1 -- 可以成功// 1. 批量关闭所有管道写端(通知子进程退出)for(auto& channel: _channels){ channel.Close(); std::cout << channel.Name()<<" close success!"<< std::endl;}sleep(3);// 等待子进程处理完最后任务并退出// 2. 批量回收子进程资源for(auto& channel: _channels){ channel.Wait(); std::cout << channel.Name()<<" wait success!"<< std::endl;}// // version2 -- 不能成功(原因:关闭管道后立即wait,子进程可能还未处理完读操作,导致阻塞)// for(auto& channel: _channels)// {// channel.Close();// channel.Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// }// // version3 -- 可以成功(逆序关闭+回收,避免资源竞争)// int end = _channels.size() - 1;// while(end >= 0)// {// _channels[end].Close();// _channels[end].Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// end--;// }}// 调试打印:输出所有通道信息voidDebugPrint(){ std::cout <<"------------------------------------"<< std::endl;for(auto& channel : _channels){ std::cout << channel.Fd()<< std::endl; std::cout << channel.SubId()<< std::endl; std::cout << channel.Name()<< std::endl;} std::cout <<"------------------------------------"<< std::endl;}~ProcesspPool(){}private: std::vector<Channel> _channels;// 管理所有子进程的通道int _number;// 进程池大小(子进程数量)int _next_choice;// 轮询索引(下一个要分发任务的子进程)};

2.5 主函数:进程池使用示例

主函数是进程池的入口,负责初始化、启动、分发任务、停止进程池:

#ifdef__MAIN__// 用法提示函数staticvoidUsage(const std::string &proc){ std::cout <<"Usage:\n\t"<< proc <<" proceess_number"<< std::endl;}// 程序入口:./process_pool 5(5为子进程数量)intmain(int argc,char* argv[]){// 检查命令行参数if(argc !=2){Usage(argv[0]);exit(1);}int number = std::stoi(argv[1]);// 0. 初始化:加载任务、随机生成50个任务码srand(time(nullptr)^getpid());// 设置随机数种子(结合时间+进程ID)LoadTask(); std::vector<int> task_codes;RandomTask(&task_codes);// 1. 创建进程池对象(智能指针自动管理内存) std::unique_ptr<ProcesspPool> pp = std::make_unique<ProcesspPool>(number);// 2. 启动进程池(创建子进程和管道) pp->Start();sleep(2);// 等待所有子进程初始化完成// 3. 分发所有随机任务for(auto task : task_codes){ pp->PushTask(task);usleep(500000);// 模拟任务分发间隔(500ms)}// // 注释部分:交互式输入任务码(调试用)// while(true)// {// int code = 0;// std::cout << "Please Enter Your Task# ";// std::cin >> code;// if(code < 0 || code > gtasks.size())// {// std::cout << "任务码错误, 请重新输入" << std::endl;// continue;// }// pp->PushTask(code);// }// 4. 停止进程池(回收资源) pp->Stop();return0;}#endif

三. 关键知识点解析

3.1 管道通信原理

  • 匿名管道pipe()创建的文件描述符对pipefd[0](读)、pipefd[1](写)是单向的;
  • 父子进程继承管道文件描述符,通过关闭不需要的端实现 “父写子读”;
  • 当写端关闭后,读端read()会返回 0,子进程通过这个信号判断退出。

3.2 轮询负载均衡

Next()函数通过递增取模的方式,循环选择子进程,确保任务均匀分发给所有子进程,避免单个子进程过载。

3.3 进程回收的坑

Stop()函数中 version2 失败的原因:父进程关闭管道后立即waitpid(),子进程可能还在阻塞读管道,此时父进程waitpid()会阻塞,而子进程读取到管道关闭后退出,但若所有子进程都处于这种状态,会导致死锁。version1 先批量关闭所有管道,等待 3 秒让子进程全部退出后再回收,避免了这个问题。

总结
版本2失败的根本原因是父进程在等待一个子进程时,其他子进程的写端并未关闭(因为都是继承了父进程,关闭了一个,但是后面的子进程关闭一个进行读还是不可避免的继承了上次之前的),导致它们无法退出,从而形成串行阻塞。版本1通过先关闭所有写端,让子进程并发退出,避免了这一风险。因此,在实际开发中,应当采用版本1或类似策略来确保进程池能够优雅地停止。
在这里插入图片描述
在这里插入图片描述

我们可以怎么样去修改使version2变成可行的方案?

在这里插入图片描述


在这里插入图片描述

四. 完整代码展示

#include<iostream>#include<string>#include<vector>#include<memory>#include<functional>#include<ctime>#include<cstdlib>#include<unistd.h>#include<sys/wait.h>#define__MAIN__/////////////////////////////任务测试代码///////////////////////////////////////using task_t = std::function<void()>;voidPrintLog(){ std::cout <<"我是一个打印日志的任务, pid"<<getpid()<< std::endl;}voidDownLoad(){ std::cout <<"我是一个下载任务, pid"<<getpid()<< std::endl;}voidReadMysql(){ std::cout <<"我是一个访问数据库的任务, pid"<<getpid()<< std::endl;}voidWriteRedies(){ std::cout <<"我是一个访问redies的任务, pid"<<getpid()<< std::endl;} std::vector<task_t> gtasks;voidLoadTask(){ gtasks.push_back(PrintLog); gtasks.push_back(DownLoad); gtasks.push_back(ReadMysql); gtasks.push_back(WriteRedies);}// *: 输出型参数// const &: 输入型参数// &: 输入输出型voidRandomTask(std::vector<int>* out){for(int i =0; i <50; i++){int code =rand()% gtasks.size();usleep(23223); out->push_back(code);}}#defineLOG_TASK0#defineDOWNLOAD_TASK1#defineMYSQL_TASK2#defineREDIES_TASK3 std::string TaskToString(int code){switch(code){case LOG_TASK:return"PrintLog";case DOWNLOAD_TASK:return"DownLoad";case MYSQL_TASK:return"ReadMysql";case REDIES_TASK:return"WriteRedies";default:return"Unknown";}}/////////////////////////进程池代码////////////////////////voidWork(int rfd){while(true){int code =0; ssize_t n =read(rfd,&code,sizeof(code));if(n >0&& n ==sizeof(int)){if(code >=0&& code < gtasks.size()){ gtasks[code]();}}elseif(n ==0){break;// 子进程只要读到返回值为0, 表明父进程让我退出}else{break;}}}classChannel{public:Channel(int wfd, pid_t who):_wfd(wfd),_sub_process_id(who){ _name ="Channel-"+ std::to_string(_sub_process_id)+"-"+ std::to_string(_wfd);}intFd(){return _wfd;} pid_t SubId(){return _sub_process_id;} std::string Name(){return _name;}voidClose(){if(_wfd >=0)close(_wfd);}voidWait(){ pid_t rid =waitpid(_sub_process_id,nullptr,0);(void)rid;}voidSendTask(int taskcode){ ssize_t n =write(_wfd,&taskcode,sizeof(taskcode));(void)n;}~Channel(){}private:int _wfd; pid_t _sub_process_id; std::string _name;};classProcesspPool{private:intNext(){int choice = _next_choice; _next_choice++; _next_choice %= _channels.size();return choice;}public:ProcesspPool(int number):_number(number),_next_choice(0){ std::cout <<"number: "<< _number << std::endl;}// 父进程voidStart(){for(int i =0; i < _number; i++){// 1. 创建管道int pipefd[2];int n =pipe(pipefd);if(n <0){perror("pipe");exit(2);}// 2. 创建子进程 pid_t id =fork();if(id <0){perror("fork");exit(3);}elseif(id ==0)// 子进程{// 关闭父进程历史的wfd!for(auto& channel : _channels) channel.Close();close(pipefd[1]);Work(pipefd[0]);close(pipefd[0]);exit(0);}else// 父进程{close(pipefd[0]);// _channels c(pipefd[1], fd);// _channels.push_back(c); _channels.emplace_back(pipefd[1], id);// 内部会直接构造}}}// 1. 什么任务? 任务码决定// 2. 任务给谁? 属于进程池内部操作,负载均衡(我这里是用的轮询的机制)voidPushTask(int taskcode){// 选择一个子进程int who =Next(); _channels[who].SendTask(taskcode); std::cout <<"发送任务: "<<TaskToString(taskcode)<<"["<< taskcode <<"]"<<"给: "<< _channels[who].Name()<< std::endl;}// 有版本存在一些问题, 后续会说为什么voidStop(){// version1 -- 可以成功// 1. 关闭wfdfor(auto& channel: _channels){ channel.Close(); std::cout << channel.Name()<<" close success!"<< std::endl;}sleep(3);// 2. 回收子进程for(auto& channel: _channels){ channel.Wait(); std::cout << channel.Name()<<" wait success!"<< std::endl;}// // version2 -- 不能成功???// for(auto& channel: _channels)// {// channel.Close();// channel.Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// }// version3 -- 可以成功// int end = _channels.size() - 1;// while(end >= 0)// {// _channels[end].Close();// _channels[end].Wait();// std::cout << channel.Name() << " close and wait success!" << std::endl;// end--;// }}voidDebugPrint(){ std::cout <<"------------------------------------"<< std::endl;for(auto& channel : _channels){ std::cout << channel.Fd()<< std::endl; std::cout << channel.SubId()<< std::endl; std::cout << channel.Name()<< std::endl;} std::cout <<"------------------------------------"<< std::endl;}~ProcesspPool(){}private: std::vector<Channel> _channels;int _number;int _next_choice;};// 父进程#ifdef__MAIN__staticvoidUsage(const std::string &proc){ std::cout <<"Usage:\n\t"<< proc <<" proceess_number"<< std::endl;}// ./process_pool 5intmain(int argc,char* argv[]){if(argc !=2){Usage(argv[0]);exit(1);}int number = std::stoi(argv[1]);// 0. 加载任务并随机生成任务srand(time(nullptr)^getpid());LoadTask(); std::vector<int> task_codes;RandomTask(&task_codes);// 1. 创建进程池对象 std::unique_ptr<ProcesspPool> pp = std::make_unique<ProcesspPool>(number);// 2. 启动进程池 pp->Start();sleep(2);for(auto task : task_codes){ pp->PushTask(task);usleep(500000);}// // 自己输入发送任务// while(true)// {// int code = 0;// std::cout << "Please Enter Your Task# ";// std::cin >> code;// if(code < 0 || code > gtasks.size())// {// std::cout << "任务码错误, 请重新输入" << std::endl;// continue;// }// pp->PushTask(code);// } pp->Stop();return0;}#endif

五. 编译与运行(附 Makefile)

process_pool:process_pool.cc g++ -o$@ $^ -std=c++14 .PHONY:clean clean: rm-f process_pool 
  • 编译:直接 make;
  • 运行./process_pool 5(5 为子进程数量,可自定义);
  • 输出:可以看到任务被轮询分发给不同子进程,每个任务打印对应的进程 ID,最后进程池正常停止并回收资源。

六. 扩展与优化方向

  • 错误处理:当前代码未处理write()/read()的错误返回值,实际场景应增加重试、日志记录;
  • 动态扩容:支持运行时增加 / 减少子进程数量;
  • 更优的负载均衡:基于子进程当前任务数、CPU 使用率等动态分发;
  • 任务队列:父进程增加任务队列,避免任务分发过快导致管道阻塞;
  • 信号处理:增加SIGCHLD信号处理,异步回收子进程,避免僵尸进程。

结尾:

🍓 我是草莓熊 Lotso!若这篇技术干货帮你打通了学习中的卡点: 👀 【关注】跟我一起深耕技术领域,从基础到进阶,见证每一次成长 ❤️ 【点赞】让优质内容被更多人看见,让知识传递更有力量 ⭐ 【收藏】把核心知识点、实战技巧存好,需要时直接查、随时用 💬 【评论】分享你的经验或疑问(比如曾踩过的技术坑?),一起交流避坑 🗳️ 【投票】用你的选择助力社区内容方向,告诉大家哪个技术点最该重点拆解 技术之路难免有困惑,但同行的人会让前进更有方向~愿我们都能在自己专注的领域里,一步步靠近心中的技术目标! 

结语:本文通过拆解一个极简的进程池实现,带你理解了 Linux 进程间通信、进程管理的核心知识点。这个进程池虽然简单,但涵盖了进程池的核心设计思想,是学习 Linux 并发编程的绝佳案例。

✨把这些内容吃透超牛的!放松下吧✨ʕ˘ᴥ˘ʔづきらど

Read more

从安装到代码提交:Git 远程协作中 90% 的问题都能在这里找到答案

从安装到代码提交:Git 远程协作中 90% 的问题都能在这里找到答案

工欲善其事,必先利其器。 目录 * 安装 Git 的步骤: * 本地Git与远程仓库连接及操作全指南 * 一、本地仓库初始化与远程仓库连接 * 1. 初始化本地Git仓库 * 2. 关联远程仓库 * 1. 查看当前分支状态 * 2. 新建本地分支 * 方法1:基于当前分支创建新分支 * 方法2:创建并直接切换到新分支(推荐) * 方法3:基于远程分支创建本地分支 * 3. 切换到已有的本地分支 * 二、分支管理与远程分支同步 * 1. 查看远程分支 * 2. 拉取远程分支到本地 * 三、代码提交与推送到远程仓库 * 1. 常规提交流程 * 2. 简化推送命令 * 四、远程仓库信息查看与更新 * 1. 查看远程仓库详细信息 * 2. 同步远程仓库最新数据 * 五、常见问题解决与优化配置 * 1. 网络与连接问题修复 * 2. 推送大文件或提升传输稳定性

By Ne0inhk
Git下载及安装保姆级教程(内附快速下载方法)

Git下载及安装保姆级教程(内附快速下载方法)

一、下载Git 1、Git的下载地址 Git-2.47.1-64-bit https://git-scm.com/downloads 选择相应的操作系统下载,这里给出的是当前最新版本2.47.1,如需下载之前的版本,可在图片显示的红框内,点击Older releases即可。 PS:由于一些原因,Git安装包下载速度较慢,可以复制资源链接到迅雷等第三方下载工具下载或直接下载本文的资源即可 2、等待安装 找到下载的安装包双击进行安装。 二、Git的安装 1、阅读说明 点击Next进行下一步。 2、选择安装路径 默认安装路径为C:\Program Files\Git,如需修改,点击①Browse选择文件夹,无需修改点击②Next进行下一步。 3、选择安装组件 ①为在桌面上显示Git图标,可以勾选。其余默认选项不建议取消勾选,以免安装出现意外问题。如确认无误,点击②

By Ne0inhk

Git常用指令

Git 常用50个核心操作命令(附详细说明) 以下按仓库初始化与配置、文件状态与暂存、提交与日志、分支管理、远程仓库、合并与变基、标签、撤销与回滚、LFS大文件、高级实用十大场景分类,覆盖开发全流程高频操作,命令简洁且标注适用场景,新手也能直接套用。 一、仓库初始化与全局配置(5个) 主要用于首次使用Git的环境配置、本地仓库创建,配置后全局生效(除非单独修改仓库配置)。 1. git config --global user.name "你的用户名" 配置Git全局提交用户名(GitHub/GitLab的用户名,必填)。 2. git config --global user.email "你的邮箱" 配置Git全局提交邮箱(与GitHub/GitLab绑定的邮箱,必填)。 3.

By Ne0inhk
【全网最全的的本地部署Code Agent攻略参考】跃阶星辰AI开源Step-3.5-Flash

【全网最全的的本地部署Code Agent攻略参考】跃阶星辰AI开源Step-3.5-Flash

1. 简介 Step 3.5 Flash(访问官网)是我们目前最强大的开源基础模型,专为提供前沿推理与智能体能力而设计,同时具备卓越的效率。基于稀疏混合专家(MoE)架构,它每处理一个token仅激活1960亿参数中的110亿。这种"智能密度"使其推理深度可比肩顶级闭源模型,同时保持实时交互所需的敏捷性。 2. 核心能力 * 高速深度推理:聊天机器人擅长阅读,而智能体必须快速推理。通过三路多token预测(MTP-3)技术,Step 3.5 Flash在典型使用场景中实现100-300 tok/s的生成吞吐量(单流编码任务峰值达350 tok/s),能即时响应复杂的多步推理链条。 * 编码与智能体的强力引擎:Step 3.5 Flash专为智能体任务打造,集成可扩展的强化学习框架驱动持续自我进化。其SWE-bench Verified通过率74.4%,Terminal-Bench 2.0通过率51.

By Ne0inhk