哪个网站财经做的最好,网站做网页,ps怎么制作网页效果图,301的网站用什么来做目录
一、Reactor模式概述
二、日志模块#xff1a;Log.hpp
三、TCP连接模块#xff1a;Sock.hpp
四、非阻塞通信模块#xff1a;Util.hpp
五、多路复用I/O模块#xff1a;Epoller.hpp
六、协议定制模块#xff1a;Protocol.hpp
七、服务器模块#xff1a;Server.…目录
一、Reactor模式概述
二、日志模块Log.hpp
三、TCP连接模块Sock.hpp
四、非阻塞通信模块Util.hpp
五、多路复用I/O模块Epoller.hpp
六、协议定制模块Protocol.hpp
七、服务器模块Server.hpp server.cc
八、客户端模块Client.hpp client.cc 前情提示在学习Reactor模式之前需要熟悉socket套接字及TCP网络通信需要熟悉 select / poll / epoll 三种多路转接IO需要理解Linux文件系统的文件描述符与基础IO需要理解服务器server与客户端client的设计。
【Linux后端服务器开发】基础IO与文件系统_命运on-9的博客-CSDN博客
【Linux后端服务器开发】TCP通信设计_命运on-9的博客-CSDN博客
【Linux后端服务器开发】协议定制序列化与反序列化_命运on-9的博客-CSDN博客
【Linux后端服务器开发】select多路转接IO服务器_命运on-9的博客-CSDN博客
【Linux后端服务器开发】poll/epoll多路转接IO服务器_命运on-9的博客-CSDN博客 一、Reactor模式概述
Reactor是什么reactor的英文翻译是【反应堆】reactor设计模式是一种事件处理模式。
如何让一个server服务器连接多个client客户端并处理业务我们可以采用多进程 / 多线程的方法但是无论是进程还是线程对系统资源的消耗都是较大的于是我们可以采用 select / poll / epoll 的多路复用IO方法而在三种不同的多路复用方法中性能最优的是epoll。
epoll的LT模式和ET模式该如何选择LT和ET是不同的事件通知策略LT水平触发是阻塞式通知ET边缘触发是非阻塞式通知ET模式会倒逼程序员一次性将就绪的数据读写完毕这样也就使得一般情况下ET模式的效率更高所以在Reactor模式的设计中采用ET模式。
Reactor模式也叫Dispatcher(分派器)模式它的工作原理是什么呢 I/O多路复用监听事件当有事件就绪时根据事件类型分配给某个进程/线程。 Reactor模式主要由Reactor分派器和资源处理这两个部分组成
Reactor分派器负责监听和分发事件事件类型包含连接、读写、异常资源处理负责业务处理通常流程是 read - 业务逻辑 - send Reactor模式是灵活多变的根据不同的业务场景有不同的设计可以是【单Reactor】也可以是【多Reactor】可以是【单进程/线程】 也可以是【多进程/线程】不过此文中的Reactor网络计算器设计采用的是【单Reactor 单进程/线程】模式。 【单Reactor 单进程/线程】模式设计 初始化TcpServer创建listensock并将listensock添加进epoller模型中进行监听listensock会生成第一个Connection对象(注册_Accepter接口处理读取连接任务)之后的TcpClient的连接请求都是由这个绑定了_Accepter接口的listensock进行监听。
epoller模型Wait()等待TcpClient的请求如果是连接请求则TcpServer会派发任务给listensock对象让其调用Sock对象的Accept接口创建新的套接字sock进行IOsock会创建一个新的Connection对象(注册_Reader、_Sender、_Excepter接口处理读/写/异常任务)。
Connection进行业务处理的时候根据TCP通信协议的通信流程是 read - 业务逻辑 - send若是在通信中遇到异常则会调用_Excepter接口关闭连接。
在TCP通信设计中我们需要设计通信数据的序列化和反序列化这便是在Connection对象读取到数据之后的业务处理逻辑通过序列化和反序列化将数据发送给TcpClientTcpClient收到服务器发送数据后再通过序列化和反序列化拿到想要的数据。
在服务器设计的时候日志功能是可以省略的但是加上日志功能的服务器功能更完整并且方便调试和服务器维护。 二、日志模块Log.hpp
日志模块里面将日志分为DEBUG、NORMAL、WARNING、ERROR、FATAL五个记录等级并且定义了不同的错误类型日志记录需要记录进程的IP和端口号以及记录时间。
由于Linux系统的gdb调试是很复杂的通过在代码中添加DEBUG的打印日志更方便调试。
#pragma once#include iostream
#include string
#include cstdarg
#include ctime
#include cstdarg
#include unistd.h#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4#define NUM 1024enum
{USAGE_ERR 1,SOCKET_ERR,BIND_ERR,LISTEN_ERR,EPOLL_CREATE_ERR
};const char* To_Level_Str(int level)
{switch (level){case DEBUG:return DEBUG;case NORMAL:return NORMAL;case WARNING:return WARNING;case ERROR:return ERROR;case FATAL:return FATAL;default:return nullptr;}
}std::string To_Time_Str(long int t)
{// 将时间戳转化为tm结构体struct tm* cur;cur gmtime(t);cur-tm_hour (cur-tm_hour 8) % 24; // 东八区char tmp[NUM];std::string my_format %Y-%m-%d %H:%M:%S;strftime(tmp, sizeof(tmp), my_format.c_str(), cur);std::string cur_time tmp;return cur_time;
}void Log_Message(int level, const char *format, ...)
{char logprefix[NUM];std::string cur_time To_Time_Str((long int)time(nullptr));snprintf(logprefix, sizeof(logprefix), [%s][%s][pid: %d],To_Level_Str(level), cur_time.c_str(), getpid());char logcontent[NUM];va_list arg;va_start(arg, format);vsnprintf(logcontent, sizeof(logcontent), format, arg);std::cout logprefix logcontent std::endl;
}三、TCP连接模块Sock.hpp
Sock对象里面将TCP网络连接的底层接口进行了封装更方便其他模块对于TCP连接的调用。
Sock对象不再对Accept()的连接失败做处理将处理权交给了TcpServer。
#pragma once#include iostream
#include string
#include cstring
#include unistd.h#include sys/types.h
#include sys/socket.h
#include netinet/in.h
#include arpa/inet.h#include Log.hppconst static int g_defaultfd -1;
const static int g_backlog 32;class Sock
{
public:Sock(): _listensock(g_defaultfd){}Sock(int listensock): _listensock(listensock){}int Fd(){return _listensock;}void Socket(){// 1. 创建socket文件套接字对象_listensock socket(AF_INET, SOCK_STREAM, 0);if (_listensock 0){Log_Message(FATAL, create socket error);exit(SOCKET_ERR);}Log_Message(NORMAL, create socket success: %d, _listensock);int opt 1;setsockopt(_listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, opt, sizeof(opt));}void Bind(int port){// 2. bind绑定自己的网络信息struct sockaddr_in local;memset(local, 0, sizeof(local));local.sin_family AF_INET;local.sin_port htons(port);local.sin_addr.s_addr INADDR_ANY;if (bind(_listensock, (struct sockaddr *)local, sizeof(local)) 0){Log_Message(FATAL, bind socket error);exit(BIND_ERR);}Log_Message(NORMAL, bind socket success);}void Listen(){// 3. 设置socket 为监听状态if (listen(_listensock, g_backlog) 0) // 第二个参数backlog后面在填这个坑{Log_Message(FATAL, listen socket error);exit(LISTEN_ERR);}Log_Message(NORMAL, listen socket success);}int Accept(std::string *clientip, uint16_t *clientport, int* err){struct sockaddr_in peer;socklen_t len sizeof(peer);int sock accept(_listensock, (struct sockaddr *)peer, len);*err errno;if (sock 0){*clientip inet_ntoa(peer.sin_addr);*clientport ntohs(peer.sin_port);}return sock;}void Close(){if (_listensock ! g_defaultfd)close(_listensock);}~Sock(){this-Close();}private:int _listensock;
}; 四、非阻塞通信模块Util.hpp
Util.hpp设置静态成员函数Set_Noblock()将ET通知策略的套接字sock设置为非阻塞模式。
本次网络计算器的设计是ET模式故所有的连接在新建连接时都需要设置为非阻塞模式。
#pragma once#include iostream
#include fcntl.h
#include unistd.hclass Util
{
public:static bool Set_Nonblock(int fd){int fl fcntl(fd, F_GETFL);if (fl 0)return false;fcntl(fd, F_SETFL, fl | O_NONBLOCK);return true;}
}; 五、多路复用I/O模块Epoller.hpp
epoll模型是一个操作系统层面的模型我们将控制epoll的系统接口封装在Epoller对象中方便TcpServer的调用。
无论是TcpClient的连接请求还是业务请求从epoll的角度来看都是一个对文件描述符的读事件epoll只需要做事件通知即可。
#pragma once#include iostream
#include string
#include cstring
#include sys/epoll.h
#include unistd.h#include Log.hppconst static int g_default_epfd -1;
const static int g_size 64;class Epoller
{
public:Epoller(): _epfd(g_default_epfd){}void Create(){_epfd epoll_create(g_size);if (_epfd 0){Log_Message(FATAL, epoll create error: %s, strerror(errno));exit(EPOLL_CREATE_ERR);}}// user - kernelbool Add_Event(int sock, uint32_t events){struct epoll_event ev;ev.events events;ev.data.fd sock;int n epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, ev);return n 0;}// kernel - userint Wait(struct epoll_event revs[], int num, int timeout){return epoll_wait(_epfd, revs, num, timeout);}bool Control(int sock, uint32_t event, int action){int n 0;if (action EPOLL_CTL_MOD){struct epoll_event ev;ev.events event;ev.data.fd sock;n epoll_ctl(_epfd, action, sock, ev);}else if (action EPOLL_CTL_DEL){n epoll_ctl(_epfd, action, sock, nullptr);}else{n -1;}return n 0;}void Close(){if (_epfd ! g_default_epfd)close(_epfd);}~Epoller(){this-Close();}private:int _epfd;
}; 六、协议定制模块Protocol.hpp
TCP通信的序列化与反序列化就是网络服务器的业务逻辑因为TCP通信是字节流传输无法传输结构化数据所有我们需要自定义协议做序列化与反序列化处理进行字符串数据与结构化数据的转换。
序列化与反序列化可以完全编写函数做字符串数据处理也可以通过调用Json库做字符串数据处理调用Json库需要加上 -ljsoncpp 动态链接库。
这里的序列化与反序列化包含了客户端Client和服务器Serer双端的业务逻辑。
#pragma once#include iostream
#include cstring
#include string
#include sys/types.h
#include sys/socket.h
#include jsoncpp/json/json.h#include Log.hppusing namespace std;#define SEP
#define LINE_SEP \r\nenum
{OK 0,DIV_ZERO,MOD_ZERO,OP_ERR
};// x op y - content_len\r\nx op y\r\n
string En_Length(const string text)
{string send_str to_string(text.size());send_str LINE_SEP;send_str text;send_str LINE_SEP;return send_str;
}// content_len\r\nx op y\r\n
bool De_Length(const string package, string* text)
{auto pos package.find(LINE_SEP);if (pos string::npos)return false;string text_len_str package.substr(0, pos);int text_len stoi(text_len_str);*text package.substr(pos strlen(LINE_SEP), text_len);return true;
}// 通信协议不止一种需要将协议进行编号以供os分辨
// content_len\r\n协议编号\r\nx op y\r\nclass Request
{
public:Request(int x 0, int y 0, char op 0): _x(x), _y(y), _op(op){}// 序列化bool Serialize(string* out){Json::Value root;root[first] _x;root[second] _y;root[oper] _op;Json::FastWriter write;*out write.write(root);return true;}// 反序列化bool Deserialiaze(const string in){Json::Value root;Json::Reader reader;reader.parse(in, root);_x root[first].asInt();_y root[second].asInt();_op root[oper].asInt();return true;}public:int _x, _y;char _op;
};class Response
{
public:Response(int exitcode 0, int res 0): _exitcode(exitcode), _res(res){}bool Serialize(string* out){Json::Value root;root[exitcode] _exitcode;root[result] _res;Json::FastWriter writer;*out writer.write(root);return true;}bool Deserialize(const string in){Json::Value root;Json::Reader reader;reader.parse(in, root);_exitcode root[exitcode].asInt();_res root[result].asInt();return true;}public:int _exitcode;int _res;
};// 读取数据包
// content_len\r\nx op y\r\n
bool Parse_One_Package(string inbuf, string* text)
{*text ;// 分析处理auto pos inbuf.find(LINE_SEP);if (pos string::npos)return false;string text_len_string inbuf.substr(0, pos);int text_len stoi(text_len_string);int total_len text_len_string.size() 2 * strlen(LINE_SEP) text_len;if (inbuf.size() total_len)return false;// 至少有一个完整的报文*text inbuf.substr(0, total_len);inbuf.erase(0, total_len);return true;
}bool Recv_Package(int sock, string inbuf, string* text)
{char buf[1024];while (true){ssize_t n recv(sock, buf, sizeof(buf) - 1, 0);if (n 0){buf[n] 0;inbuf buf;auto pos inbuf.find(LINE_SEP);if (pos string::npos)continue;string text_len_str inbuf.substr(0, pos);int text_len stoi(text_len_str);int total_len text_len_str.size() 2 * strlen(LINE_SEP) text_len;cout \n收到响应报文:\n inbuf;if (inbuf.size() total_len){cout 输入不符合协议规定 endl;continue;}*text inbuf.substr(0, total_len);inbuf.erase(0, total_len);break;}else{return false;}}return true;
} 七、服务器模块Server.hpp server.cc
Server初始化创建listensock、创建epoll模型、创建事件就绪队列(struct epoll_event* _recv)listensock套接字会创建一个注册了_Accepter接口的Connection对象负责新建Client的连接。
所有的Connection对象通过哈希表管理极大的提高了效率。每一个Connection对象都有读写缓冲区(_inbuffer / _outbuffer)可以绑定回调的_Recver、_Sender、_Excepter函数以对事件做读、写、异常处理。
服务器的本质就是一个死循环循环的等待epoll模型的事件通知然后再Dispatch分派任务做读写处理若遇到异常问题将其转化为读写问题再去分派任务。
我们将网络计算服务器的计算任务放入了server.cc中进行函数定义为了解耦我们也可以再单独创建一个Task()对象但是此处由于计算任务比较简单我们就直接在server.cc源文件中定义了。
服务器中的所有监听的Connection对象我们将其读监听设置为一直开启将其写监听设置为按需开启当写任务完成后即关闭写监听。 Server.hpp
#pragma once#include iostream
#include string
#include functional
#include unordered_map
#include cassert
#include unistd.h#include Sock.hpp
#include Epoller.hpp
#include Log.hpp
#include Util.hpp
#include Protocol.hppusing namespace std;class Connection;
class TcpServer;static const uint16_t g_defaultport 8080;
static const int g_num 64;using func_t functionvoid(Connection*);class Connection
{
public:Connection(int sock, TcpServer* tcp): _sock(sock), _tcp(tcp){}void Register(func_t recver, func_t sender, func_t excepter){_recver recver;_sender sender;_excepter excepter;}void Close(){close(_sock);}public:int _sock;string _inbuffer; // 输入缓冲区string _outbuffer; // 输出缓冲区func_t _recver; // 读func_t _sender; // 写func_t _excepter; // 异常TcpServer *_tcp; // 可以省略// uint64_t last_time; // 记录最近访问时间可用于主动关闭某时间段内未访问的连接
};class TcpServer
{
public:TcpServer(func_t service, uint16_t port g_defaultport): _service(service), _port(port), _revs(nullptr){}void InitServer(){// 1. 创建socket_sock.Socket();_sock.Bind(_port);_sock.Listen();// 2. 创建epoller_epoller.Create();// 3. 将目前唯一的一个sock添加到Epoller中Add_Connection(_sock.Fd(), EPOLLIN | EPOLLET,bind(TcpServer::Accepter, this, placeholders::_1), nullptr, nullptr);_revs new struct epoll_event[g_num];_num g_num;}void Enable_Read_Write(Connection* conn, bool readable, bool writeable){uint32_t event (readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET;_epoller.Control(conn-_sock, event, EPOLL_CTL_MOD);}// 事件派发void Dispatch(){int timeout -1;while (1){Loop(timeout);// Log_Message(DEBUG, timeout ...);// 遍历_conn_map计算每一个节点的最近访问时间做节点控制}}~TcpServer(){_sock.Close();_epoller.Close();if (nullptr _revs)delete[] _revs;}private:void Add_Connection(int sock, uint32_t events, func_t recver, func_t sender, func_t excepter){// 1. 首先为该sock创建Connection并初始化并添加到_conn_mapif (events EPOLLET)Util::Set_Nonblock(sock);Connection *conn new Connection(sock, this);// 2. 给对应的sock设置对应的回调方法conn-Register(recver, sender, excepter);// 3. 其次将sock与它要关心的时间写透式注册到epoll中让epoll帮我们关心bool f _epoller.Add_Event(sock, events);assert(f);// 4. 将kv添加到_conn_map中_conn_map.insert(pairint, Connection*(sock, conn));Log_Message(DEBUG, Add_Connection: add new sock: %d in epoll and unordered_map, sock);}void Recver(Connection *conn){char buffer[1024];while (1){ssize_t s recv(conn-_sock, buffer, sizeof(buffer) - 1, 0);if (s 0){buffer[s] 0;conn-_inbuffer buffer; // 将读到的数据入队列Log_Message(DEBUG, \n收到client[%d]请求报文:\n%s, conn-_sock, conn-_inbuffer.c_str());_service(conn);}else if (s 0){// 异常回调if (conn-_excepter){conn-_excepter(conn);return;}}else{if (errno EAGAIN || errno EWOULDBLOCK){break;}else if (errno EINTR){continue;}else{if (conn-_excepter){conn-_excepter(conn);return;}}}}}void Sender(Connection *conn){while (1){ssize_t s send(conn-_sock, conn-_outbuffer.c_str(), conn-_outbuffer.size(), 0);if (s 0){if (conn-_outbuffer.empty())break;elseconn-_outbuffer.erase(0, s);}else{if (errno EAGAIN || errno EWOULDBLOCK){break;}else if (errno EINTR){continue;}else{if (conn-_excepter){conn-_excepter(conn);return;}}}}// 如果没有发送完毕需要对对应的sock开启写事件的关心发完了关闭对写事件的关心if (!conn-_outbuffer.empty())conn-_tcp-Enable_Read_Write(conn, true, true);elseconn-_tcp-Enable_Read_Write(conn, true, false);}void Excepter(Connection *conn){_epoller.Control(conn-_sock, 0, EPOLL_CTL_DEL);conn-Close();_conn_map.erase(conn-_sock);Log_Message(DEBUG, 关闭 %d 文件描述符的所有资源, conn-_sock);delete conn;}void Accepter(Connection *conn){while (1){string clientip;uint16_t clientport;int err 0;int sock _sock.Accept(clientip, clientport, err);if (sock 0){Add_Connection(sock, EPOLLIN | EPOLLET, bind(TcpServer::Recver, this, placeholders::_1),bind(TcpServer::Sender, this, placeholders::_1), bind(TcpServer::Excepter, this, placeholders::_1));Log_Message(DEBUG, get a new link, info: [%s : %d], clientip.c_str(), clientport);}else{if (err EAGAIN || err EWOULDBLOCK)break;else if (err EINTR)continue;elsebreak;}}}bool Is_Connection_Exists(int sock){auto iter _conn_map.find(sock);return iter ! _conn_map.end();}void Loop(int timeout){int n _epoller.Wait(_revs, _num, timeout); // 获取已经就绪的事件for (int i 0; i n; i){int sock _revs[i].data.fd;uint32_t events _revs[i].events;// 将所有异常问题转化成读写问题if (events EPOLLERR)events | (EPOLLIN | EPOLLOUT);if (events EPOLLHUP)events | (EPOLLIN | EPOLLOUT);// 读写事件就绪if ((events EPOLLIN) Is_Connection_Exists(sock) _conn_map[sock]-_recver)_conn_map[sock]-_recver(_conn_map[sock]);if ((events EPOLLOUT) Is_Connection_Exists(sock) _conn_map[sock]-_sender)_conn_map[sock]-_sender(_conn_map[sock]);}}private:uint16_t _port;Sock _sock;Epoller _epoller;unordered_mapint, Connection* _conn_map;struct epoll_event* _revs;int _num;func_t _service;
}; server.cc
#include Server.hpp
#include memoryusing namespace std;// 计算任务
bool Cal(const Request req, Response resp)
{resp._exitcode OK;resp._res 0;if (req._op / req._y 0){resp._exitcode DIV_ZERO;return false;}if (req._op % req._y 0){resp._exitcode MOD_ZERO;return false;}switch (req._op){case :resp._res req._x req._y;break;case -:resp._res req._x - req._y;break;case *:resp._res req._x * req._y;break;case /:resp._res req._x / req._y;break;case %:resp._res req._x % req._y;break;default:resp._exitcode OP_ERR;break;}return true;
}void Calculate(Connection* conn)
{string one_package;while (Parse_One_Package(conn-_inbuffer, one_package)){string req_str;if (!De_Length(one_package, req_str))return;// 对请求体Request反序列化得到一个结构化的请求对象Request req;if (!req.Deserialiaze(req_str))return;Response resp;Cal(req, resp);string resp_str;resp.Serialize(resp_str);conn-_outbuffer En_Length(resp_str);cout 构建完整的响应报文: \n conn-_outbuffer endl;}// 直接发if (conn-_sender)conn-_sender(conn);
}static void Usage(std::string proc)
{std::cerr Usage:\n\t proc port \n\n;exit(1);
}string Transaction(const string request)
{return request;
}// ./select_server 8080
int main(int argc, char *argv[])
{// if(argc ! 2)// Usage();// unique_ptrSelectServer svr(new SelectServer(atoi(argv[1])));// std::cout test: sizeof(fd_set) * 8 std::endl;unique_ptrTcpServer svr(new TcpServer(Calculate));svr-InitServer();svr-Dispatch();return 0;
}八、客户端模块Client.hpp client.cc
Client.hpp
#pragma once#include iostream
#include string
#include cstring
#include unistd.h#include sys/types.h
#include sys/socket.h
#include netinet/in.h
#include arpa/inet.h#include Protocol.hppusing namespace std;class Client
{
public:Client(const std::string server_ip, const uint16_t server_port): _sock(-1), _server_ip(server_ip), _server_port(server_port){}void Init(){_sock socket(AF_INET, SOCK_STREAM, 0);if (_sock 0){std::cerr socket error std::endl;exit(1);}}void Run(){struct sockaddr_in server;memset(server, 0, sizeof(server));server.sin_family AF_INET;server.sin_port htons(_server_port);server.sin_addr.s_addr inet_addr(_server_ip.c_str());if (connect(_sock, (struct sockaddr*)server, sizeof(server)) 0){std::cerr connect error std::endl;exit(1);}else{string line;string inbuf;while (true){cout mycal ;getline(cin, line);Request req Parse_Line(line); // 输入字符串生成Request对象string content;req.Serialize(content); // Request对象序列化string send_str En_Length(content); // 序列化字符串编码 - content_len\r\nx op y\r\nsend(_sock, send_str.c_str(), send_str.size(), 0);// 将服务器的返回结果序列化与反序列化string package, text;if (!Recv_Package(_sock, inbuf, package))continue;if (!De_Length(package, text))continue;Response resp;resp.Deserialize(text);cout 计算结果: endl;cout exitcode: resp._exitcode , ;cout result: resp._res endl endl;}}}// 将输入转化为Request结构Request Parse_Line(const string line){int status 0; // 0操作符之前 1遇到操作符 2操作符之后int cnt line.size();string left, right;char op;int i 0;while (i cnt){switch (status){case 0:if (!isdigit(line[i])){if (line[i] ){i;break;}op line[i];status 1;}else{left.push_back(line[i]);}break;case 1:i;if (line[i] )break;status 2;break;case 2:right.push_back(line[i]);break;}}return Request(stoi(left), stoi(right), op);}~Client(){if (_sock 0)close(_sock);}private:int _sock;string _server_ip;uint16_t _server_port;
}; client.cc
#include Client.hpp
#include memoryusing namespace std;static void Usage(string proc)
{cout \nUsage:\n\t proc local_port\n\n;exit(1);
}int main(int argc, char* argv[])
{if (argc ! 3)Usage(argv[0]);string server_ip argv[1];uint16_t server_port atoi(argv[2]);unique_ptrClient tcli(new Client(server_ip, server_port));tcli-Init();tcli-Run();return 0;
}