帮助文档
专业提供香港服务器、香港云服务器、香港高防服务器租用、香港云主机、台湾服务器、美国服务器、美国云服务器vps租用、韩国高防服务器租用、新加坡服务器、日本服务器租用 一站式全球网络解决方案提供商!专业运营维护IDC数据中心,提供高质量的服务器托管,服务器机房租用,服务器机柜租用,IDC机房机柜租用等服务,稳定、安全、高性能的云端计算服务,实时满足您的多样性业务需求。 香港大带宽稳定可靠,高级工程师提供基于服务器硬件、操作系统、网络、应用环境、安全的免费技术支持。
服务器资讯 / 香港服务器租用 / 香港VPS租用 / 香港云服务器 / 美国服务器租用 / 台湾服务器租用 / 日本服务器租用 / 官方公告 / 帮助文档
Linux学习记录——사십사 高级IO(5)--- Epoll型服务器(2)(Reactor)
发布时间:2024-03-06 15:18:10   分类:帮助文档
Linux学习记录——사십사 高级IO(5)--- Epoll型服务器(2)(Reactor) 文章目录 1、完善Epoll简单服务器2、打造统一的分开处理的体系3、epoll工作模式4、ET模式5、继续完善,处理写事件6、引入自定义协议,处理写事件 本篇基于上篇代码继续改进,很长。关于Reactor的说明在后一篇 1、完善Epoll简单服务器 上面的代码在处理读事件时,用的request数组是临时的,如果有数据没读完,那么下次再来到这里,就没有这些数据了。所以得让每一个fd都有自己的缓冲区。建立一个Connection类,然后有一个map结构,让这个类和每个fd建立映射。Start函数改一下,不管超时还是出错,就只处理数据,处理的部分交给HandlerEvent,改名成LoopOnce,也就是说,Start那里还是有循环,每次循环都去执行L函数,L函数用Wait提取一次,然后处理。 void Start() { //1、将listensock添加到epoll中,要先有epoll模型 bool r = epoller_.AddEvent(listensock_.Fd(), EPOLLIN);//只关心读事件 assert(r);//可以做别的判断 (void)r; struct epoll_event revs_[gnum]; int timeout = 1000; while(true) { LoopOnce(timeout); } } void Accepter() { std::string clientip; uint16_t clientport; int sock = listensock_.Accept(&clientip, &clientport); if (sock < 0) return ; logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport); // 还不能recv,即使有了连接但也不知道有没有数据 // 只有epoll知道具体情况,所以将sock添加到epoll中 bool r = epoller_.AddEvent(sock, EPOLLIN); assert(r); (void)r; } void Recver(int fd) { char request[1024]; ssize_t s = recv(fd, request, sizeof(request) - 1, 0); if (s > 0) { request[s - 1] = 0; // 对打印格式 request[s - 2] = 0; // 做一下调整 std::string response = func_(request); send(fd, response.c_str(), response.size(), 0); } else { if (s == 0) logMessage(Info, "client quit ..."); else logMessage(Warning, "recv error, client quit..."); close(fd); // 将文件描述符移除 // 在处理异常的时候,fd必须合法才能被处理 epoller_.DelEvent(fd); } } void LoopOnce(int timeout) { int n = epoller_.Wait(revs_, gnum, timeout); for(int i = 0; i < n; i++) { int fd = revs_[i].data.fd; uint32_t events = revs_[i].events; logMessage(Debug, "当前正在处理%d上的%s", fd, (events&EPOLLIN) ? "EPOLLIN" : "OTHER"); if(events & EPOLLIN)//判断读事件就绪 { if (fd == listensock_.Fd()) { // 1、新连接到来 Accepter(); } else { // 2、读事件 Recver(fd); } } } } class Connection { public: Connection(int fd): fd_(fd) {} ~Connection() {} public: int fd_; std::string inbuffer_; std::string outbuffer_; }; std::unordered_map connections_; 把Start的初始化任务交给InitServer void InitServer() { listensock_.Socket(); listensock_.Bind(port_); listensock_.Listen(); epoller_.Create(); logMessage(Debug, "init server success"); //为listensock创建对应的connection对象 Connection* conn = new Connection(listensock_.Fd()); //将listensock和connection对象添加到connections_ connections_.insert(std::pair(listensock_.Fd(), conn)); //将listensock添加到epoll中 bool r = epoller_.AddEvent(listensock_.Fd(), EPOLLIN); assert(r); (void)r; } void Start() { struct epoll_event revs_[gnum]; int timeout = 1000; while(true) { LoopOnce(timeout); } } 同样地,Accepter有添加到epoll的fd也要映射上自己的Connection类,Recver那里就可以也改一下了 void Accepter() { std::string clientip; uint16_t clientport; int sock = listensock_.Accept(&clientip, &clientport); if (sock < 0) return ; logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport); // 还不能recv,即使有了连接但也不知道有没有数据 // 只有epoll知道具体情况,所以将sock添加到epoll中 Connection* conn = new Connection(sock); connections_.insert(std::pair(sock, conn)); bool r = epoller_.AddEvent(sock, EPOLLIN); assert(r); (void)r; } void Recver(int fd) { char request[1024]; ssize_t s = recv(fd, request, sizeof(request) - 1, 0); if (s > 0) { request[s - 1] = 0; // 对打印格式 request[s - 2] = 0; // 做一下调整 connections_[fd]->inbuffer_ += request; std::string response = func_(request); send(fd, response.c_str(), response.size(), 0); } else { if (s == 0) logMessage(Info, "client quit ..."); else logMessage(Warning, "recv error, client quit..."); close(fd); // 将文件描述符移除 // 在处理异常的时候,fd必须合法才能被处理 epoller_.DelEvent(fd); } } 所有就绪的fd,不只包含我们关心的fd,都要有Connection类。Accepter那里,得到连接后,获取套接字,不直接读取,因为不知道是否有数据,就交给epoll,不过获取套接字后,每个套接字都需要正确读取自己的报文,所以Connection有了两个buffer。 所有就绪的fd,不仅要有Connection类,还要被epoll管理。但这样的代码并不高效,删除的时候要从epoll里删,还要从connections_里删,且代码也不够简洁。 封装并修改一下形式 class Connection { public: Connection(const int& fd, const std::string& clientip, const uint16_t& clientport) : fd_(fd), clientip_(clientip), clientport_(clientport) {} ~Connection() {} public: int fd_; std::string inbuffer_; std::string outbuffer_; std::string clientip_; uint16_t clientport_; }; //... void InitServer() { listensock_.Socket(); listensock_.Bind(port_); listensock_.Listen(); epoller_.Create(); //为listensock创建对应的connection对象 //将listensock和connection对象添加到connections_ //将listensock添加到epoll中 AddConnection(listensock_.Fd(), EPOLLIN); logMessage(Debug, "init server success"); } void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport) { //1、构建connection对象,交给connections_管理 Connection* conn = new Connection(fd, ip, port); connections_.insert(std::pair(fd, conn)); //2、fd和events写到内核中 bool r = epoller_.AddEvent(fd, events); assert(r); (void)r; logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port); } void Accepter() { std::string clientip; uint16_t clientport; int sock = listensock_.Accept(&clientip, &clientport); if (sock < 0) return ; logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport); // 还不能recv,即使有了连接但也不知道有没有数据 // 只有epoll知道具体情况,所以将sock添加到epoll中 AddConnection(sock, EPOLLIN, clientip, clientport); } void Recver(int fd) { char request[1024]; ssize_t s = recv(fd, request, sizeof(request) - 1, 0); if (s > 0) { request[s - 1] = 0; // 对打印格式 request[s - 2] = 0; // 做一下调整 connections_[fd]->inbuffer_ += request; std::string response = func_(request); send(fd, response.c_str(), response.size(), 0); } else { if (s == 0) logMessage(Info, "client quit ..."); else logMessage(Warning, "recv error // 在处理异常的时候,fd必须合法才能被处理 epoller_.DelEvent(fd); } } 2、打造统一的分开处理的体系 现有的Accepter、Recver都是处理写事件的,LoopOnce那里可以加个读事件的判断,但相关的处理函数要怎么写?为了简便,这里再引入回调函数。 const static int gport = 8888; class Connection; using func_t = std::function; using callback_t = std::function; class Connection { public: Connection(const int& fd, const std::string& clientip, const uint16_t& clientport) : fd_(fd), clientip_(clientip), clientport_(clientport) {} void Register(callback_t recver, callback_t sender, callback_t excepter) { recver_ = recver; sender_ = sender; excepter_ = excepter; } ~Connection() {} public: //IO信息 int fd_; std::string inbuffer_; std::string outbuffer_; //IO处理 callback_t recver_; callback_t sender_; callback_t excepter_; //用户信息 std::string clientip_; uint16_t clientport_; }; Register为注册方法,也就是要使用的方法。在AddConnection函数中,要判断一下,是我们关心的和不是我们关心的,都调用注册方法,但传的参数不一样。 void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport) { //2、构建connection对象,交给connections_管理 Connection* conn = new Connection(fd, ip, port); if(fd == listensock_.Fd()) { conn->Register(); } else { conn->Register(); } connections_.insert(std::pair(fd, conn)); //3、fd和events写到内核中 bool r = epoller_.AddEvent(fd, events); assert(r); (void)r; logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port); } Accepter那里,里面有AddConnection函数。当LoopOnce调用Accepter时,这个函数也要用回调函数,这样就是一个类的成员函数要调用另一个类的回调函数。 void Accepter(Connection* conn) { (void) conn;//先闲置不用 std::string clientip; uint16_t clientport; int sock = listensock_.Accept(&clientip, &clientport); if (sock < 0) return ; logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport); // 还不能recv,即使有了连接但也不知道有没有数据 // 只有epoll知道具体情况,所以将sock添加到epoll中 AddConnection(sock, EPOLLIN, clientip, clientport); } AddConnection中,Regsiter三个参数都是callback_t类型的,我们可以这样写 if(fd == listensock_.Fd()) { conn->Register(std::bind(&EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr); } 这样设置,当我们关心的套接字上有事件就绪时,读方法就绑定Accepter。是其它套接字的话 else { conn->Register(std::bind(&EpollServer::Recver, this, std::placeholders::_1), std::bind(&EpollServer::Sender, this, std::placeholders::_1), std::bind(&EpollServer::Excepter, this, std::placeholders::_1)); } void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport) { //2、构建connection对象,交给connections_管理 Connection* conn = new Connection(fd, ip, port); if(fd == listensock_.Fd()) { conn->Register(std::bind(&EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr); } else { conn->Register(std::bind(&EpollServer::Recver, this, std::placeholders::_1), std::bind(&EpollServer::Sender, this, std::placeholders::_1), std::bind(&EpollServer::Excepter, this, std::placeholders::_1)); } connections_.insert(std::pair(fd, conn)); //3、fd和events写到内核中 bool r = epoller_.AddEvent(fd, events); assert(r); (void)r; logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port); } Recver和Sender函数要传的参数都是Connection* conn,Sender和Excepter下面再写。这样的设计主要是为了更集中实现功能,代码分明。 在LoopOnce就这样写: void LoopOnce(int timeout) { int n = epoller_.Wait(revs_, gnum, timeout); for(int i = 0; i < n; i++) { int fd = revs_[i].data.fd; uint32_t events = revs_[i].events; logMessage(Debug, "当前正在处理%d上的%s", fd, (events&EPOLLIN) ? "EPOLLIN" : "OTHER"); if(events & EPOLLIN) connections_[fd]->recver_(connections_[fd]); if(events & EPOLLOUT) connections_[fd]->sender_(connections_[fd]); if((events & EPOLLERR) || (events & EPOLLHUP)) connections_[fd]->excepter_(connections_[fd]); } } 这样就形成了一整个体系。写事件,读事件,其它事件都有了处理。当服务器启动后,服务器就监听事件,一旦事件就绪,就会根据不同的事件类型来派发事件到不同的Connection中,由Connection来调用对应的函数来处理。 这时候,Start函数就是事件派发器,可以写为Disptcher()。接下来要写Recver、Sender、Excepter。 3、epoll工作模式 select,poll,epoll三个,一旦有事件就绪,如果上层不取,底层就会一直通知事件就绪,这种模式叫做LT模式,水平触发Level Triggered工作模式。epoll默认LT,另有一个ET模式,边缘触发Edge Triggered工作模式,在数据变化时只通知一次,变化就是从无到有,从有到多。ET倒逼程序员必须一次将本轮数据全部读取完毕,怎样保证读完?可以循环读取,直到某次读取的数量比每次要的量少,比如等于0或者小于这个数,就说明读完了;但因为recv/read是默认阻塞的,所以循环读取可能阻塞住,比如读完几次后刚好全部读完,那么下次读取就阻塞了,所以ET模式下,所有的读取和写入都必须是非阻塞的接口。 LT也可以在非阻塞的情况写入,读取,当然也可以在阻塞模式下工作。但LT也不能代替ET,因为代码无法统一起来,而ET只能是非阻塞,ET倒逼程序员写成它自己的形式。ET通知效率 >= LT,IO效率也是一样。 一次通知就是一次系统调用返回,一次返回必定对应一次调用,ET能有效减少系统调用次数。ET倒逼程序员尽快取走数据的本质是让TCP底层更新出更大的接收窗口,以较大概率地增加对方的滑动窗口的大小,提高发送效率。 ET并非能替代LT,ET适合高IO场景,LT能够读一部分就处理一部分,ET必须得读完才行。epoll接口默认LT。 4、ET模式 ET的设置是一个宏,EPOLLET。在我们的InitServer初始化函数中加上这个宏就行。下面也放了AddConnection的代码。 void InitServer() { listensock_.Socket(); listensock_.Bind(port_); listensock_.Listen(); epoller_.Create(); //为listensock创建对应的connection对象 //将listensock和connection对象添加到connections_ //将listensock添加到epoll中 AddConnection(listensock_.Fd(), EPOLLIN | EPOLLET); logMessage(Debug, "init server success"); } void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport) { //2、构建connection对象,交给connections_管理 Connection* conn = new Connection(fd, ip, port); if(fd == listensock_.Fd()) { conn->Register(std::bind(EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr); } else { conn->Register(std::bind(EpollServer::Recver, this, std::placeholders::_1), std::bind(EpollServer::Sender, this, std::placeholders::_1), std::bind(EpollServer::Excepter, this, std::placeholders::_1)); } connections_.insert(std::pair(fd, conn)); //3、fd和events写到内核中 bool r = epoller_.AddEvent(fd, events); assert(r); (void)r; logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port); } 除此之外,监听的套接字也得设置成非阻塞,用fcntl接口。写一个Util.hpp #pragma once #include #include #include class Util { public: static bool SetNonBlock(int fd) { int fl = fcntl(fd, F_GETFL); if(fl < 0) return false; fcntl(fd, F_SETFL, fl | O_NONBLOCK); return true; } }; 在EpollServer.hpp中 void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport) { //1、设置fd非阻塞 if(events & EPOLLET) Util::SetNonBlock(fd); //2、构建connection对象,交给connections_管理 Connection* conn = new Connection(fd, ip, port); Accepter里 AddConnection(sock, EPOLLIN | EPOLLET, clientip, clientport); 5、继续完善,处理写事件 现在的代码,从InitServer,传listensock_.Fd()到AddConnection中,然后会调用Accetper函数,但只能读一个连接,得改成循环的。给Connection类加一个成员uint32_t events,AddConnection函数中在插入connections_数组前给conn->events赋值,Accepter函数中传过来的参数是conn,通过events来判断是否需要循环。 class Connection { public: Connection(const int& fd, const std::string& clientip, const uint16_t& clientport) : fd_(fd), clientip_(clientip), clientport_(clientport) {} void Register(callback_t recver, callback_t sender, callback_t excepter) { recver_ = recver; sender_ = sender; excepter_ = excepter; } ~Connection() {} public: //IO信息 int fd_; std::string inbuffer_; std::string outbuffer_; //IO处理 callback_t recver_; callback_t sender_; callback_t excepter_; //用户信息 std::string clientip_; uint16_t clientport_; uint32_t events; }; void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport) { //1、设置fd非阻塞 if(events & EPOLLET) Util::SetNonBlock(fd); //2、构建connection对象,交给connections_管理 Connection* conn = new Connection(fd, ip, port); if(fd == listensock_.Fd()) { conn->Register(std::bind(EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr); } else { conn->Register(std::bind(EpollServer::Recver, this, std::placeholders::_1), std::bind(EpollServer::Sender, this, std::placeholders::_1), std::bind(EpollServer::Excepter, this, std::placeholders::_1)); } conn->events = events; connections_.insert(std::pair(fd, conn)); //3、fd和events写到内核中 bool r = epoller_.AddEvent(fd, events); assert(r); (void)r; logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port); } void Accepter(Connection* conn) { do { std::string clientip; uint16_t clientport; int sock = listensock_.Accept(&clientip, &clientport); if (sock < 0) return; logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport); // 还不能recv,即使有了连接但也不知道有没有数据 // 只有epoll知道具体情况,所以将sock添加到epoll中 AddConnection(sock, EPOLLIN | EPOLLET, clientip, clientport); } while (conn->events & EPOLLET);//如果是ET模式就循环,不是就退出 } 是我们需要的,就走Accetper方法,不是就走Recver,Sender,Excepter方法。上面Accepter函数中的Accept函数有返回值,这里的处理就是如果出错就返回,不过现在得处理出错,如果不出错才能继续走AddConnection函数。先在Sock.hpp中改一下,加上err参数,err = errno。 int Accept(std::string* clientip, uint16_t* clientport, int* err) { struct sockaddr_in temp; socklen_t len = sizeof(temp); int sock = accept(_sock, (struct sockaddr*)&temp, &len); *err = errno; if(sock < 0) { logMessage(Warning, "accept error, code: %d, errstring: %s", errno, strerror(errno)); } else { *clientip = inet_ntoa(temp.sin_addr);//这个函数就可以从结构体中拿出ip地址,转换好后返回 *clientport = ntohs(temp.sin_port); } return sock; } Accepter函数 void Accepter(Connection* conn) { do { int err = 0; std::string clientip; uint16_t clientport; int sock = listensock_.Accept(&clientip, &clientport, &err); if (sock < 0) { logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport); // 还不能recv,即使有了连接但也不知道有没有数据 // 只有epoll知道具体情况,所以将sock添加到epoll中 AddConnection(sock, EPOLLIN | EPOLLET, clientip, clientport); } else { if(err == EAGAIN || err == EWOULDBLOCK) break;//读完了,缓冲区满了 else if(err == EINTR) continue;//有信号暂时中断,后续还得继续读 else//异常,本次获取连接失败,继续读下一个连接 { logMessage(Warning, "errstring: %s, errcode: %d", strerror(err), err); continue; } } } while (conn->events & EPOLLET);//如果是ET模式就循环,不是就退出 logMessage(Debug, "accepter done ..."); } 再完成Recver,Sender,Excepter函数 void Recver(Connection* conn) { //读取完了本轮数据 do { char buffer[bsize];//1024 ssize_t n = recv(conn->fd_, buffer, sizeof(buffer) - 1, 0); if(n > 0) { buffer[n] = 0; conn->inbuffer_ += buffer; //根据基本协议,进行数据分析,边读取边分析 } else if(n == 0)//另一端关闭了套接字,要关闭连接 { conn->excepter_(conn);//归到异常处理 } else { if(errno == EAGAIN || errno == EWOULDBLOCK) break; else if(errno == EINTR) continue; else conn->excepter_(conn); } } while (conn->events & EPOLLET); //根据基本协议,进行数据分析 } 分析数据可以全读完再分析,也可以边读边分析,这就需要有协议规定,协议在之前有写过简单的代码。现在有3种情况会调用异常处理函数Excpter,Recver函数读的时候异常, Sender函数发送数据出现异常,LoopOnce里也有。这样情况有些多,代码写起来也不够好。改一下 void LoopOnce(int timeout) { int n = epoller_.Wait(revs_, gnum, timeout); for(int i = 0; i < n; i++) { int fd = revs_[i].data.fd; uint32_t events = revs_[i].events; logMessage(Debug, "当前正在处理%d上的%s", fd, (events&EPOLLIN) ? "EPOLLIN" : "OTHER"); //下面这句就是把所有异常情况都转化为Recver和Sender去调用异常函数 if((events & EPOLLERR) || (events & EPOLLHUP)) events |= (EPOLLIN | EPOLLOUT); //下面这两个也要改一下,要保证连接存在 if((events & EPOLLIN) && ConnIsExists(fd)) connections_[fd]->recver_(connections_[fd]); if((events & EPOLLOUT) && ConnIsExists(fd)) connections_[fd]->sender_(connections_[fd]); } } bool ConnIsExists(int fd) { return connections_.find(fd) != connections_.end(); } 6、引入自定义协议,处理写事件 用之前的Util.hpp和Protocol.hpp,有序列化反序列化,所以Makefile里得加上-ljsoncpp epollserver:Main.cc g++ -o $@ $^ -ljsoncpp -std=c++11 .PHONY:clean clean: rm -f epollserver Util.hpp #pragma once #include #include #include #include #include #include using namespace std; class Util { public: static bool SetNonBlock(int fd) { int fl = fcntl(fd, F_GETFL); if(fl < 0) return false; fcntl(fd, F_SETFL, fl | O_NONBLOCK); return true; } static bool StringSplit(const std::string& str, const std::string& sep, std::vector* result) { size_t start = 0; while(start < str.size()) { auto pos = str.find(sep, start); if(pos == std::string::npos) break; result->push_back(str.substr(start, pos - start)); start = pos + sep.size(); } if(start < str.size()) result->push_back(str.substr(start)); return true; } static int toInt(const std::string& s) { return atoi(s.c_str()); } }; Protocol.hpp #pragma once #include #include #include #include #include #include #include #include "Util.hpp" //#define MESELF 1 //给网络版本计算机定制协议 namespace protocol_ns { #define SEP " " #define SEP_LEN strlen(SEP)//不能用sizeof #define HEADER_SEP "\r\n" #define HEADER_SEP_LEN strlen("\r\n") //"长度"\r\n"_x_op_y"\r\n //假设报文是这样的: "7"\r\n"10 + 20"\r\n,这就相当于报头 + 有效载荷 //请求/响应 = 报头\r\n有效载荷\r\n,只是请求和响应的有效载荷不同 std::string AddHeader(const std::string &str) { std::cout << "AddHeader 之前:\n" << str << std::endl; std::string s = std::to_string(str.size()); s += HEADER_SEP; s += str; s += HEADER_SEP; std::cout << "AddHeader 之hou:\n" << s << std::endl; return s; } std::string RemoveHeader(const std::string& str, int len) { std::cout << "RemoveHeader 之前:\n" << str << std::endl; std::string res = str.substr(str.size() - HEADER_SEP_LEN - len, len); std::cout << "RemoveHeader 之后:\n" << str << std::endl; return res; } int ReadPackage(int sock, std::string& inbuffer, std::string* package) { std::cout << "ReadPackage inbuffer 之前:\n" << inbuffer << std::endl; //读取 ———— 字符串"7"\r\n"10 + 20"\r\n char buffer[1024]; ssize_t s = recv(sock, buffer, sizeof(buffer - 1), 0); if(s <= 0) return -1; buffer[s] = 0; inbuffer += buffer;//此时inbuffer里就有了这样的字符串: "7"\r\n"10 + 20"\r\n //分析 auto pos = inbuffer.find(HEADER_SEP); if(pos == std::string::npos) return 0;//没找到\r\n那么就不是正确的字符串,不动inbuffer里的内容,退出 std::string lenStr = inbuffer.substr(0, pos);//获取头部字符串7 int len = Util::toInt(lenStr);//得到了长度7,也就是有效载荷长度 int targetPackagelen = lenStr.size() + len + 2 * HEADER_SEP_LEN;//接收到的有报文的字符串长度就是这个 if(inbuffer.size() < targetPackagelen) return 0; //提取报文有效载荷 *package = inbuffer.substr(0, targetPackagelen);//package保存了"7"\r\n"10 + 20"\r\n,去掉其它符号的工作交给RemoveHeader inbuffer.erase(0, targetPackagelen);//只有到这里才改变inbuffer里的内容,从inbuffer里直接移除整个报文 std::cout << "ReadPackage inbuffer 之后:\n" << inbuffer << std::endl; return len;//len就是有效载荷的长度 } class Request { public: Request() {}//为无参构造而准备的,这样就是一个无参一个有参 Request(int x, int y, char op) : _x(x), _y(y), _op(op) {} bool Serialize(std::string* outstr)//序列化:结构体转字符串 { *outstr = ""; #ifdef MYSELF std::string x_string = std::to_string(_x); std::string y_string = std::to_string(_y); // 手动序列化 *outstr = x_string + SEP + _op + SEP + y_string; #else Json::Value root;//Value是一个万能对象,接受任何一个kv类型 root["x"] = _x; root["y"] = _y;//所有放进去的会自动转为string类型 root["op"] = _op; //Json::FastWriter writer;//FastWriter用来序列化,把结构化的数据转为字符串类型 Json::StyledWriter writer; *outstr = writer.write(root); #endif return true; } bool Deserialize(const std::string& instr)//反序列化:字符串转结构体 { #ifdef MYSELF std::vector result; Util::StringSplit(instr, SEP, &result); if (result.size() != 3) return false; _x = Util::toInt(result[0]); _y = Util::toInt(result[2]); if (result[1].size() == 1) return false; // 协议规定 _op = result[1][0]; // 因为是字符,所以只要一个符号即可 std::cout << "_x: \n" << _x << "_y: \n" << _y << "_op: " << _op << std::endl; #else Json::Value root; Json::Reader reader;//Reader用来反序列化 reader.parse(instr, root); _x = root["x"].asInt();//拿到的是字符串,要转成int类型 _y = root["y"].asInt(); //_op虽然是char,但它在计算机里就是整数,序列化时放进root的就是整数类型,反序列化时转成int类型,然后编译器会根据char类型自动解释成char类型 _op = root["op"].asInt(); #endif return true; } ~Request() {} public: int _x; int _y; char _op; }; class Response { public: Response() {} Response(int result, int code) : _result(result), _code(code) {} bool Serialize(std::string* outstr) { *outstr = ""; #ifdef MYSELF std::string res_string = std::to_string(_result); std::string code_string = std::to_string(_code); // 手动序列化 *outstr = res_string + SEP + code_string; #else Json::Value root; root["result"] = _result; root["code"] = _code; //Json::FastWriter writer; Json::StyledWriter writer; *outstr = writer.write(root); #endif return true; } bool Deserialize(const std::string& instr) { #ifdef MYSELF std::vector result; Util::StringSplit(instr, SEP, &result); if (result.size() != 2) return false; _result = Util::toInt(result[0]); _code = Util::toInt(result[1]); std::cout << "_result: \n" << _result << "_code: " << _code << std::endl; #else Json::Value root; Json::Reader reader; reader.parse(instr, root); _result = root["result"].asInt(); _code = root["code"].asInt(); #endif return true; } ~Response() {} public: int _result; int _code;//0表示计算成功,剩余的数字就是各种非法操作的错误码 }; } ReadPackage改一下,之前是接收并分析,现在只做分析 int ParsePackage(std::string& inbuffer, std::string* package) { std::cout << "ReadPackage inbuffer 之前:\n" << inbuffer << std::endl; //分析 auto pos = inbuffer.find(HEADER_SEP); if(pos == std::string::npos) return 0;//没找到\r\n那么就不是正确的字符串,不动inbuffer里的内容,退出 std::string lenStr = inbuffer.substr(0, pos);//获取头部字符串7 int len = Util::toInt(lenStr);//得到了长度7,也就是有效载荷长度 int targetPackagelen = lenStr.size() + len + 2 * HEADER_SEP_LEN;//接收到的有报文的字符串长度就是这个 if(inbuffer.size() < targetPackagelen) return 0; //提取报文有效载荷 *package = inbuffer.substr(0, targetPackagelen);//package保存了"7"\r\n"10 + 20"\r\n,去掉其它符号的工作交给RemoveHeader inbuffer.erase(0, targetPackagelen);//只有到这里才改变inbuffer里的内容,从inbuffer里直接移除整个报文 std::cout << "ReadPackage inbuffer 之后:\n" << inbuffer << std::endl; return len;//len就是有效载荷的长度 } 继续写EpollServer.hpp中的Recver和Sender函数 void Recver(Connection* conn) { //读取完了本轮数据 do { char buffer[bsize];//1024 ssize_t n = recv(conn->fd_, buffer, sizeof(buffer) - 1, 0); if(n > 0) { buffer[n] = 0; conn->inbuffer_ += buffer; //根据基本协议,进行数据分析,边读取边分析; std::string requestStr; int n = ParsePackage(conn->inbuffer_, &requestStr); //看ParsePackage //n为0表示没有不合理字符串或者inbuffer剩下的不够规定的长度,不用判断 if(n > 0)//保证读到了完整的请求 { //回调函数在Main.cc中 //这边先反序列化,再交给回调函数 //上面改成using func_t = std::function; requestStr = RemoveHeader(requestStr, n); Request req; req.Deserialize(requestStr); func_(req);//交给回调函数处理 } } else if(n == 0)//另一端关闭了套接字,要关闭连接 { conn->excepter_(conn);//归到异常处理 } else { if(errno == EAGAIN || errno == EWOULDBLOCK) break; else if(errno == EINTR) continue; else conn->excepter_(conn); } } while (conn->events & EPOLLET); } 更改在n > 0的判断后 Main.cc #include "EpollServer.hpp" #include //用之前网络计算器的计算函数 Response CalculateHelper(const Request& req) { Response resp(0, 0); switch(req._op) { case '+': resp._result = req._x + req._y; break; case '-': resp._result = req._x - req._y; break; case '*': resp._result = req._x * req._y; break; case '/': if(req._y == 0) resp._code = 1; else resp._result = req._x / req._y; break; case '%': if(req._y == 0) resp._code = 2; else resp._result = req._x / req._y; break; default: resp._code = 3; break; } return resp; } void Calculate(const Request& req) { Response resp = CalculateHelper(req); //序列化,当然这里放到EpollServer.hpp更好 std::string sendStr; resp.Serialize(&sendStr); sendStr = AddHeader(sendStr); //序列化后发送出去 } int main() { std::unique_ptr svr(new EpollServer(Calculate)); svr->InitServer(); svr->Disptcher(); return 0; } epoll中关于fd的读取,一般要常设置,也就是一直要让epoll关心;关于fd的写入,则是按需设置,不能常设置,只有需要发送的时候才设置。发送的对象就是Connection中的outbuffer。 //EpollServer.hpp using func_t = std::function; //... func_(conn, req);//Recver函数中 //Main.cc void Calculate(Connection* conn, const Request& req) { Response resp = CalculateHelper(req); //序列化,当然这里放到EpollServer.hpp更好 std::string sendStr; resp.Serialize(&sendStr); //序列化后发送出去 conn->outbuffer_ += sendStr; //开启对写事件的关心 } 加上写事件是要对内核做操作,在EpollServer.hpp的EpollServer类中中再添加一个函数专门做这个事,不过Main.cc中传过来的参数只有Connection类的,所有这个类还得添加一个成员来调用这个函数 class EpollServer; //... EpollServer* R; //AddConnection里conn->events = events后 conn->R = this;//函数属于EpollServer类,this就是这个类 //... void Calculate(Connection* conn, const Request& req) { Response resp = CalculateHelper(req); //序列化,当然这里放到EpollServer.hpp更好 std::string sendStr; resp.Serialize(&sendStr); //序列化后发送出去 conn->outbuffer_ += sendStr; //开启对写事件的关心 conn->R->EnableReadWrite(conn, true, true); } 开启后,Epoll底层会调用Sender函数来发送。 void Sender(Connection* conn) { do { ssize_t n = send(conn->fd_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0); //体现按需思路 if(n > 0)//发送成功,发送了局部或全部 { conn->outbuffer_.erase(0, n); if(conn->outbuffer_.empty())//把数据发完了 { EnableReadWrite(conn, true, false);//去掉对写事件的关心 break; } else//没发完,也就是发送了局部 { EnableReadWrite(conn, true, true);//继续 } } else { //和Accepter里的一样的解释 if(errno == EAGAIN || errno == EWOULDBLOCK) break; else if(errno == EINTR) continue; else { conn->excepter_(conn); break; } } } while (conn->events & EPOLLET);//ET模式就一直循环,不是就退出 } 通常初次设置对写事件的关心,发送缓冲区是空的会,因此立马触发一次对应的fd的就绪,此时epoll底层会自动调用回调函数,从而使用Sender函数。 实现EnableReadWrite函数 bool EnableReadWrite(Connection* conn, bool readable, bool writeable) { conn->events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET); //修改Epoll.hpp中的AddEvent函数为AddModEvent,传入一个op,用来实现Add和Mod两个功能 return epoller_.AddModEvent(conn->fd_, conn->events, EPOLL_CTL_MOD); } bool AddModEvent(int fd, uint32_t events, int op) { struct epoll_event ev; ev.events = events; ev.data.fd = fd;//属于用户的数据,epoll底层不对该数据做任何修改,为了给未来就绪返回 int n = epoll_ctl(epfd_, op, fd, &ev); if(n < 0) { logMessage(Fatal, "epoll_ctl error, code: %d, errstring: %s", errno, strerror(errno)); return false; } return true; } 这样就完成了服务器的拉取工作,也就是有数据来了,可以返回结果。 现在这样的代码是没问题的,但也有些复杂,我们希望暴露在外面的更少,所有工作都在底层完成,上层不需要关心,只调用接口就好。 本篇gitee 下一篇继续写。 结束。
香港云服务器租用推荐
服务器租用资讯
·广东云服务有限公司怎么样
·广东云服务器怎么样
·广东锐讯网络有限公司怎么样
·广东佛山的蜗牛怎么那么大
·广东单位电话主机号怎么填写
·管家婆 花生壳怎么用
·官网域名过期要怎么办
·官网邮箱一般怎么命名
·官网网站被篡改怎么办
服务器租用推荐
·美国服务器租用
·台湾服务器租用
·香港云服务器租用
·香港裸金属服务器
·香港高防服务器租用
·香港服务器租用特价