一、多线程基础原理
1. 线程是什么?
在 Linux 系统中,线程是进程内的执行流,是操作系统调度的最小单位。可以用'工厂模型'通俗理解:
介绍 Linux 下 C++ 多线程的核心原理,包括线程与进程区别、pthread API 使用、同步机制(互斥锁、条件变量)、死锁处理及线程池实现。涵盖从基础概念到生产级代码的 RAII 封装、生产者消费者模型、动态线程池设计,以及 GDB、Valgrind 等调试工具的使用指南,旨在帮助开发者构建高并发、安全的系统级应用。

在 Linux 系统中,线程是进程内的执行流,是操作系统调度的最小单位。可以用'工厂模型'通俗理解:
Linux 下线程的本质:Linux 没有专门的'线程'内核对象,线程是通过轻量级进程(LWP) 实现的 —— 多个线程共享同一个进程的地址空间、文件描述符等资源,内核调度时以 LWP 为单位。
| 维度 | 进程 | 线程 |
| 资源占用 | 大(独立地址空间、页表等) | 小(共享进程资源,仅私有栈/寄存器) |
| 创建/销毁开销 | 高(需分配地址空间、初始化资源) | 低(仅需分配栈空间,复用进程资源) |
| 通信方式 | 复杂(管道、消息队列、Socket 等) | 简单(共享内存、全局变量,需同步) |
| 调度开销 | 高(上下文切换需切换地址空间) | 低(仅切换寄存器/栈,地址空间不变) |
| 独立性 | 高(一个进程崩溃不影响其他) | 低(一个线程崩溃导致整个进程崩溃) |
| 适用场景 | 独立任务(如多个应用程序) | 高并发、数据共享的任务(如服务器处理请求) |
pthread_t 类型(非数值,需用 pthread_self() 获取,pthread_equal() 比较),内核层面对应 LWP(可通过 ps -Lf <pid> 查看);pthread_attr_setstacksize() 修改),栈溢出是线程崩溃的常见原因;核心 API(pthread 库,编译需加 -lpthread):
pthread_create(pthread_t *tid, const pthread_attr_t *attr, void *(*func)(void*), void *arg):创建线程,func 是线程执行函数,arg 是参数;pthread_join(pthread_t tid, void **retval):阻塞等待线程退出,回收线程资源,retval 接收线程返回值;pthread_detach(pthread_t tid):将线程设为分离态,线程退出后自动回收资源(无需 pthread_join);pthread_exit(void *retval):线程主动退出,retval 为返回值(不能返回栈变量,会导致野指针);pthread_self():获取当前线程 ID;pthread_cancel(pthread_t tid):请求取消线程(需线程配合取消点)。pthread_join 回收,否则会产生'僵尸线程';pthread_join,但无法获取线程返回值;pthread_mutex_t)保护共享资源;② 读写锁(pthread_rwlock_t)优化读多写少场景;③ 条件变量实现线程间同步;④ 避免共享可变资源(如用线程局部存储 __thread);⑤ 原子操作(std::atomic)替代锁(轻量级)。死锁 4 个必要条件
避免死锁的工业级方案:
pthread_mutex_timedlock),超时则释放已持有的锁;pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE)),但仅适用于同一线程重复申请锁的场景(不可滥用)。死锁排查的工业级工具:
pstack <pid>:打印进程的线程栈,查看线程卡在哪个锁的 pthread_mutex_lock 处;gdb 调试:info threads 查看所有线程,thread <tid> 切换线程,bt 查看调用栈;valgrind --tool=helgrind ./program:检测数据竞争和死锁;| 维度 | 互斥锁 | 自旋锁 |
| 等待方式 | 线程阻塞(放弃 CPU) | 线程忙等(循环检测锁是否释放) |
| 开销 | 上下文切换开销(高) | CPU 空转开销(低) |
| 适用场景 | 锁持有时间长(如 IO 操作、复杂计算) | 锁持有时间极短(如几行内存操作) |
| 资源占用 | 阻塞时释放 CPU,资源占用低 | 忙等时占用 CPU,资源占用高 |
工业级选型原则:
pthread_spinlock_t);sleep、read),会导致 CPU 空转浪费。核心作用:
实现线程间的'等待 - 唤醒'同步(如生产者消费者模型中,消费者等待生产者生产数据,生产者生产后唤醒消费者)。
必须搭配互斥锁的原因:
pthread_cond_wait 对条件的检查是原子操作;pthread_cond_wait 可能被系统信号、伪唤醒触发,需在循环中检查条件(而非 if),互斥锁保证检查条件时共享资源的安全性。工业级正确用法(伪代码):
pthread_mutex_lock(&mutex); // 循环检查条件,避免虚假唤醒 while (条件不满足) { pthread_cond_wait(&cond, &mutex); // 自动释放锁,阻塞等待;被唤醒后重新加锁 } // 处理共享资源 pthread_mutex_unlock(&mutex);
核心原理:
预先创建一组线程,放入线程池,当有任务到来时,将任务加入任务队列,线程池中的线程从队列中取任务执行,避免频繁创建/销毁线程的开销(工业级高并发服务的标配)。
工业级线程池的核心设计要点:
g++ -std=c++17 -lpthread -O2 -Wall -Wextra ./thread_demo.cpp -o thread_demo(-O2 开启优化,-Wall 开启警告);#include <iostream>
#include <pthread.h>
#include <string>
#include <stdexcept>
#include <unistd.h>
// 工业级 RAII 封装 pthread 线程
class Thread {
public:
// 线程函数类型定义
using ThreadFunc = void* (*)(void*);
// 构造函数:创建线程,默认设为分离态(避免僵尸线程)
Thread(ThreadFunc func, void* arg, const std::string& name = "Thread")
: tid_(0), func_(func), arg_(arg), name_(name), is_running_(false) {
// 设置线程属性:分离态
pthread_attr_t attr;
int ret = pthread_attr_init(&attr);
if (ret != 0) {
throw std::runtime_error("pthread_attr_init failed, errno: " + std::to_string(ret));
}
ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
if (ret != 0) {
pthread_attr_destroy(&attr);
throw std::runtime_error("pthread_attr_setdetachstate failed, errno: " + std::to_string(ret));
}
// 创建线程
ret = pthread_create(&tid_, &attr, func_, arg_);
if (ret != 0) {
pthread_attr_destroy(&attr);
throw std::runtime_error("pthread_create failed, errno: " + std::to_string(ret));
}
pthread_attr_destroy(&attr);
is_running_ = true;
std::cout << "Thread " << name_ << " created, tid: " << (unsigned long)tid_ << std::endl;
}
// 析构函数:确保线程资源释放(分离态无需 join)
~Thread() {
if (is_running_) {
std::cout << "Thread " << name_ << " destroyed" << std::endl;
}
}
// 禁止拷贝(线程不可拷贝)
Thread(const Thread&) = delete;
Thread& operator=(const Thread&) = delete;
// 获取线程 ID
pthread_t tid() const { return tid_; }
private:
pthread_t tid_; // 线程 ID
ThreadFunc func_; // 线程执行函数
void* arg_; // 函数参数
std::string name_; // 线程名称(便于调试)
bool is_running_; // 线程是否运行
};
// 线程执行函数(打印参数,模拟工作)
void* thread_func(void* arg) {
std::string* msg = static_cast<std::string*>(arg);
for (int i = 0; i < 5; ++i) {
std::cout << "Thread working: " << *msg << ", count: " << i << std::endl;
sleep(1);
}
// 注意:arg 是堆变量,需手动释放
delete msg;
pthread_exit(nullptr);
}
int main() {
try {
// 传递堆变量,避免栈变量销毁问题
std::string* msg = new std::string("Hello Linux Thread");
Thread t(thread_func, msg, "WorkerThread");
// 主线程等待子线程执行完(分离态线程不影响,仅为演示)
sleep(6);
} catch (const std::exception& e) {
std::cerr << "Error: " << e.what() << std::endl;
return 1;
}
return 0;
}
编译与运行:
g++ -std=c++17 -lpthread thread_raii.cpp -o thread_raii
./thread_raii
核心亮点(工业级):
生产者消费者模型是多线程的经典场景,工业级实现需处理虚假唤醒、边界条件、优雅退出。
#include <iostream>
#include <pthread.h>
#include <queue>
#include <string>
#include <stdexcept>
#include <unistd.h>
#include <atomic>
// 工业级线程安全的队列(生产者消费者核心)
template <typename T>
class SafeQueue {
public:
SafeQueue(int max_size = 10) : max_size_(max_size), is_running_(true) {
// 初始化互斥锁和条件变量
int ret = pthread_mutex_init(&mutex_, nullptr);
if (ret != 0) {
throw std::runtime_error("pthread_mutex_init failed: " + std::to_string(ret));
}
ret = pthread_cond_init(¬_empty_, nullptr);
if (ret != 0) {
pthread_mutex_destroy(&mutex_);
throw std::runtime_error("pthread_cond_init not_empty failed: " + std::to_string(ret));
}
ret = pthread_cond_init(¬_full_, nullptr);
if (ret != 0) {
pthread_cond_destroy(¬_empty_);
pthread_mutex_destroy(&mutex_);
throw std::runtime_error("pthread_cond_init not_full failed: " + std::to_string(ret));
}
}
~SafeQueue() {
// 优雅退出:停止接收新任务,唤醒所有等待线程
is_running_ = false;
pthread_cond_broadcast(¬_empty_);
pthread_cond_broadcast(¬_full_);
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(¬_empty_);
pthread_cond_destroy(¬_full_);
}
// 禁止拷贝
SafeQueue(const SafeQueue&) = delete;
SafeQueue& operator=(const SafeQueue&) = delete;
// 生产任务:队列满则阻塞
bool push(const T& task) {
pthread_mutex_lock(&mutex_);
// 循环检查:队列满且运行中则等待
while (is_running_ && queue_.size() >= max_size_) {
pthread_cond_wait(¬_full_, &mutex_);
}
// 若已停止运行,释放锁并返回
if (!is_running_) {
pthread_mutex_unlock(&mutex_);
return false;
}
// 生产任务
queue_.push(task);
std::cout << "Producer push task: " << task << ", queue size: " << queue_.size() << std::endl;
// 唤醒消费者(队列非空)
pthread_cond_signal(¬_empty_);
pthread_mutex_unlock(&mutex_);
return true;
}
// 消费任务:队列空则阻塞
bool pop(T& task) {
pthread_mutex_lock(&mutex_);
// 循环检查:队列空且运行中则等待(避免虚假唤醒)
while (is_running_ && queue_.empty()) {
pthread_cond_wait(¬_empty_, &mutex_);
}
// 若已停止运行且队列为空,释放锁并返回
if (!is_running_ && queue_.empty()) {
pthread_mutex_unlock(&mutex_);
return false;
}
// 消费任务
task = queue_.front();
queue_.pop();
std::cout << "Consumer pop task: " << task << ", queue size: " << queue_.size() << std::endl;
// 唤醒生产者(队列非满)
pthread_cond_signal(¬_full_);
pthread_mutex_unlock(&mutex_);
return true;
}
// 停止队列(优雅退出)
void stop() {
pthread_mutex_lock(&mutex_);
is_running_ = false;
pthread_mutex_unlock(&mutex_);
pthread_cond_broadcast(¬_empty_);
pthread_cond_broadcast(¬_full_);
}
private:
std::queue<T> queue_; // 任务队列
int max_size_; // 队列最大容量(避免内存溢出)
std::atomic<bool> is_running_; // 队列是否运行(原子变量,避免数据竞争)
pthread_mutex_t mutex_; // 互斥锁
pthread_cond_t not_empty_; // 队列非空条件变量(唤醒消费者)
pthread_cond_t not_full_; // 队列非满条件变量(唤醒生产者)
};
// 生产者线程函数
void* producer_func(void* arg) {
SafeQueue<std::string>* queue = static_cast<SafeQueue<std::string>*>(arg);
for (int i = 0; i < 8; ++i) {
std::string task = "Task-" + std::to_string(i);
queue->push(task);
sleep(1); // 模拟生产耗时
}
return nullptr;
}
// 消费者线程函数
void* consumer_func(void* arg) {
SafeQueue<std::string>* queue = static_cast<SafeQueue<std::string>*>(arg);
std::string task;
while (queue->pop(task)) {
// 模拟消费耗时
sleep(2);
}
return nullptr;
}
int main() {
try {
SafeQueue<std::string> queue(5); // 队列最大容量 5
// 创建生产者和消费者线程
pthread_t producer_tid, consumer_tid;
pthread_create(&producer_tid, nullptr, producer_func, &queue);
pthread_create(&consumer_tid, nullptr, consumer_func, &queue);
// 主线程运行 15 秒后停止队列
sleep(15);
queue.stop();
std::cout << "Main thread stop queue" << std::endl;
// 等待线程退出
pthread_join(producer_tid, nullptr);
pthread_join(consumer_tid, nullptr);
} catch (const std::exception& e) {
std::cerr << "Error: " << e.what() << std::endl;
return 1;
}
return 0;
}
核心亮点(工业级):
not_empty/not_full 分别唤醒消费者/生产者,减少无效唤醒;is_running_ 控制队列运行状态,支持优雅退出;线程池是工业级高并发服务的核心组件,以下实现包含'任务队列、线程管理、优雅退出、任务拒绝策略'等工业级特性。
#include <iostream>
#include <pthread.h>
#include <queue>
#include <string>
#include <stdexcept>
#include <unistd.h>
#include <atomic>
#include <functional>
#include <vector>
// 任务类型:可调用对象(支持 lambda、函数指针等)
using Task = std::function<void()>;
// 任务拒绝策略枚举
enum class RejectPolicy {
BLOCK, // 阻塞等待
DISCARD, // 丢弃任务
CALLER_RUN // 调用者自行执行
};
// 工业级线程池
class ThreadPool {
public:
ThreadPool(int core_size = 4, int max_size = 8, int queue_size = 100, RejectPolicy policy = RejectPolicy::BLOCK)
: core_size_(core_size), max_size_(max_size), queue_size_(queue_size), reject_policy_(policy),
is_running_(true), current_threads_(0), idle_threads_(0) {
// 初始化同步资源
int ret = pthread_mutex_init(&mutex_, nullptr);
if (ret != 0) throw std::runtime_error("mutex init failed: " + std::to_string(ret));
ret = pthread_cond_init(¬_empty_, nullptr);
if (ret != 0) {
pthread_mutex_destroy(&mutex_);
throw std::runtime_error("cond not_empty init failed: " + std::to_string(ret));
}
ret = pthread_cond_init(¬_full_, nullptr);
if (ret != 0) {
pthread_cond_destroy(¬_empty_);
pthread_mutex_destroy(&mutex_);
throw std::runtime_error("cond not_full init failed: " + std::to_string(ret));
}
// 创建核心线程
for (int i = 0; i < core_size_; ++i) {
create_thread();
}
}
~ThreadPool() {
// 优雅退出
is_running_ = false;
pthread_cond_broadcast(¬_empty_); // 唤醒所有等待任务的线程
pthread_cond_broadcast(¬_full_); // 唤醒所有等待队列的线程
// 等待所有线程退出
pthread_mutex_lock(&mutex_);
for (pthread_t tid : threads_) {
pthread_join(tid, nullptr);
}
pthread_mutex_unlock(&mutex_);
// 释放同步资源
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(¬_empty_);
pthread_cond_destroy(¬_full_);
}
// 禁止拷贝
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
// 提交任务
bool submit(const Task& task) {
pthread_mutex_lock(&mutex_);
// 检查队列是否满
while (is_running_ && task_queue_.size() >= queue_size_) {
switch (reject_policy_) {
case RejectPolicy::BLOCK:
// 阻塞等待队列非满
pthread_cond_wait(¬_full_, &mutex_);
break;
case RejectPolicy::DISCARD:
// 丢弃任务
pthread_mutex_unlock(&mutex_);
std::cerr << "Task discarded: queue is full" << std::endl;
return false;
case RejectPolicy::CALLER_RUN:
// 调用者自行执行任务
pthread_mutex_unlock(&mutex_);
task();
std::cout << "Task run by caller: queue is full" << std::endl;
return true;
}
}
if (!is_running_) {
pthread_mutex_unlock(&mutex_);
std::cerr << "ThreadPool is stopped, reject task" << std::endl;
return false;
}
// 添加任务到队列
task_queue_.push(task);
std::cout << "Submit task, queue size: " << task_queue_.size() << std::endl;
// 唤醒空闲线程
pthread_cond_signal(¬_empty_);
// 动态扩容:核心线程已满,队列未满,且当前线程数<最大线程数
if (idle_threads_ == 0 && current_threads_ < max_size_) {
create_thread();
}
pthread_mutex_unlock(&mutex_);
return true;
}
private:
// 创建线程
void create_thread() {
pthread_t tid;
int ret = pthread_create(&tid, nullptr, thread_func, this);
if (ret != 0) {
pthread_mutex_unlock(&mutex_);
throw std::runtime_error("create thread failed: " + std::to_string(ret));
}
threads_.push_back(tid);
current_threads_++;
idle_threads_++;
std::cout << "Create thread, current threads: " << current_threads_ << std::endl;
}
// 线程执行函数
static void* thread_func(void* arg) {
ThreadPool* pool = static_cast<ThreadPool*>(arg);
pool->run();
return nullptr;
}
// 线程运行逻辑
void run() {
while (is_running_) {
Task task;
pthread_mutex_lock(&mutex_);
// 空闲线程数 +1
idle_threads_++;
// 等待任务(队列非空或线程池停止)
while (is_running_ && task_queue_.empty()) {
std::cout << "Thread " << (unsigned long)pthread_self() << " wait task" << std::endl;
pthread_cond_wait(¬_empty_, &mutex_);
}
// 空闲线程数 -1
idle_threads_--;
// 取出任务
if (!task_queue_.empty()) {
task = task_queue_.front();
task_queue_.pop();
std::cout << "Thread " << (unsigned long)pthread_self() << " take task, queue size: " << task_queue_.size() << std::endl;
// 唤醒提交任务的线程(队列非满)
pthread_cond_signal(¬_full_);
}
pthread_mutex_unlock(&mutex_);
// 执行任务(解锁后执行,避免锁持有时间过长)
if (task) {
try {
task();
} catch (const std::exception& e) {
std::cerr << "Task execute failed: " << e.what() << std::endl;
}
}
// 动态缩容:当前线程数>核心线程数,且空闲时间过长(简化版:直接退出)
if (current_threads_ > core_size_ && idle_threads_ > core_size_) {
pthread_mutex_lock(&mutex_);
current_threads_--;
// 从线程列表中移除当前线程
for (auto it = threads_.begin(); it != threads_.end(); ++it) {
if (*it == pthread_self()) {
threads_.erase(it);
break;
}
}
pthread_mutex_unlock(&mutex_);
std::cout << "Thread " << (unsigned long)pthread_self() << " exit, current threads: " << current_threads_ << std::endl;
return;
}
}
}
private:
int core_size_; // 核心线程数
int max_size_; // 最大线程数
int queue_size_; // 任务队列最大容量
RejectPolicy reject_policy_; // 任务拒绝策略
std::vector<pthread_t> threads_; // 线程列表
std::queue<Task> task_queue_; // 任务队列
std::atomic<bool> is_running_; // 线程池是否运行
int current_threads_; // 当前线程数
int idle_threads_; // 空闲线程数
pthread_mutex_t mutex_; // 互斥锁
pthread_cond_t not_empty_; // 队列非空条件变量
pthread_cond_t not_full_; // 队列非满条件变量
};
// 测试任务:打印任务 ID 和线程 ID
void test_task(int id) {
std::cout << "Task " << id << " execute by thread: " << (unsigned long)pthread_self() << std::endl;
sleep(1); // 模拟任务耗时
}
int main() {
try {
// 创建线程池:核心 4 线程,最大 8 线程,队列容量 10,阻塞策略
ThreadPool pool(4, 8, 10, RejectPolicy::BLOCK);
// 提交 20 个任务
for (int i = 0; i < 20; ++i) {
pool.submit(std::bind(test_task, i));
}
// 主线程运行 10 秒后退出
sleep(10);
} catch (const std::exception& e) {
std::cerr << "ThreadPool error: " << e.what() << std::endl;
return 1;
}
return 0;
}
核心亮点(工业级):
std::function 封装任务,支持任意可调用对象(lambda、函数指针、成员函数)。pthread_attr_setaffinity_np 将线程绑定到指定 CPU 核心,避免 CPU 上下文切换;__attribute__((aligned(64))))避免多个线程的变量共享同一个 CPU 缓存行;std::atomic 实现轻量级计数器,替代互斥锁;epoll+timerfd 实现)。# 启动程序
gdb ./thread_pool
# 运行程序
run
# 查看所有线程
info threads
# 切换到指定线程(如线程 2)
thread 2
# 查看线程调用栈
bt
# 查看线程持有的锁
info mutex
# 设置断点
break ThreadPool::submit
# 继续运行
c
# 用 pstack 查看线程栈
pstack <pid>
# 用 valgrind 检测死锁和数据竞争
valgrind --tool=helgrind ./thread_pool
# 用 valgrind 检测内存泄漏
valgrind --tool=memcheck --leak-check=full ./thread_pool
# 用 perf 分析 CPU 占用
perf top -p <pid>
# 用 htop 查看线程 CPU 占用
htop -p <pid>

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online