跳到主要内容Linux 下 C++ 线程池实现与并发安全详解 | 极客日志C++
Linux 下 C++ 线程池实现与并发安全详解
文章介绍了 Linux 环境下 C++ 线程池的实现原理,涵盖线程池基本认识、核心代码逻辑(含任务队列、互斥锁、条件变量)、单例模式改造及线程安全性分析。同时阐述了死锁成因、必要条件及避免策略,结合重入性与线程安全的区别,提供了完整的工程化实践参考。
时间旅人3 浏览 线程池
线程池概述
线程池是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能,而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程时的代价。线程池不仅可以保证内核的充分利用,还能防止过分调度。
应用场景:
- 需要大量的线程完成任务,且完成任务的时间比较短(如 WEB 服务器完成网页请求)。
- 对性能要求苛刻的应用,要求服务器迅速响应客户请求。
线程池实现
ThreadPool.hpp
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <string>
#include <memory>
#include <time.h>
#include <unistd.h>
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Thread.hpp"
#include "Logger.hpp"
#include "Task.hpp"
const int default_thread_num = 5;
template <typename T> class ThreadPool {
private:
{ _q.(); }
{
() {
T t;
{
;
(() && _is_running) {
_wait_thread_num++;
_cond.(_lock);
_wait_thread_num--;
}
(!_is_running && ()) {
(LoggerLevel::INFO) << << name << ;
;
}
t = _q.();
_q.();
}
();
(LoggerLevel::DEBUG) << name << << t.();
}
}
:
( thread_num = default_thread_num)
: _thread_num(thread_num), _is_running(), _wait_thread_num() {
( i = ; i < _thread_num; ++i) {
std::string name = + std::(i + );
_threads.([]( std::string &_name) {
->(_name);
}, name);
}
(LoggerLevel::INFO) << ;
}
{
(_is_running) ;
_is_running = ;
( &t : _threads) t.();
}
{
(!_is_running) ;
_is_running = ;
(_wait_thread_num > ) _cond.();
(LoggerLevel::INFO) << ;
}
{
( &t : _threads) t.();
(LoggerLevel::INFO) << ;
}
{
(!_is_running) ;
{
;
_q.(t);
(_wait_thread_num > ) _cond.();
}
}
~() {}
:
std::queue<T> _q;
std::vector<Thread> _threads;
_thread_num;
Mutex _lock;
Cond _cond;
_is_running;
_wait_thread_num;
};
微信扫一扫,关注极客日志
微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown转HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
- HTML转Markdown
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
- JSON 压缩
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
- JSON美化和格式化
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online
bool QueueIsEmpty()
return
empty
void Routine(const std::string &name)
while
true
LockGuard lockguard(&_lock)
while
QueueIsEmpty
Wait
if
QueueIsEmpty
LOG
"线程池退出 && 任务队列为空,"
"退出"
break
front
pop
t
LOG
"handler task, "
ResultToString
public
ThreadPool
int
false
0
for
int
0
"thread-"
to_string
1
emplace_back
this
const
this
Routine
LOG
"ThreadPool obj create success"
void Start()
if
return
true
for
auto
Start
void Stop()
if
return
false
if
0
NotifyAll
LOG
"thread pool stop success"
void Wait()
for
auto
Join
LOG
"thread pool wait success"
void EnQueue(const T &t)
if
return
LockGuard lockguard(&_lock)
push
if
0
NotifyOne
ThreadPool
private
int
bool
int
Task.hpp
#pragma once
#include <iostream>
#include <sstream>
#include <functional>
#include <unistd.h>
class Task {
public:
Task() {}
Task(int x, int y) : a(x), b(y) {}
void Excute() { result = a + b; }
void operator()() { Excute(); }
std::string ResultToString() {
std::stringstream ss;
ss << a << " + " << b << " = " << result;
return ss.str();
}
private:
int a;
int b;
int result;
};
Mutex.hpp
#pragma once
#include <iostream>
#include <mutex>
#include <pthread.h>
class Mutex {
public:
Mutex() { pthread_mutex_init(&_lock, nullptr); }
void Lock() { pthread_mutex_lock(&_lock); }
pthread_mutex_t* Get() { return &_lock; }
void Unlock() { pthread_mutex_unlock(&_lock); }
~Mutex() { pthread_mutex_destroy(&_lock); }
private:
pthread_mutex_t _lock;
};
class LockGuard {
public:
LockGuard(Mutex* _mutex) : _mutexp(_mutex) { _mutexp->Lock(); }
~LockGuard() { _mutexp->Unlock(); }
private:
Mutex* _mutexp;
};
Cond.hpp
#include "Mutex.hpp"
class Cond {
public:
Cond() { pthread_cond_init(&_cond, nullptr); }
void Wait(Mutex& lock) { pthread_cond_wait(&_cond, lock.Get()); }
void NotifyOne() { pthread_cond_signal(&_cond); }
void NotifyAll() { pthread_cond_broadcast(&_cond); }
~Cond() { pthread_cond_destroy(&_cond); }
private:
pthread_cond_t _cond;
};
Thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <vector>
#include <string>
#include <functional>
#include <pthread.h>
#include <unistd.h>
#include <sys/syscall.h>
#include "Logger.hpp"
#define get_lwp_id() syscall(SYS_gettid)
using func_t = std::function<void(const std::string& name)>;
std::string thread_name_default = "None_Name";
class Thread {
public:
Thread(func_t func, std::string name = thread_name_default)
: _isrunning(false), _name(name), _func(func) {}
static void* start_routine(void* args) {
Thread* self = static_cast<Thread*>(args);
self->_isrunning = true;
self->_lwpid = get_lwp_id();
self->_func(self->_name);
pthread_exit((void*)0);
}
void Start() {
int n = pthread_create(&_tid, nullptr, start_routine, this);
if (n == 0) LOG(LoggerLevel::INFO) << "pthread_create success";
}
void Stop() {
int n = pthread_cancel(_tid);
LOG(LoggerLevel::INFO) << "thread cancel success";
(void)n;
}
void Join() {
if (!_isrunning) return;
int n = pthread_join(_tid, nullptr);
if (n == 0) LOG(LoggerLevel::INFO) << "pthread_join success";
}
~Thread() {}
private:
bool _isrunning;
pthread_t _tid;
pid_t _lwpid;
std::string _name;
func_t _func;
};
#endif
Logger.hpp
#pragma once
#include <iostream>
#include <filesystem>
#include <fstream>
#include <string>
#include <sstream>
#include <memory>
#include <unistd.h>
#include "Mutex.hpp"
enum class LoggerLevel { DEBUG, INFO, WARNING, ERROR, FATAL };
std::string LoggerLevelToString(LoggerLevel level) {
switch (level) {
case LoggerLevel::DEBUG: return "Debug";
case LoggerLevel::INFO: return "Info";
case LoggerLevel::WARNING: return "Warning";
case LoggerLevel::ERROR: return "Error";
case LoggerLevel::FATAL: return "Fatal";
default: return "Unknown";
}
}
std::string GetCurrentTime() {
time_t timep = time(nullptr);
struct tm currtm;
localtime_r(&timep, &currtm);
char buffer[64];
snprintf(buffer, sizeof(buffer), "%4d-%02d-%02d %02d-%02d-%02d",
currtm.tm_year + 1900, currtm.tm_mon + 1, currtm.tm_mday,
currtm.tm_hour, currtm.tm_min, currtm.tm_sec);
return buffer;
}
class LogStrategy {
public:
virtual ~LogStrategy() = default;
virtual void SyncLog(const std::string &logmessage) = 0;
};
class ConsoleLogStrategy : public LogStrategy {
public:
~ConsoleLogStrategy() {}
virtual void SyncLog(const std::string &logmessage) override {
LockGuard lockguard(&_lock);
std::cout << logmessage << std::endl;
}
private:
Mutex _lock;
};
const std::string default_dir_path_name = "log";
const std::string default_filename = "test.log";
class FileLogStrategy : public LogStrategy {
public:
FileLogStrategy(const std::string dir_path_name = default_dir_path_name,
const std::string filename = default_filename)
: _dir_path_name(dir_path_name), _filename(filename) {
if (std::filesystem::exists(_dir_path_name)) return;
try {
std::filesystem::create_directories(_dir_path_name);
} catch (const std::filesystem::filesystem_error &e) {
std::cerr << e.what() << "\r\n";
}
}
~FileLogStrategy() {}
virtual void SyncLog(const std::string &logmessage) override {
LockGuard lock(&_lock);
std::string target = _dir_path_name;
target += '/';
target += _filename;
std::ofstream out(target.c_str(), std::ios::app);
if (!out.is_open()) return;
out << logmessage << "\n";
out.close();
}
private:
std::string _dir_path_name;
std::string _filename;
Mutex _lock;
};
class Logger {
public:
Logger() {}
void EnableConsoleStrategy() { _strategy = std::make_unique<ConsoleLogStrategy>(); }
void EnableFileStrategy() { _strategy = std::make_unique<FileLogStrategy>(); }
class LogMessage {
public:
LogMessage(LoggerLevel level, std::string filename, int line, Logger& logger)
: _curr_time(GetCurrentTime()), _level(level), _pid(getpid()),
_filename(filename), _line(line), _logger(logger) {
std::stringstream ss;
ss << "[" << _curr_time << "] [" << LoggerLevelToString(_level) << "] "
<< "[" << _pid << "] [" << _filename << "] [" << _line << "] - ";
_loginfo = ss.str();
}
template <typename T>
LogMessage &operator<<(const T &info) {
std::stringstream ss;
ss << info;
_loginfo += ss.str();
return *this;
}
~LogMessage() {
if (_logger._strategy) _logger._strategy->SyncLog(_loginfo);
}
private:
std::string _curr_time;
LoggerLevel _level;
pid_t _pid;
std::string _filename;
int _line;
std::string _loginfo;
Logger &_logger;
};
LogMessage operator()(LoggerLevel level, std::string filename, int line) {
return LogMessage(level, filename, line, *this);
}
~Logger() {}
private:
std::unique_ptr<LogStrategy> _strategy;
};
Logger logger;
#define LOG(level) logger(level, __FILE__, __LINE__)
#define EnableConsoleStrategy() logger.EnableConsoleStrategy()
#define EnableFileStrategy() logger.EnableFileStrategy()
main.cc
#include "ThreadPool.hpp"
int main() {
srand((unsigned int)time(nullptr));
EnableConsoleStrategy();
std::unique_ptr<ThreadPool<Task>> tq = std::make_unique<ThreadPool<Task>>(10);
tq->Start();
int cnt = 10;
while (cnt--) {
int x = rand() % 10 + 1;
int y = rand() % 5 + 1;
Task t(x, y);
tq->EnQueue(t);
sleep(1);
}
tq->Stop();
tq->Wait();
return 0;
}
Makefile
threadpool:main.cc
g++ -o $@ $^ -std=c++17 -lpthread
.PHONY:clean
clean:
rm -f threadpool
单例模式
什么是单例模式?
某些类只能具有一个对象(实例),就称之为单例。在服务器开发场景中,经常需要让服务器加载大量数据到内存中,此时往往需要一个单例的类来管理这些数据。
- 饿汉方式:静态成员被编译在全局区,程序编译时已有虚拟地址。
template <typename T> class Singleton {
static T data;
public:
static T* GetInstance() { return &data; }
};
- 懒汉方式:静态指针在编译时有地址,调用
GetInstance 时才动态开辟空间。
template <typename T> class Singleton {
static T* inst;
public:
static T* GetInstance() {
if (inst == NULL) inst = new T();
return inst;
}
};
线程安全的单例改造:
上述懒汉模式在多线程调用 GetInstance 时可能创建多份对象,需加锁保证安全。
线程安全和重入问题
线程安全:多个线程访问共享资源时能正确执行,互不干扰。
重入:同一函数被不同执行流调用,前一个流程未结束,另一个再次进入。
- 不保护共享变量的函数
- 状态随调用变化的函数
- 返回指向静态变量指针的函数
- 调用线程不安全函数的函数
- 调用 malloc/free(底层用全局链表管理堆)
- 调用标准 I/O 库函数
- 函数体内使用静态数据结构
- 可重入函数一定是线程安全的。
- 线程安全不一定是可重入的(例如信号处理中的死锁风险)。
常见锁概念
死锁
定义:一组进程中各进程均占有不会释放的资源,因互相申请被其它进程所占用不会释放的资源而处于永久等待状态。
- 互斥条件:一个资源每次只能被一个执行流使用。
- 请求与保持条件:一个执行流因请求资源而被阻塞时,对已获得的资源保持不放。
- 不剥夺条件:已获得资源在未使用完之前,不能被强行剥夺。
- 循环等待条件:若干执行流之间形成头尾相接的循环等待关系。
避免死锁:
破坏上述任一条件即可,例如:资源一次性分配、使用超时机制、加锁顺序一致、避免锁未释放场景。