一。认识相关网络接口
1. socket 套接字
socket:
套接字是网络编程的基础,它就类似于网络通信的传递数据的管道,我们可以通过不同的参数来塑造这个管道。
Linux 环境下基于 UDP 的网络编程基础,涵盖 socket 套接字、地址结构体及常用网络接口函数。文章通过 Echo Server、Dict Server 和 Chat Server 三个实例,演示了如何使用 C++ 封装日志、锁、线程池等组件实现 UDP 通信。内容包含服务端与客户端的完整代码实现,涉及字节序转换、多线程并发处理及群聊逻辑路由,适合学习 Linux 网络编程基础。

socket:
套接字是网络编程的基础,它就类似于网络通信的传递数据的管道,我们可以通过不同的参数来塑造这个管道。
参数:
**domain:**指定套接字使用的地址族/协议族,套接字使用的类型决定了数据传输的网络协议层,常见的地址族/协议层
{
**AF_INET:**IPv4 地址族,使用32位IP地址
**AF_INET6:**IPv6 地址族,使用128位IP地址
**AF_UNIX/AF_LOCAL:**本地进程间通信,使用文件系统路径作为地址
}
**type:**指定套接字类型,决定传输层通信特性
{
**SOCK_DGRAM:**数据报套接字(基于UDP协议)
**SOCK_STREAM:**流式套接字(基于TCP协议)
}
**protocol:**指定具体传输层协议,我们通常使用默认,设置为0即可
返回值:
成功返回一个大于等于0的数,失败返回-1
sockaddr_in:
sockaddr_in 用于表示 IPv4 地址结构的核心结构体,主要用于存储和传递 IPv4 协议的地址信息。
参数:
**sin_family:**指定结构体存储的地址类型,AF_INET 固定为 IPv4 地址结构
**sin_port:**指定结构体存储的通信端口号
**sin_addr:**指定结构体存储的地址
**s_addr:**将地址转化为网络字节序大端进行存储
bind:
bind 的作用是将套接字与特定的 IP 和 port 进行绑定,我们已经将 IP 和端口号写入结构体 struct sockaddr_in 中,我们只需要传递结构体即可
参数:
**sockfd:**传递的套接字socket
**addr:**指向存储 IP 和 port 的结构体指针
**addrlen:**地址结构体的大小
返回值:
成功返回0,失败返回-1
recvfrom:
主要用于UDP协议接收网络数据报内容,同时获取发送方的地址信息
参数:
**sockfd:**套接字描述符
**buf:**接收数据的数组指针
**len:**数组的大小
**flags:**接收方式标志,通常为0,
**src_addr:**输出参数,指向地址结构体,用于储存发送方的IP和port
**addrlen:**结构体大小
返回值:
成功返回接收到的字节数,失败返回-1
sendto:
主要用于UDP协议发送网络数据报内容,同时发送发送方的IP和port
参数:
sockfd:套接字描述符
buf:储存发送信息的数组指针
len:数组的大小
flags:发送方式标志,默认为0
dest_addr:指向结构体存储的接收方的IP和端口号
addrlen:结构体的大小
返回值:
成功返回实际字节数,失败返回-1
ntohs / ntohl:
ntohs 和 ntohl 都是将网络字节序转换为主机字节序,ntohs 将 16 位网络字节序转换为 16 位的主机字节序(Network to Host Short),ntohl 将 32 位网络字节序转换为 32 位的主机字节序(Network to Host Long)
参数:
**netshort / netlong:**需要转换的端口号
htons / htonl:
htons 和 htonl 都是将主机字节序转换为网络字节序,htons 将 16 位主机字节序转换为 16 位的网络字节序(Host to Network Short),htonl 将 32 位主机字节序转换为 32 位的网络字节序(Host to Network Long)
参数:
**hostshort / hostlong:**需要转换的端口号
inet_ntoa / inet_ntop:
将 32位网络字节序的 IPv4 地址转换为点分十进制字符串,inet_ntoa 仅支持 IPv4 在多线程中容易出问题,现代编程更加推荐 inet_ntop
参数:
**af:**地址族
**src:**指向 IP 地址的指针
**dst:**指向数组的指针用于储存转换后的字符串
**size:**数组大小
主要实现简单的回显服务端和客户端发送的信息
我们对锁接口进行封装
#pragma once
#include <iostream>
#include <pthread.h>
namespace MutexModule {
class Mutex {
public:
Mutex() { pthread_mutex_init(&_mutex, nullptr); }
void Lock() { int n = pthread_mutex_lock(&_mutex); (void)n; }
void Unlock() { int n = pthread_mutex_unlock(&_mutex); (void)n; }
~Mutex() { pthread_mutex_destroy(&_mutex); }
pthread_mutex_t *Get() { return &_mutex; }
private:
pthread_mutex_t _mutex;
};
class LockGuard {
public:
LockGuard(Mutex &mutex):_mutex(mutex) { _mutex.Lock(); }
~LockGuard() { _mutex.Unlock(); }
private:
Mutex &_mutex;
};
}
制作简易的日志用于后续打印日志报告
#ifndef __LOG_HPP__
#define __LOG_HPP__
#include <iostream>
#include <cstdio>
#include <string>
#include <filesystem> //C++17
#include <sstream>
#include <fstream>
#include <memory>
#include <ctime>
#include <unistd.h>
#include "Mutex.hpp"
namespace LogModule {
using namespace MutexModule;
const std::string gsep = "\r\n";
// 策略模式,C++多态特性
// 2. 刷新策略 a: 显示器打印 b:向指定的文件写入
// 刷新策略基类
class LogStrategy {
public:
~LogStrategy() = default;
virtual void SyncLog(const std::string &message) = 0;
};
// 显示器打印日志的策略 : 子类
class ConsoleLogStrategy : public LogStrategy {
public:
ConsoleLogStrategy() { }
void SyncLog(const std::string &message) override { LockGuard lockguard(_mutex); std::cout << message << gsep; }
~ConsoleLogStrategy() { }
private:
Mutex _mutex;
};
// 文件打印日志的策略 : 子类
const std::string defaultpath = "./log";
const std::string defaultfile = "my.log";
class FileLogStrategy : public LogStrategy {
public:
FileLogStrategy(const std::string &path = defaultpath, const std::string &file = defaultfile) : _path(path), _file(file) { LockGuard lockguard(_mutex); if (std::filesystem::exists(_path)) { return; } try { std::filesystem::create_directories(_path); } catch (const std::filesystem::filesystem_error &e) { std::cerr << e.what() << '\n'; } }
void SyncLog(const std::string &message) override { LockGuard lockguard(_mutex); std::string filename = _path + (_path.back() == '/' ? "" : "/") + _file; // "./log/" + "my.log" std::ofstream out(filename, std::ios::app); // 追加写入的 方式打开 if (!out.is_open()) { return; } out << message << gsep; out.close(); }
~FileLogStrategy() { }
private:
std::string _path; // 日志文件所在路径
std::string _file; // 日志文件本身
Mutex _mutex;
};
// 形成一条完整的日志&&根据上面的策略,选择不同的刷新方式
// 1. 形成日志等级
enum class LogLevel { DEBUG, INFO, WARNING, ERROR, FATAL };
std::string Level2Str(LogLevel level) {
switch (level) {
case LogLevel::DEBUG: return "DEBUG";
case LogLevel::INFO: return "INFO";
case LogLevel::WARNING: return "WARNING";
case LogLevel::ERROR: return "ERROR";
case LogLevel::FATAL: return "FATAL";
default: return "UNKNOWN";
}
}
std::string GetTimeStamp() {
time_t curr = time(nullptr);
struct tm curr_tm;
localtime_r(&curr, &curr_tm);
char timebuffer[128];
snprintf(timebuffer, sizeof(timebuffer),"%4d-%02d-%02d %02d:%02d:%02d", curr_tm.tm_year+1900, curr_tm.tm_mon+1, curr_tm.tm_mday, curr_tm.tm_hour, curr_tm.tm_min, curr_tm.tm_sec );
return timebuffer;
}
// 1. 形成日志 && 2. 根据不同的策略,完成刷新
class Logger {
public:
Logger() { EnableConsoleLogStrategy(); }
void EnableFileLogStrategy() { _fflush_strategy = std::make_unique<FileLogStrategy>(); }
void EnableConsoleLogStrategy() { _fflush_strategy = std::make_unique<ConsoleLogStrategy>(); }
// 表示的是未来的一条日志
class LogMessage {
public:
LogMessage(LogLevel &level, std::string &src_name, int line_number, Logger &logger) : _curr_time(GetTimeStamp()), _level(level), _pid(getpid()), _src_name(src_name), _line_number(line_number), _logger(logger) { // 日志的左边部分,合并起来 std::stringstream ss; ss << "[" << _curr_time << "] [" << Level2Str(_level) << "] [" << _pid << "] [" << _src_name << "] [" << _line_number << "] - "; _loginfo = ss.str(); }
// LogMessage() << "hell world" << "XXXX" << 3.14 << 1234
template <typename T> LogMessage &operator<<(const T &info) { // a = b = c =d; // 日志的右半部分,可变的 std::stringstream ss; ss << info; _loginfo += ss.str(); return *this; }
~LogMessage() { if (_logger._fflush_strategy) { _logger._fflush_strategy->SyncLog(_loginfo); } }
private:
std::string _curr_time;
LogLevel _level;
pid_t _pid;
std::string _src_name;
int _line_number;
std::string _loginfo; // 合并之后,一条完整的信息
Logger &_logger;
};
// 这里故意写成返回临时对象
LogMessage operator()(LogLevel level, std::string name, int line) { return LogMessage(level, name, line, *this); }
~Logger() { }
private:
std::unique_ptr<LogStrategy> _fflush_strategy;
};
// 全局日志对象
Logger logger;
// 使用宏,简化用户操作,获取文件名和行号
#define LOG(level) logger(level, __FILE__, __LINE__)
#define Enable_Console_Log_Strategy() logger.EnableConsoleLogStrategy()
#define Enable_File_Log_Strategy() logger.EnableFileLogStrategy()
}
#endif
服务端封装划分为服务端初始化和服务端启动,初始化阶段需要进行套接字创建,地址结构体初始化,再将套接字与结构体进行绑定。启动阶段创建缓冲区,不断的接受客户端传递的数据,在服务端打印后,再将数据传回给客户端。
下面是服务端代码
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <functional>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Log.hpp"
using namespace std;
using namespace LogModule;
const int defaultnum = -1;
using func_t = function<string(const string &)>;
class UdpServer {
public:
UdpServer(uint16_t port, func_t func) : _isrunning(false), _port(port), _func(func), _socket(defaultnum) { }
void Init() { _socket = socket(AF_INET, SOCK_DGRAM, 0); // IPv4协议 UDP模式 默认值 if (_socket < 0) { LOG(LogLevel::FATAL) << "socket error"; exit(1); } LOG(LogLevel::INFO) << "socket success" << _socket; // 标识符为3 012被占用 struct sockaddr_in local; // IPv4 网络地址结构体 bzero(&local, sizeof(local)); // 清空结构体 local.sin_family = AF_INET; // A=Address, F=Family, IN=Internet),表示使用 IPv4 协议; local.sin_port = htons(_port); // hton 大端序 shost = 主机,network = 网络,short=16 位整数 local.sin_addr.s_addr = INADDR_ANY; // 统定义的宏(值为 0x00000000,对应 IPv4 地址 0.0.0.0) // 这里为什么服务端要 bind!!!! // 服务端需要稳定固定的地址,但客户端用临时地址即可 int n = bind(_socket, (struct sockaddr *)&local, sizeof(local)); if (n < 0) { LOG(LogLevel::FATAL) << "bind error"; exit(2); } LOG(LogLevel::INFO) << "bind success,socket:" << socket; }
void Start() { _isrunning = true; while (_isrunning) { char buffer[1024]; // 接收客户端数据 struct sockaddr_in peer; // IPv4 数据接收结构体 socklen_t len = sizeof(peer); // 接收完客户端信息,在进行发送 ssize_t s = recvfrom(_socket, &buffer, sizeof(buffer) - 1, 0, (sockaddr *)&peer, &len);//接收客户端信息,buffer数组最后一位要0 if (s > 0) { int peer_port = ntohs(peer.sin_port); // string peer_ip = inet_ntoa(peer.sin_addr); //将 IP转换为string类格式 buffer[s] = 0; string result = _func(buffer); //将信息执行函数 cout<<result<<endl; sendto(_socket,&buffer,result.size(),0,(sockaddr*)&peer,len); //发送回给客户端 } } }
~UdpServer() { }
private:
bool _isrunning;
int _socket;
uint16_t _port;
func_t _func;
};
进行基础信息的创建调用函数即可
#include <iostream>
#include <memory>
#include "UdpServer.hpp"
string fun_c(const string&kk) {
string a = "hello,";
a += kk;
return a;
}
// 输入 ./UdpServer port
int main(int argc, char *argv[]) {
if (argc != 2) {
cerr << "Usages:" << argv[0] << " port" << endl;
return 1;
}
uint16_t _port = stoi(argv[1]);
Enable_Console_Log_Strategy();
unique_ptr<UdpServer> Udp = make_unique<UdpServer>(_port, fun_c);
Udp->Init();
Udp->Start();
return 0;
}
客户端首先创建自己的地址结构体,进行地址 IP 和 port 初始化,接着创建字符串进行输入,发送给服务端,接着创建缓冲区接收服务端发送回来的信息。
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <functional>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
using namespace std;
// 客户端
// 输入 ./UdpClient ip port
int main(int argc, char *argv[]) {
// 先通过输入拿到 ip 和 port
if (argc != 3) {
cerr << "Usage:" << argv[0] << " Client_ip Client_port" << endl;
return 1;
}
uint16_t _port = stoi(argv[2]);
string _ip = argv[1];
// 选定网络协议
int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0) {
cerr << "socket error" << endl;
return 2;
}
cout << "Client socket success" << endl;
// 初始化发送信息的结构体信息
struct sockaddr_in Client;
bzero(&Client, sizeof(Client));
Client.sin_family = AF_INET;
Client.sin_port = htons(_port);
Client.sin_addr.s_addr = inet_addr(_ip.c_str());
while (true) {
// 输入信息发送结构体
string message;
cout << "Please Enter:";
getline(cin, message);
sendto(sockfd, message.c_str(), message.size(), 0, (sockaddr *)&Client, sizeof(Client));
// 接收返回信息
char buffer[1024];
struct sockaddr_in peer;
socklen_t len;
int n = recvfrom(sockfd, &buffer, sizeof(buffer), 0, (sockaddr *)&peer, &len);
if (n > 0) {
buffer[n] = 0;
cout << "recvfrom success:" << buffer << endl;
}
}
return 0;
}
这里的日志和锁封装就不再添加,详细参考上面的代码。
我们这里来看看地址结构的封装
地址结构体封装接收 IP 和 port 并进行32位网络字节序转换以及端口号网络字节序转换为主机字节序
#pragma once
#include <iostream>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/in.h>
using namespace std;
class InetAddr {
public:
InetAddr(struct sockaddr_in addr) : _addr(addr) { _port = ntohs(addr.sin_port); _ip = inet_ntoa(addr.sin_addr); }
uint16_t Port() { return _port; }
string Ip() { return _ip; }
~InetAddr() { }
private:
uint16_t _port;
string _ip;
struct sockaddr_in _addr;
};
字典主要实现两个功能,一个是'下载字典',将字典的内容打印到屏幕上,第二个功能是进行翻译,输入中文返回英文。
首先我们要对字典文本进行解析,以:为分隔,对左右两边的字符串进行截取并放入到字典数据结构中。翻译功能找到对应的单词打印即可
#pragma once
#include <istream>
#include <iostream>
#include <unordered_map>
#include <string>
#include "Log.hpp"
#include "InetAddr.hpp"
using namespace std;
using namespace LogModule;
const string defpath = "./dictionary.txt";
const string sep = ":";
class Dict {
public:
Dict(const string &path = defpath) : _dict_path(path) { }
bool LoadDict() {
ifstream in(_dict_path);
if (!in.is_open()) {
LOG(LogLevel::DEBUG) << "打开字典:" << _dict_path << " 失败";
return false;
}
LOG(LogLevel::INFO) << "打开字典:" << _dict_path << " 成功";
string line;
while (getline(in, line)) {
auto pos = line.find(sep);
if (pos == string::npos) {
LOG(LogLevel::WARNING) << "字典格式错误";
continue;
}
auto english = line.substr(0, pos);
auto chinese = line.substr(pos + sep.size());
if (chinese.empty() || english.empty()) {
LOG(LogLevel::WARNING) << "字典识别错误:" << line;
continue;
}
_dict.insert(make_pair(english, chinese));
LOG(LogLevel::INFO) << "加载" << line;
}
in.close();
return true;
}
string translate(const string &word, InetAddr &client) {
auto iter = _dict.find(word);
if (iter == _dict.end()) {
LOG(LogLevel::DEBUG) << "进入翻译 [" << client.Ip() << "," << client.Port() << "]#" << word << "->None";
return "unknown";
}
LOG(LogLevel::INFO) << "进入翻译 [" << client.Ip() << "," << client.Port() << "]#" << word << "->" << iter->second;
return iter->first + "->" + iter->second;
}
~Dict() { }
private:
string _dict_path;
unordered_map<string, string> _dict;
};
服务端执行逻辑和上面的服务端逻辑一致,只是将端口和IP封装成了一个类进行处理
#pragma once
#include <iostream>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <functional>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string>
#include "Log.hpp"
#include "InetAddr.hpp"
using namespace std;
using namespace LogModule;
using func_t = function<string(const string &, InetAddr &)>;
const int defaultnum = -1;
class UdpServer {
public:
UdpServer(uint16_t port, func_t func) : _port(port), _isrunning(false), _func(func), _sockfd(defaultnum) { }
void Init() {
_sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (_sockfd < 0) {
LOG(LogLevel::FATAL) << "fail to socket";
exit(1);
}
LOG(LogLevel::INFO) << "sucess to socket,socket: " << _sockfd;
struct sockaddr_in local;
bzero(&local, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(_port);
local.sin_addr.s_addr = INADDR_ANY;
int m = bind(_sockfd, (struct sockaddr *)&local, sizeof(local));
if (m < 0) {
LOG(LogLevel::FATAL) << "bind error";
exit(2);
}
LOG(LogLevel::INFO) << "bind success, socket:" << _sockfd;
}
void Start() {
_isrunning = true;
while (_isrunning) {
char buffer[1024];
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
ssize_t s = recvfrom(_sockfd, &buffer, sizeof(buffer), 0, (struct sockaddr *)&peer, &len);
if (s > 0) {
InetAddr client(peer);
buffer[s] = 0;
string result = _func(buffer, client);
sendto(_sockfd, result.c_str(), result.size(), 0, (struct sockaddr *)&peer, len);
}
}
}
~UdpServer() { }
private:
uint16_t _port;
bool _isrunning;
func_t _func;
int _sockfd;
};
#include <iostream>
#include <string>
#include <cstring>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
using namespace std;
// ./udpclient server_ip server_port
int main(int argc, char *argv[]) {
if (argc != 3) {
std::cerr << "Usage: " << argv[0] << " server_ip server_port" << std::endl;
return 1;
}
std::string server_ip = argv[1];
uint16_t server_port = std::stoi(argv[2]);
// 1. 创建socket
int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if(sockfd < 0) {
std::cerr << "socket error" << std::endl;
return 2;
}
// 2. 本地的ip和端口是什么?要不要和上面的'文件'关联呢?
// 问题:client要不要bind?需要bind.
// client要不要显式的bind?不要!!首次发送消息,OS会自动给client进行bind,OS知道IP,端口号采用随机端口号的方式
// 为什么?一个端口号,只能被一个进程bind,为了避免client端口冲突
// client端的端口号是几,不重要,只要是唯一的就行!
// 填写服务器信息
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(server_port);
server.sin_addr.s_addr = inet_addr(server_ip.c_str());
while(true) {
std::string input;
std::cout << "Please Enter# ";
std::getline(std::cin, input);
int n = sendto(sockfd, input.c_str(), input.size(), 0, (struct sockaddr*)&server, sizeof(server));
(void)n;
char buffer[1024];
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int m = recvfrom(sockfd, buffer, sizeof(buffer)-1, 0, (struct sockaddr*)&peer, &len);
if(m > 0) {
buffer[m] = 0;
std::cout << buffer << std::endl;
}
}
return 0;
}
服务端需要增加字典结构
#include <iostream>
#include <memory>
#include "Dict.hpp" // 翻译的功能
#include "UdpServer.hpp" // 网络通信的功能
// ./UdpServer port
int main(int argc, char *argv[]) {
if (argc != 2) {
cerr << "Usage: " << argv[0] << " port" << endl;
return 1;
}
uint16_t port = stoi(argv[1]);
Enable_Console_Log_Strategy();
Dict dict;
dict.LoadDict();
unique_ptr<UdpServer> udp = make_unique<UdpServer>(port, [&dict](const string &word, InetAddr addr) {
return dict.translate(word, addr);
});
udp->Init();
udp->Start();
return 0;
}
日志,锁,地址结构在上文已经完成,可对上文进行参考
#pragma once
#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"
using namespace MutexModule;
namespace CondModule {
class Cond {
public:
Cond() { pthread_cond_init(&_cond, nullptr); }
void Wait(Mutex &mutex) { int n = pthread_cond_wait(&_cond, mutex.Get()); (void)n; }
void Signal() { // 唤醒在条件变量下等待的一个线程 int n = pthread_cond_signal(&_cond); (void)n; }
void Broadcast() { // 唤醒所有在条件变量下等待的线程 int n = pthread_cond_broadcast(&_cond); (void)n; }
~Cond() { pthread_cond_destroy(&_cond); }
private:
pthread_cond_t _cond;
};
};
#ifndef _THREAD_H_
#define _THREAD_H_
#include <iostream>
#include <string>
#include <pthread.h>
#include <cstdio>
#include <cstring>
#include <functional>
namespace ThreadModlue {
static uint32_t number = 1; // bug
class Thread {
using func_t = std::function<void()>;
// 暂时这样写,完全够了
private:
void EnableDetach() { _isdetach = true; }
void EnableRunning() { _isrunning = true; }
static void *Routine(void *args) // 属于类内的成员函数,默认包含this指针!
{
Thread *self = static_cast<Thread *>(args);
self->EnableRunning();
if (self->_isdetach) self->Detach();
pthread_setname_np(self->_tid, self->_name.c_str());
self->_func(); // 回调处理
return nullptr;
}
// bug
public:
Thread(func_t func) : _tid(0), _isdetach(false), _isrunning(false), res(nullptr), _func(func) {
_name = "thread-" + std::to_string(number++);
}
void Detach() {
if (_isdetach) return;
if (_isrunning) pthread_detach(_tid);
EnableDetach();
}
bool Start() {
if (_isrunning) return false;
int n = pthread_create(&_tid, nullptr, Routine, this);
if (n != 0) {
return false;
} else {
return true;
}
}
bool Stop() {
if (_isrunning) {
int n = pthread_cancel(_tid);
if (n != 0) {
return false;
} else {
_isrunning = false;
return true;
}
}
return false;
}
void Join() {
if (_isdetach) {
return;
}
int n = pthread_join(_tid, &res);
if (n != 0) {
} else {
}
}
pthread_t Id() {
return _tid;
}
~Thread() { }
private:
pthread_t _tid;
std::string _name;
bool _isdetach;
bool _isrunning;
void *res;
func_t _func;
};
}
#endif
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include "Log.hpp"
#include "Thread.hpp"
#include "Cond.hpp"
#include "Mutex.hpp"
// .hpp header only
namespace ThreadPoolModule {
using namespace ThreadModlue;
using namespace LogModule;
using namespace CondModule;
using namespace MutexModule;
static const int gnum = 5;
template <typename T>
class ThreadPool {
private:
void WakeUpAllThread() {
LockGuard lockguard(_mutex);
if (_sleepernum) _cond.Broadcast();
LOG(LogLevel::INFO) << "唤醒所有的休眠线程";
}
void WakeUpOne() {
_cond.Signal();
//LOG(LogLevel::INFO) << "唤醒一个休眠线程";
}
ThreadPool(int num = gnum) : _num(num), _isrunning(false), _sleepernum(0) {
for (int i = 0; i < num; i++) {
_threads.emplace_back( [this]() { HandlerTask(); });
}
}
void Start() {
if (_isrunning) return;
_isrunning = true;
for (auto &thread : _threads) {
thread.Start();
// LOG(LogLevel::INFO) << "start new thread success: " << thread.Name();
}
}
ThreadPool(const ThreadPool<T> &) = delete;
ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
public:
static ThreadPool<T> *GetInstance() {
if (inc == nullptr) {
LockGuard lockguard(_lock);
LOG(LogLevel::DEBUG) << "获取单例....";
if (inc == nullptr) {
LOG(LogLevel::DEBUG) << "首次使用单例,创建之....";
inc = new ThreadPool<T>();
inc->Start();
}
}
return inc;
}
void Stop() {
if (!_isrunning) return;
_isrunning = false;
// 唤醒所有的线层
WakeUpAllThread();
}
void Join() {
for (auto &thread : _threads) {
thread.Join();
}
}
void HandlerTask() {
char name[128];
pthread_getname_np(pthread_self(), name, sizeof(name));
while (true) {
T t;
{
LockGuard lockguard(_mutex);
// 1. a.队列为空 b. 线程池没有退出
while (_taskq.empty() && _isrunning) {
_sleepernum++;
_cond.Wait(_mutex);
_sleepernum--;
}
// 2. 内部的线程被唤醒
if (!_isrunning && _taskq.empty()) {
LOG(LogLevel::INFO) << name << " 退出了,线程池退出&&任务队列为空";
break;
}
// 一定有任务
t = _taskq.front();
// 从 q 中获取任务,任务已经是线程私有的了!!!
_taskq.pop();
}
t(); // 处理任务,需/需要在临界区内部处理吗?1 0
}
}
bool Enqueue(const T &in) {
if (_isrunning) {
LockGuard lockguard(_mutex);
_taskq.push(in);
if (_threads.size() == _sleepernum) WakeUpOne();
return true;
}
return false;
}
~ThreadPool() { }
private:
std::vector<Thread> _threads;
int _num; // 线程池中,线程的个数
std::queue<T> _taskq;
Cond _cond;
Mutex _mutex;
bool _isrunning;
int _sleepernum; // bug??
static ThreadPool<T> *inc; // 单例指针
static Mutex _lock;
};
template <typename T>
ThreadPool<T> *ThreadPool<T>::inc = nullptr;
template <typename T>
Mutex ThreadPool<T>::_lock;
}
我们实现群聊的逻辑是服务端接收到客户端发来的信息,将信息添加上发送者的信息之后,再打包发送给每一个群聊用户,这样就实现了群聊。
原理类似,我们用 vector 容器存储地址结构体,从地址结构体当中提取出发送的信息,接着把信息发送给容器中每一个成员。同时为了解决线程并发冲突问题这里引入了锁
#pragma once
#include <iostream>
#include <vector>
#include "Log.hpp"
#include "InetAddr.hpp"
#include "Mutex.hpp"
using namespace std;
using namespace LogModule;
using namespace MutexModule;
class Route {
private:
bool IsExit(InetAddr &peer) {
for (auto &e : _online_user) {
if (e == peer) {
return true;
}
}
return false;
}
void User_Add(InetAddr &peer) {
LOG(LogLevel::INFO) << "添加一个聊天用户";
_online_user.push_back(peer);
}
void User_Delete(InetAddr &peer) {
//LOG(LogLevel::DEBUG) << "删除一个用户";
for (auto iter = _online_user.begin(); iter < _online_user.end(); iter++) {
if (*iter == peer) {
LOG(LogLevel::INFO) << "成功删除用户:" << peer.StringAddr();
_online_user.erase(iter);
break;
}
}
}
public:
Route() { }
void MessageRoute(int _sockfd, const string &messages, InetAddr &addr) {
LockGuard lockguard(_lock);
if (!IsExit(addr)) {
User_Add(addr);
}
string result_messeage = "[" + addr.StringAddr() + "]" + messages;
LOG(LogLevel::INFO)<<result_messeage;
for (auto &e : _online_user) {
sendto(_sockfd, result_messeage.c_str(), result_messeage.size(), 0, (const struct sockaddr *)&e.Addr(), sizeof(e.Addr()));
}
if (messages == "QUIT") {
LOG(LogLevel::INFO) << "删除一个用户" << addr.StringAddr();
User_Delete(addr);
}
}
~Route() { }
private:
vector<InetAddr> _online_user;
Mutex _lock;
};
服务端处与原先逻辑类似
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <strings.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Log.hpp"
#include "InetAddr.hpp"
using namespace LogModule;
using func_t = std::function<void(int sockfd, const std::string&, InetAddr&)>;
const int defaultfd = -1;
// 你是为了进行网络通信的!
class UdpServer {
public:
UdpServer(uint16_t port, func_t func) : _sockfd(defaultfd), // _ip(ip), _port(port), _isrunning(false), _func(func) { }
void Init() {
// 1. 创建套接字
_sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (_sockfd < 0) {
LOG(LogLevel::FATAL) << "socket error!";
exit(1);
}
LOG(LogLevel::INFO) << "socket success, sockfd : " << _sockfd;
// 2. 绑定socket信息,ip和端口,ip(比较特殊,后续解释)
// 2.1 填充sockaddr_in结构体
struct sockaddr_in local;
bzero(&local, sizeof(local));
local.sin_family = AF_INET;
// 我会不会把我的IP地址和端口号发送给对方?
// IP信息和端口信息,一定要发送到网络!
// 本地格式->网络序列
local.sin_port = htons(_port);
// IP 也是如此,1. IP 转成 4 字节 2. 4 字节转成网络序列 -> in_addr_t inet_addr(const char *cp);
//local.sin_addr.s_addr = inet_addr(_ip.c_str());
// TODO
local.sin_addr.s_addr = INADDR_ANY;
// InetAddr addr("0", _port);
// addr.NetAddr();
// 那么为什么服务器端要显式的 bind 呢?IP 和端口必须是众所周知且不能轻易改变的!
int n = bind(_sockfd, (struct sockaddr *)&local, sizeof(local));
if (n < 0) {
LOG(LogLevel::FATAL) << "bind error";
exit(2);
}
LOG(LogLevel::INFO) << "bind success, sockfd : " << _sockfd;
}
void Start() {
_isrunning = true;
while (_isrunning) {
char buffer[1024];
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
// 1. 收消息,client 为什么要个服务器发送消息啊?不就是让服务端处理数据。
ssize_t s = recvfrom(_sockfd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&peer, &len);
if (s > 0) {
InetAddr client(peer);
buffer[s] = 0;
// TODO
_func(_sockfd, buffer, client);
// LOG(LogLevel::DEBUG) << "[" << peer_ip << ":" << peer_port<< "]# " << buffer;
// 1. 消息内容 2. 谁发的??
// 2. 发消息
// std::string echo_string = "server echo@ ";
// echo_string += buffer;
// sendto(_sockfd, result.c_str(), result.size(), 0, (struct sockaddr*)&peer, len);
}
}
}
~UdpServer() { }
private:
int _sockfd;
uint16_t _port;
// std::string _ip; // 用的是字符串风格,点分十进制,"192.168.1.1"
bool _isrunning;
func_t _func; // 服务器的回调函数,用来进行对数据进行处理
};
此处我们引入线程池,为的是解决信息的传递和接收无法并发的问题,我们将任务放到线程池当中,由线程池来分配线程来执行接收发送的工作。服务端的逻辑也很简单,就是将数据打包成任务,放到线程池中。
#include <iostream>
#include <memory>
#include "Route.hpp"
#include "ChatServer.hpp" // 网络通信的功能
#include "ThreadPool.hpp"
using namespace ThreadPoolModule;
using task_t = function<void()>;
// ./Chat_Server port
int main(int argc, char *argv[]) {
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " port" << std::endl;
return 1;
}
// std::string ip = argv[1];
uint16_t port = std::stoi(argv[1]);
Enable_Console_Log_Strategy();
Route r;
auto tp = ThreadPool<task_t>::GetInstance();
// bind 绑定函数,第一个参数为调用方法,第二个参数是实体,剩下的参数是传递到第一个参数的参数
unique_ptr<UdpServer> udp = make_unique<UdpServer>(port,[&r,&tp] (int sockfd, const std::string& messages, InetAddr& addr){
task_t t = std::bind(&Route::MessageRoute,&r, sockfd, messages, addr);
tp->Enqueue(t);
});
udp->Init();
udp->Start();
return 0;
}
客户端的操作无疑是接收信息和发送信息,我们将接受信息发送信息打包成两个线程进行管理
#include <iostream>
#include <string>
#include <cstring>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include "Thread.hpp"
int sockfd = 0;
std::string server_ip;
uint16_t server_port = 0;
pthread_t id;
using namespace ThreadModlue;
void Recv() {
while (true) {
char buffer[1024];
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int m = recvfrom(sockfd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&peer, &len);
if (m > 0) {
buffer[m] = 0;
std::cerr << buffer << std::endl;
// 2
}
}
}
void Send() {
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(server_port);
server.sin_addr.s_addr = inet_addr(server_ip.c_str());
const std::string online = "inline";
sendto(sockfd, online.c_str(), online.size(), 0, (struct sockaddr *)&server, sizeof(server));
while (true) {
std::string input;
std::cout << "Please Enter# ";
// 1
std::getline(std::cin, input);
// 0
int n = sendto(sockfd, input.c_str(), input.size(), 0, (struct sockaddr *)&server, sizeof(server));
(void)n;
if (input == "QUIT") {
pthread_cancel(id);
break;
}
}
}
// ./Chat_Client ip port
int main(int argc, char *argv[]) {
if (argc != 3) {
std::cerr << "Usage: " << argv[0] << " server_ip server_port" << std::endl;
return 1;
}
server_ip = argv[1];
server_port = std::stoi(argv[2]);
// 1. 创建socket
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0) {
std::cerr << "socket error" << std::endl;
return 2;
}
Thread recv(Recv);
Thread send(Send);
recv.Start();
send.Start();
id = send.Id();
recv.Join();
send.Join();
return 0;
}

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online