Linux 多线程:生产者消费者模型、阻塞队列与条件变量详解

Linux 多线程:生产者消费者模型、阻塞队列与条件变量详解

什么是生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者强耦合的问题。生产者和消费者彼此之间不直接通信,而是通过阻塞队列来进行通信,所以生产者生产完数据不同等待消费者处理,直接仍给阻塞队列,消费者不找生产者要数据,而是从阻塞队列中取,阻塞队列就相当于一个缓冲区,平和消费者和生产者的处理能力,这个阻塞队列就是用来给生产者和消费者解耦的。

超市的现实例子

用我们生活中的例子来进行理解,我们买东西的时候都是去超市,而不是去供货商去买,这是为什么呢?假如现在我们需要买一桶方便面,我们直接去找供货商说我要买一桶方便面,你觉得人家供货商会给你吗?当然不会了,人家供货商厂子里的机器跑一轮回,能做出好多的方便面,结果你说你只要一个,那剩下的方便面没人买,这损失不就没人承担了么,况且人家的那个机器跑一轮回,你买一盒方便面的钱都不够人家的电费,所以在现实生活中不会有供货商直接给我们出售商品的,供货商通过超市来进行生产者和消费者的解耦,因为消费者直接去找生产者要数据这样的耦合关系确实不合适,消费者消耗的数据和生产者生产的数据的速度确实不在同一个维度,所以我们必须进行解耦,超市就是一个很好的缓冲区,供货商(生产者)只需要将生产的数据放入到超市(缓冲区),然后消费者(人)需要多少数据就去超市拿即可,这样就消费者就可以不直接向生产者要数据,这样就平衡了消费者和生产者的处理能力。

线程视角下的生产者消费者模型

所以生产者消费者模式其实就是线程之间如何安全高效的进行通信,生产者就是负责生产数据的线程,消费者就是负者处理数据的线程,而超市则是一段具有特定结构的内存空间,因为生产者和消费者的数据是通过这段内存空间来进行通信的,那么这段内存空间就是线程间的共享资源,所以就会有各种并发问题的产生:

  1. 消费者VS消费者:互斥(也就是竞争关系,可能有人会说我们平时去超市买东西的时候,我也没觉得我和其它人有什么竞争,他买他的,我买我的,井水不犯河水,完全没有你所谓的竞争关系么,其实这是因为当前超市的东西很充足,假设现在是世界末日,超市里面只剩下一桶方便面了,这个时候说你们两是竞争关系都有点轻了,这个时候谁能抢到这个方便面说不定就能多活一天,这个时候可是鱼死网破的关系,所以消费者和消费者之间是存在竞争关系的,也就是属于互斥)
  2. 生产者VS生产者:互斥(同样的生产者之间就很好理解为什么是互斥关系了,因为每个供货商都希望自己的竞争对手都破产,这样这个市场就是他一个人的了,钱都被他一个人挣了,不然就得和其它的供货商开始分庭抗礼,这样蛋糕就得分着吃了,所以生产者之间存在的就是竞争关系,属于互斥)
  3. 生产者VS消费者:互斥(这就好比供货商正在往超市放东西,记录人员还没有记录好这个货物,此时消费者是不可以拿走这个东西,这就是互斥),同步(当超市没有货物的时候,消费者是不可以再拿货物了,只有等待生产者供货之后,消费者才可以拿;同样当超市该货物已满时,生产者不必再供货,只有当消费者拿走数据之后,生产者才可以继续供货,可见生产者和消费者处理数据的时候是有顺序性的,所以有同步关系)

现在我们就来通过代码简单的模拟一下生产者消费者模型

main函数

int main() { // 设置随机种子 srand(time(nullptr)); // 创建阻塞队列 BlockQueue<Task> *bq = new BlockQueue<Task>; pthread_t c[3], p[5]; // 创建多生产者线程 for (int i = 0; i < 5; i++) { pthread_create(p + i, nullptr, Producer, bq); } // 创建多消费者线程 for (int i = 0; i < 3; i++) { pthread_create(c + i, nullptr, Consumer, bq); } // 等待线程结束 for (int i = 0; i < 5; i++) { pthread_join(p[i], nullptr); } for (int i = 0; i < 3; i++) { pthread_join(c[i], nullptr); } return 0; } 

 从结果来看,一个简单的生产者消费者模型就这样创建出来了,但是代码中还有一个隐藏的bug,也就是上一篇博客中关于条件变量的错误使用,是很有可能导致我们的程序出现伪唤醒的情况。

生产者线程函数

void *Producer(void *args) { BlockQueue<Task> *bq = (BlockQueue<Task> *)args; // 可选的运算符集合 std::string oper("+-*/%"); while (1) { // 随机生成任务参数 int x = rand() % 10; int y = rand() % 10; Task task(x, y, oper[rand() % 5]); // 向阻塞队列中放入任务 // 如果队列已满,会在 push() 内部阻塞 bq->push(task); std::cout << "生产了一个任务 : "; task.getTask(); sleep(1); } } 

消费者线程函数

void *Consumer(void *args) { BlockQueue<Task> *bq = (BlockQueue<Task> *)args; while (1) { // 从阻塞队列中取任务 // 如果队列为空,会在 pop() 内部阻塞 Task task = bq->pop(); std::cout << "消耗了一个任务 : "; // 执行任务 task.run(); // 模拟处理耗时 sleep(2); } } 

Task:任务模型(让生产和消费更直观)

class Task { public: // x、y:操作数 // oper:运算符 // result:计算结果 // exitcode:错误码(0 表示正常) Task(int x, int y, char oper, int result = 0, int exitcode = 0) : x_(x), y_(y), oper_(oper), result_(result), exitcode_(exitcode) { } // 执行任务 void run() { switch (oper_) { case '+': result_ = x_ + y_; break; case '-': result_ = x_ - y_; break; case '*': result_ = x_ * y_; break; case '/': if (y_ == 0) { exitcode_ = 1; // 除 0 错误 } else { result_ = x_ / y_; } break; case '%': if (y_ == 0) { exitcode_ = 2; // 取模 0 错误 } else { result_ = x_ % y_; } break; } // 输出执行结果 printf("%d %c %d = %d[%d]\n", x_, oper_, y_, result_, exitcode_); } // 打印任务信息 void getTask() { printf("%d %c %d = ?\n", x_, oper_, y_); } private: int x_; int y_; char oper_; int result_; // 计算结果 int exitcode_; // 错误码 }; 

BlockQueue:阻塞队列(核心部分)

template <class T> class BlockQueue { public: // 构造函数 // bqmax:队列最大容量,默认 5 BlockQueue(int bqmax = 5) : bqmax_(bqmax) // 使用初始化列表初始化最大容量 { // 初始化互斥锁,用于保护共享队列 pthread_mutex_init(&mutex_, nullptr); // 初始化条件变量 // c_cond_:消费者条件变量(队列为空时等待) // p_cond_:生产者条件变量(队列满时等待) pthread_cond_init(&c_cond_, nullptr); pthread_cond_init(&p_cond_, nullptr); } // 消费者接口:从队列中取数据 T pop() { // 加锁,保证对共享队列的互斥访问 pthread_mutex_lock(&mutex_); // 当队列为空时,消费者需要等待 if(bq_.size() == 0) { // pthread_cond_wait 做了三件事: // 1. 释放 mutex_ // 2. 线程进入等待队列 // 3. 被唤醒后重新获取 mutex_ pthread_cond_wait(&c_cond_, &mutex_); } // 走到这里,说明队列非空,可以安全取数据 T top = bq_.front(); bq_.pop(); // 取走一个元素后,队列不再是“满”的 // 唤醒因队列满而阻塞的生产者 pthread_cond_signal(&p_cond_); // 解锁 pthread_mutex_unlock(&mutex_); return top; } // 生产者接口:向队列中放数据 void push(const T& in) { // 加锁,保证互斥 pthread_mutex_lock(&mutex_); // 当队列已满时,生产者需要等待 if (bq_.size() == bqmax_) { pthread_cond_wait(&p_cond_, &mutex_); } // 队列未满,可以安全放入数据 bq_.push(in); // 放入新数据后,队列一定非空 // 唤醒因队列空而阻塞的消费者 pthread_cond_signal(&c_cond_); // 解锁 pthread_mutex_unlock(&mutex_); } // 析构函数:释放同步资源 ~BlockQueue() { pthread_mutex_destroy(&mutex_); pthread_cond_destroy(&c_cond_); pthread_cond_destroy(&p_cond_); } private: std::queue<T> bq_; // 实际存放数据的队列(共享资源) int bqmax_; // 队列最大容量 pthread_mutex_t mutex_; // 互斥锁:保护队列 pthread_cond_t c_cond_; // 消费者条件变量 pthread_cond_t p_cond_; // 生产者条件变量 }; 

为什么“判断条件”一定要先加锁?

 void *getTicket(void *args) { threadDate *td = (threadDate *)args; while (1) { pthread_mutex_lock(td->mutex_); if (tickets > 0) { usleep(1000); printf("%s get a tickets , tickets : %d\n", td->threadname.c_str(), tickets); tickets--; pthread_mutex_unlock(td->mutex_); } else { pthread_mutex_unlock(td->mutex_); break; } } return nullptr; } 

阻塞队列有两个典型的约束条件:

  • 队列满时:生产者不能继续生产
  • 队列空时:消费者不能继续消费

这是资源暂时不满足条件

这两段代码分别就是生产者往队列里面放数据和消费者往队列里面拿数据的过程,为了防止多消费者在队列中拿数据时,造成多个线程拿到同一个数据和为了防止多生产者往队列中放数据时,造成数据混乱的情况,因此在多线程拿数据和放数据时要进行加锁,同时当生产者往队列中放满数据之后不可以再继续放和消费者从队列中拿数据时没有数据不可以再拿的情况,我们要再队列为满和为空时进行"判断",如果队列为满,这个时候生产者就必须阻塞;队列为空时,消费者也必须阻塞,现在有一个问题就是为什么判断条件是否满足时,都是先加锁再判断,就连上次多线程抢票的时候也是先加锁再判断票是否已经卖完,这是为什么?

这是因为判断临界资源是否满足条件,其实也是在访问临界资源,不然就会有多个线程同时进入这个判断条件,最后将票抢为负数的情况,所以必先加锁再判断。

伪唤醒:多线程下的隐藏陷阱

这个时候如果我们的资源不满足,这个时候就得将我们的当前线程进行挂起阻塞,直到资源就绪之后才能接着运行,但是现在你一个持有锁的线程如果直接挂起的话,这可能就会导致其它线程在申请锁时申请不到,所以,条件变量的第二个参数是一个互斥锁,目的就是当线程在被挂起的同时,将该锁进行释放,直到该线程被唤醒之后,让该线程重新申请该锁。

现在假设我们是多生产者多消费者,现在假设我们的阻塞队列已经满了,这个时候一个消费线程在消费一个数据之后调用了pthread_cond_broadcast函数调用唤醒了多个生产者,这个时候这些生产者就会重新申请锁,其中一个在获得锁之后,将刚刚消费线程消耗的一个空位进行了填充,这个时候阻塞队列又满了,但是这个线程在执行完自己的任务之后,释放掉锁,但是现在还有生产者由于刚才的误唤醒还在申请锁,这个时候如果生产者获得了锁,这就进而会多产生一个数据,这就是伪唤醒的弊端。

 T pop() { pthread_mutex_lock(&mutex_); while (bq_.size() == 0) { pthread_cond_wait(&c_cond_, &mutex_); } T top = bq_.front(); bq_.pop(); pthread_cond_signal(&p_cond_); pthread_mutex_unlock(&mutex_); return top; } void push(const T in) { pthread_mutex_lock(&mutex_); while (bq_.size() == bqmax_) { pthread_cond_wait(&p_cond_, &mutex_); } bq_.push(in); pthread_cond_signal(&c_cond_); pthread_mutex_unlock(&mutex_); }

所以为了避免伪唤醒的情况发生,这个时候我们只需要进行循环判断条件是否满足即可,因为这样即使是生产者申请到锁了,但是再次判断的时候,资源依旧不满足,这个时候条件变量就会将其挂起,最后将它的锁进行释放,这样就可以避免伪唤醒的情况。

生产者消费者模型是多线程编程中最基础也是最重要的模式。通过阻塞队列,生产者和消费者可以安全、高效地协作,同时避免资源竞争和伪唤醒问题。

理解了互斥、同步和条件变量的配合,你就能轻松应对线程安全设计和高并发场景。

如果你觉得有用,记得 点赞、收藏和关注

Read more

用 Python 调用 Bright Data MCP Server:在 VS Code 中实现实时网页数据抓取

用 Python 调用 Bright Data MCP Server:在 VS Code 中实现实时网页数据抓取

用 Python 调用 Bright Data MCP Server:在 VS Code 中实现实时网页数据抓取,本文介绍了Bright Data的Web MCP Server,这是一款能实现实时、结构化网页数据访问的API,适用于AI应用等场景。其支持静态与动态网页,前3个月每月提供5000次免费请求,有远程托管和本地部署两种方式。文章以在VS Code中用Python调用其API抓取Google搜索结果为例,详解了准备工作、代码编写、参数说明等实战流程,还提及该工具免维护代理池等技术亮点及使用限制。 一、引言:为什么AI时代需要高效的网页数据访问工具? 在大语言模型(LLM)和智能代理(Agent)快速发展的今天,"实时性"成为AI应用落地的关键瓶颈。想象一下:当你的AI助手需要回答"今天上海的天气预警"或"某款产品的最新用户评价"时,它必须依赖实时网页数据才能给出准确答案—

By Ne0inhk
Python NumPy入门指南:数据处理科学计算的瑞士军刀

Python NumPy入门指南:数据处理科学计算的瑞士军刀

作者:唐叔在学习 专栏:唐叔学python 标签:Python NumPy、数据分析、科学计算、机器学习基础、数组操作、Python数据处理、人工智能基础、Python编程 摘要 NumPy是Python科学计算的基础库,提供了高性能的多维数组对象和工具。本文唐叔将带你从零开始了解NumPy的核心概念、常用操作和实际应用场景,助你在数据分析、机器学习等领域快速上手。无论你是Python初学者还是想提升数据处理能力,这篇文章都将成为你的实用指南。 文章目录 * 摘要 * 一、NumPy是什么?为什么它如此重要? * 二、NumPy安装与基础使用 * 2.1 安装NumPy * 2.2 导入NumPy * 2.3 创建第一个NumPy数组 * 三、NumPy核心功能详解 * 3.1 数组属性 * 3.2 创建特殊数组 * 3.3 数组索引与切片

By Ne0inhk
python搭建NPL模型的详细步骤和代码

python搭建NPL模型的详细步骤和代码

目录 * **一、环境准备** * **二、数据准备** * **三、文本预处理** * **1. 清理文本** * **四、特征工程** * **1. TF-IDF** * **2. Word2Vec** * **五、搭建 NLP 模型** * **1. 逻辑回归** * **2. LSTM 深度学习模型** * **六、使用预训练的 BERT 模型** * **七、模型评估** * **八、部署模型** * **总结** * 1. **人机交互的核心技术** * 2. **推动AI技术发展的动力** * 3. **广泛的应用场景** * 4. **多模态融合的关键环节** * 5. **行业数字化转型的加速器** * 6. **未来发展的潜力** 一、环境准备 在开始之前,我们需要安装 NLP

By Ne0inhk
当Python遇见高德:基于PyQt与JS API构建桌面三维地形图应用实战

当Python遇见高德:基于PyQt与JS API构建桌面三维地形图应用实战

摘要: 地图技术作为数字化世界的基石,其应用早已超越了传统的导航和位置服务。对于开发者而言,如何将强大的地图能力集成到不同形态的应用中,是一个充满挑战与机遇的课题。本文将详细阐述一个独特的实践案例:如何利用Python的PyQt5框架,结合高德开放平台强大的JavaScript API 2.1Beta,从零开始构建一个功能丰富的桌面端地图浏览器。项目不仅实现了二维、三维、卫星、地形等多种地图样式的动态切换,还集成了地点搜索(POI)、实时标记等核心功能。本文将深入探讨技术选型、架构设计、核心功能实现、Python与JavaScript双向通信机制,并在此基础上拓展实现“点击获取坐标与地址(逆地理编码)”及“路线规划”等高级功能,旨在为开发者提供一个将Web地图技术无缝融入桌面应用的完整解决方案,展现高德开放平台在跨技术栈融合应用中的卓越潜力。 一、 引言:为何选择在桌面端构建地图应用? 在移动互联网和Web应用大行其道的今天,探讨桌面地图应用的开发似乎有些“复古”。然而,在特定业务场景下,桌面应用依然拥有不可替代的优势。例如,在专业地理信息系统(GIS)、行业数据监控中心、复杂的

By Ne0inhk