帮助文档
专业提供香港服务器、香港云服务器、香港高防服务器租用、香港云主机、台湾服务器、美国服务器、美国云服务器vps租用、韩国高防服务器租用、新加坡服务器、日本服务器租用 一站式全球网络解决方案提供商!专业运营维护IDC数据中心,提供高质量的服务器托管,服务器机房租用,服务器机柜租用,IDC机房机柜租用等服务,稳定、安全、高性能的云端计算服务,实时满足您的多样性业务需求。 香港大带宽稳定可靠,高级工程师提供基于服务器硬件、操作系统、网络、应用环境、安全的免费技术支持。
服务器资讯 / 香港服务器租用 / 香港VPS租用 / 香港云服务器 / 美国服务器租用 / 台湾服务器租用 / 日本服务器租用 / 官方公告 / 帮助文档
TCP服务器的演变过程:使用epoll构建reactor网络模型实现百万级并发(详细代码)
发布时间:2024-03-07 00:53:10   分类:帮助文档
TCP服务器的演变过程:使用epoll构建reactor网络模型实现百万级并发(详细代码) 使用epoll构建reactor网络模型实现百万级并发(详细代码) 一、前言二、reactor简介三、实现步骤3.1、step 1:定义Reactor模型相关结构体3.2、step 2:实现Reactor容器初始化功能3.3、step 3:实现socket初始化功能3.4、step 4:实现Reactor动态扩容功能3.5、step 5:实现Reactor索引功能3.6、step 6:实现设置事件信息功能3.7、step 7:实现IO事件监听功能3.8、step 8:实现IO事件移除功能3.9、step 9:实现Reactor事件监听功能3.10、step 10:实现recv回调函数3.11、step 11:实现send回调函数3.12、step 12:实现accept回调函数3.13、step 13:实现reactor运行函数3.14、step 14:实现reactor销毁功能3.15、使用示例 四、完整代码五、百万级并发连接测试小结 一、前言 手把手教你从0开始编写TCP服务器程序,体验开局一块砖,大厦全靠垒。 为了避免篇幅过长使读者感到乏味,对【TCP服务器的开发】进行分阶段实现,一步步进行优化升级。 本节,在上一章节介绍了如何使用epoll开发高效的服务器,本节将介绍使用epoll构建reactor网络模型,实现异步事件处理。 网络并发,通俗的讲就是服务器可以承载的客户端数量,即服务器可以稳定保证客户端同时接入的数量。 二、reactor简介 Reactor模型开发效率比直接使用IO多路复用要高,它一般是单线程的,设计目标是希望一个线程使用CPU的全部资源;带来的优点是,在每个事件处理中很多时候不需要考虑共享资源的互斥访问。 Reactor模式是处理并发IO比较常见的模式,用于同步IO,核心思想是将所有要处理的IO事件注册到一个中心IO多路复用器上,同时主线程或进程阻塞在IO多路复用器上;一旦有事件到来或准备就绪,多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中。 Reactor的优点: 响应快;不必为单个同步事件阻塞,虽然Reactor本身依然是同步的。 编程相对简单;可以最大程度的避免复杂的多线程及同步问题,尽可能的避免多线程、多进程的切换开销。 可扩展性;可通过增加Reactor实例个数,充分利用CPU资源。 高复用性;Reactor模型本身与事件处理逻辑无关,具有很高的复用性。 三、实现步骤 3.1、step 1:定义Reactor模型相关结构体 reactor数据结构设计图如下: 结构说明:以fd作为索引,存放在block中;当一个fd到来时,通过fd/MAX先找到fd对应的block号,再通过fd%MAX找到对应的偏移地址。例如来了个fd=10000,每个块存放的最大item数量MAX=1024,那么fd对应的block序号等于10000/1024=9;偏移量等于10000%1024=784。这样就可以找到fd对应的数据存放地址item。 数据结构的代码实现如下: struct socket_item { /* data */ int fd; // socket的文件描述符 char *write_buffer; // 写缓冲区 char *read_buffer; // 读缓冲区 int write_length; // 已读字节数 int read_length; // 已写字节数 int status; // 状态标识,设置epfd的操作模式 int event; // 事件类型 void *arg; // 回调函数的参数 int(*callback)(int fd,int events,void* arg); // 回调函数 }; struct event_block { /* data */ struct socket_item *items; // 事件集合 struct event_block *next; // 指向像一个内存块 }; struct net_reactor { /* data */ int epollfd; // 事件块的数量 int block_count; // 事件块的数量 int finish_reactor; // 判断是否退出服务 struct event_block *start_block; // 事件块的起始地址 }; 3.2、step 2:实现Reactor容器初始化功能 我们这里使用epoll作为IO多路复用器。 思路:初始化reactor内存块,避免脏数据;创建events和block并初始化,将events添加到block中,将block添加到reactor的链表中管理。 // 2. int init_reactor(struct net_reactor *reactor) { if(reactor==NULL) return REACTOR_NULL; memset(reactor,0,sizeof(struct net_reactor)); // 创建epoll,作为IO多路复用器 reactor->epollfd=epoll_create(1); if(reactor->epollfd==-1){ printf("create epfd in %s error %s\n", __func__, strerror(errno)); return REACTOR_CREATE_EPOLL_FAILED; } // 创建事件集 struct socket_item *items=(struct socket_item *)malloc(MAX_SOCKET_ITEMS*sizeof(struct socket_item)); if(items==NULL) { printf("create socket_item in %s error %s\n", __func__, strerror(errno)); close(reactor->epollfd); return REACTOR_MALLOC_MEMORY_FAILED; } memset(items,0,MAX_SOCKET_ITEMS*sizeof(struct socket_item)); // 创建事件内存块 struct event_block *block=(struct event_block *)malloc(sizeof(struct event_block)); if(block==NULL) { printf("create block in %s error %s\n", __func__, strerror(errno)); free(items); close(reactor->epollfd); return REACTOR_MALLOC_MEMORY_FAILED; } memset(block,0,sizeof(struct event_block)); block->items=items; block->next=NULL; reactor->block_count=1; reactor->start_block=block; reactor->finish_reactor=0; return REACTOR_SUCCESS; } 3.3、step 3:实现socket初始化功能 定义成一个函数,方便初始化多个监听端口。 // 3. int init_socket(short port) { int fd=socket(AF_INET,SOCK_STREAM,0); if(fd==-1) { printf("create socket in %s error %s\n", __func__, strerror(errno)); return -1; } int ret; // nonblock int flag = fcntl(fd, F_GETFL, 0); flag |= O_NONBLOCK; ret=fcntl(fd, F_SETFL, flag); if (ret == -1) { printf("fcntl O_NONBLOCK in %s error %s\n", __func__, strerror(errno)); close(fd); return -1; } // 绑定 struct sockaddr_in server; memset(&server, 0, sizeof(server)); server.sin_addr.s_addr=htonl(INADDR_ANY); server.sin_family=AF_INET; server.sin_port=htons(port); ret=bind(fd,(struct sockaddr*)&server,sizeof(server)); if(ret==-1) { printf("bind() in %s error %s\n", __func__, strerror(errno)); close(fd); return -1; } // 监听 ret=listen(fd,LISTEN_BLK_SIZE); if(ret==-1) { printf("listen failed : %s\n", strerror(errno)); close(fd); return -1; } printf("listen server port : %d\n", port); return fd; } 3.4、step 4:实现Reactor动态扩容功能 为了实现高并发,服务器需要监听多个端口。当高并发时需要reactor容器进行扩容管理。 核心思路:找到链表的末端,分别为events和block分配内存并初始化,将events添加到block中,将block添加到reactor的链表中管理。 // 4. 实现Reactor动态扩容功能 static int reactor_resize(struct net_reactor *reactor) { if(reactor==NULL) return REACTOR_NULL; if(reactor->start_block==NULL) return REACTOR_NULL; // 找到链表末端 struct event_block *cur_block=reactor->start_block; while(cur_block->next!=NULL) { cur_block=cur_block->next; } // 创建事件集 struct socket_item *items=(struct socket_item *)malloc(MAX_SOCKET_ITEMS*sizeof(struct socket_item)); if(items==NULL) { printf("create socket_item in %s error %s\n", __func__, strerror(errno)); return REACTOR_MALLOC_MEMORY_FAILED; } memset(items,0,MAX_SOCKET_ITEMS*sizeof(struct socket_item)); // 创建事件内存块 struct event_block *block=(struct event_block *)malloc(sizeof(struct event_block)); if(block==NULL) { printf("create block in %s error %s\n", __func__, strerror(errno)); free(items); return REACTOR_MALLOC_MEMORY_FAILED; } memset(block,0,sizeof(struct event_block)); block->next=NULL; block->items=items; cur_block->next=block; reactor->block_count++; return REACTOR_SUCCESS; } 3.5、step 5:实现Reactor索引功能 思路:通过fd/MAX先找到fd对应的block号,再通过fd%MAX找到对应的偏移地址。 例如来了个fd=10000,每个块存放的最大item数量MAX=1024,那么fd对应的block序号等于10000/1024=9;偏移量等于10000%1024=784。这样就可以找到fd对应的数据存放地址item。 // 5. 实现Reactor索引功能 static struct socket_item *reactor_index(struct net_reactor *reactor,int socketfd) { if(reactor==NULL) return NULL; if(reactor->start_block==NULL) return NULL; // fd所在block序号 int block_id=socketfd/MAX_SOCKET_ITEMS; // block序号不存在时自动扩容 while(block_id>=reactor->block_count) { if(reactor_resize(reactor)<0) { printf("reactor_resize in %s error %s\n", __func__, strerror(errno)); return NULL; } } // 找到fd对应block的位置 struct event_block *cur_block=reactor->start_block; int i=0; while(i++ !=block_id && cur_block!=NULL) { cur_block=cur_block->next; } return &cur_block->items[socketfd%MAX_SOCKET_ITEMS]; } 3.6、step 6:实现设置事件信息功能 将事件的相关信息保存到数据结构中。主要实现填充关键信息到event结构体中。 // 6. 实现设置事件信息功能 static void reactor_event_set(int fd,struct socket_item *sockevent,NCALLBACK callback,void *arg) { sockevent->arg=arg; sockevent->callback=callback; sockevent->event=0; sockevent->fd=fd; } 3.7、step 7:实现IO事件监听功能 这里使用epoll作为IO多路复用器,将事件添加到epoll中监听。 思路:主要是epoll_ctl操作,将事件添加到reactor的event结构体中。 // 7. 实现设置IO事件监听功能 static int reactor_event_add(int epollfd,int events,struct socket_item *sockevent) { struct epoll_event ep_events={0,{0}}; ep_events.data.ptr=sockevent; ep_events.events=events; sockevent->event=events; // 判断,设置epfd的操作模式 int options; if(sockevent->status==1) { options=EPOLL_CTL_MOD; } else{ options=EPOLL_CTL_ADD; sockevent->status=1; } if(epoll_ctl(epollfd,options,sockevent->fd,&ep_events)<0) { printf("event add failed [fd=%d], events[%d]\n", sockevent->fd, events); printf("event add failed in %s error %s\n", __func__, strerror(errno)); return -1; } return 0; } 3.8、step 8:实现IO事件移除功能 由于设置了非阻塞模式,当事件到来时,需要暂时移除监听,避免干扰。 // 8. 实现IO事件移除功能 static int reactor_event_del(int epollfd,struct socket_item *sockevent) { if(sockevent->status!=1) return -1; struct epoll_event ep_events={0,{0}}; ep_events.data.ptr=sockevent; sockevent->status=0; // 移除fd的监听 if(epoll_ctl(epollfd, EPOLL_CTL_DEL,sockevent->fd, &ep_events)<0) { printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno)); return -1; } return 0; } 3.9、step 9:实现Reactor事件监听功能 思路:设置fd的事件信息,添加事件到epoll监听。 // 9. 实现Reactor事件监听功能 int reactor_add_listener(struct net_reactor *reactor,int sockfd,NCALLBACK *acceptor) { if(reactor==NULL) return REACTOR_NULL; if(reactor->start_block==NULL) return REACTOR_NULL; // 找到fd对应的event地址 struct socket_item *item=reactor_index(reactor,sockfd); if(item==NULL) { printf("reactor_index failed in %s error %s\n", __func__, strerror(errno)); return REACTOR_ADD_LISTEN_FAILED; } reactor_event_set(sockfd,item,acceptor,reactor); if(reactor_event_add(reactor->epollfd,EPOLLIN,item)<0) { return REACTOR_ADD_LISTEN_FAILED; } printf("add listen fd = %d\n",sockfd); return REACTOR_SUCCESS; } 3.10、step 10:实现recv回调函数 思路:找到fd对应的信息内存块;使用recv接收数据;暂时移除该事件的监听;如果接收成功,设置监听事件为是否可写,添加到IO多路复用器(epoll)中;返回收到的数据长度。 // 10:实现recv回调函数 static int callback_recv(int fd, int events, void *arg) { struct net_reactor *reactor=(struct net_reactor *)arg; if(reactor==NULL) return REACTOR_NULL; // 找到fd对应的event地址 struct socket_item *item=reactor_index(reactor,fd); if(item==NULL) { printf("callback_recv in %s error %s\n", __func__, strerror(errno)); return REACTOR_MALLOC_MEMORY_FAILED; } // 接收数据 int ret= recv(fd,item->read_buffer,BUFFER_LENGTH,0); // 暂时移除监听 if(reactor_event_del(reactor->epollfd, item)<0) { printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno)); //return REACTOR_EVENT_DEL_FAILED; } if(ret>0) { item->read_length+=ret; printf("recv [%d]:%s\n", fd, item->read_buffer); // demo memcpy(item->write_buffer,item->read_buffer,ret); item->write_buffer[ret]='\0'; item->write_length=ret; reactor_event_set(fd,item,callback_send,reactor); if(reactor_event_add(reactor->epollfd,EPOLLOUT,item)<0) { printf("reactor_event_add failed in %s error %s\n", __func__, strerror(errno)); //return REACTOR_ADD_LISTEN_FAILED; } } else if(ret==0) { printf("recv_cb --> disconnect\n"); free(item->read_buffer); free(item->write_buffer); close(item->fd); } else { if(errno==EAGAIN || errno==EWOULDBLOCK) { // 表示没有数据可读。这时可以继续等待数据到来,或者关闭套接字。 } else if (errno == ECONNRESET) { // reactor_event_del(reactor->epollfd, item); free(item->read_buffer); free(item->write_buffer); close(item->fd); } printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno)); } return ret; } 3.11、step 11:实现send回调函数 思路:找到fd对应的信息内存块;使用send发送数据;暂时移除该事件的监听;如果发送成功,设置监听事件为是否可读,添加到IO多路复用器(epoll)中;返回发送的数据长度。 // 11:实现send回调函数 static int callback_send(int fd, int events, void *arg) { struct net_reactor *reactor=(struct net_reactor *)arg; if(reactor==NULL) { return REACTOR_NULL; } // 找到fd对应的event地址 struct socket_item *item=reactor_index(reactor,fd); if(item==NULL) { printf("callback_recv in %s error %s\n", __func__, strerror(errno)); return REACTOR_MALLOC_MEMORY_FAILED; } int ret=send(fd,item->write_buffer,item->write_length,0); // 暂时移除监听 if(reactor_event_del(reactor->epollfd, item)<0) { printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno)); //return REACTOR_EVENT_DEL_FAILED; } if (ret > 0) { printf("send[fd=%d], [%d]%s\n", fd, ret, item->write_buffer); reactor_event_set(fd, item, callback_recv, reactor); reactor_event_add(reactor->epollfd, EPOLLIN, item); } else { free(item->read_buffer); free(item->write_buffer); close(fd); printf("send[fd=%d] error %s\n", fd, strerror(errno)); } return ret; } 3.12、step 12:实现accept回调函数 思路:使用accept获得连接的客户端fd;设置客户端fd为非阻塞模式;找到fd对应的信息内存块;设置fd的事件信息;设置监听事件为是否可读,添加到IO多路复用器(epoll)中。 // 12. 实现accept回调函数 int callback_accept(int fd, int events, void *arg) { struct net_reactor *reactor=(struct net_reactor *)arg; if(reactor==NULL) { return REACTOR_NULL; } struct sockaddr_in client; socklen_t len=sizeof(client); int connectfd=accept(fd,(struct sockaddr*)&client,&len); if(connectfd<0) { printf("accept failed in %s error %s\n", __func__, strerror(errno)); return REACTOR_ACCEPT_FAILED; } // 设置非阻塞 int flag = fcntl(connectfd, F_GETFL, 0); flag |= O_NONBLOCK; int ret=fcntl(connectfd, F_SETFL, flag); if (ret == -1) { printf("fcntl O_NONBLOCK in %s error %s\n", __func__, strerror(errno)); close(connectfd); return REACTOR_FCNTL_FAILED; } // 找到fd对应的event地址 struct socket_item *item=reactor_index(reactor,connectfd); if(item==NULL) { printf("reactor_index in %s error %s\n", __func__, strerror(errno)); close(connectfd); return REACTOR_MALLOC_MEMORY_FAILED; } // 设置fd的事件信息 reactor_event_set(connectfd,item,callback_recv,reactor); // 添加事件到epoll监听 if(reactor_event_add(reactor->epollfd,EPOLLIN,item)<0) { close(connectfd); return REACTOR_ADD_LISTEN_FAILED; } // 为fd分配好读写缓冲区 item->read_buffer=(char *)malloc(BUFFER_LENGTH * sizeof(char)); if(item->read_buffer==NULL) { printf("mallc in %s error %s\n", __func__, strerror(errno)); close(connectfd); return REACTOR_MALLOC_MEMORY_FAILED; } memset(item->read_buffer,0,BUFFER_LENGTH * sizeof(char)); item->read_length=0; item->write_buffer=(char *)malloc(BUFFER_LENGTH * sizeof(char)); if(item->write_buffer==NULL) { printf("mallc in %s error %s\n", __func__, strerror(errno)); close(connectfd); free(item->read_buffer); return REACTOR_MALLOC_MEMORY_FAILED; } memset(item->write_buffer,0,BUFFER_LENGTH * sizeof(char)); item->write_length=0; printf("new connect [%s:%d], pos[%d]\n",inet_ntoa(client.sin_addr), ntohs(client.sin_port), connectfd); return REACTOR_SUCCESS; } 3.13、step 13:实现reactor运行函数 主要是epoll的等待功能,将监听到的事件进行回调处理。 // 13:实现reactor运行函数 int reactor_run(struct net_reactor *reactor) { if(reactor==NULL) return REACTOR_NULL; if(reactor->start_block==NULL) return REACTOR_NULL; if(reactor->epollfd<0) return REACTOR_CREATE_EPOLL_FAILED; struct epoll_event events[MAX_EPOLL_EVENTS + 1]; while(!reactor->finish_reactor) { int nready=epoll_wait(reactor->epollfd,events,MAX_EPOLL_EVENTS,1000); if(nready<0) { printf("epoll wait error\n"); continue; } int i=0; for(i=0;ievent&EPOLLIN)) { // 处理可读事件 item->callback(item->fd, events[i].events, item->arg); } if((events[i].events & EPOLLOUT) && (item->event&EPOLLOUT)) { // 处理可读事件 item->callback(item->fd, events[i].events, item->arg); } } } printf("Clearing memory......\n"); reactor_destory(reactor); printf("finish reactor\n"); return REACTOR_SUCCESS; } 3.14、step 14:实现reactor销毁功能 // 14:实现reactor销毁功能 int reactor_destory(struct net_reactor *reactor) { // 关闭epoll close(reactor->epollfd); struct event_block *blk= reactor->start_block; struct event_block *next; while (blk != NULL) { next = blk->next; // 释放内存块 for(int i=0;iitems[i].read_buffer!=NULL) { free(blk->items[i].read_buffer); blk->items[i].read_buffer=NULL; } if(blk->items[i].write_buffer!=NULL) { free(blk->items[i].write_buffer); blk->items[i].write_buffer=NULL; } } free(blk->items); free(blk); blk = next; } return REACTOR_SUCCESS; } 3.15、使用示例 创建structnet_reactor对象。调用init_reactor初始化。调用init_socket监听端口。调用reactor_add_listener将端口添加到reactor中管理。调用reactor_run运行reactor服务。 #include #include #include #include "reactor.h" #define SERVER_PORT 9703 #define PORT_COUNT 2 int main(int argc,char argv) { struct net_reactor *reactor=(struct net_reactor *)malloc(sizeof(struct net_reactor)); if(reactor==NULL) { perror("malloc struct net_reactor failed!\n"); return REACTOR_MALLOC_MEMORY_FAILED; } if(init_reactor(reactor)<0) { free(reactor); return REACTOR_NULL; } unsigned short port = SERVER_PORT; int sockfds[PORT_COUNT]={0}; int i=0; for(i=0;i0) { if(reactor_add_listener(reactor,sockfds[i],callback_accept)<0) printf("reactor_add_listener failed in %s : %d\n",__func__,__LINE__); } else { printf("init_socket failed in %s : %d\n",__func__,__LINE__); } } reactor_run(reactor); // 销毁 reactor //reactor_destory(reactor); reactor->finish_reactor=1; // 关闭socket集 for(i=0;i #include #include #include #include #include #include #include #include #include #include #include "reactor.h" #define MAX_SOCKET_ITEMS 1024 #define LISTEN_BLK_SIZE 20 #define BUFFER_LENGTH 1024 #define MAX_EPOLL_EVENTS 1024 // 2. int init_reactor(struct net_reactor *reactor) { if(reactor==NULL) return REACTOR_NULL; memset(reactor,0,sizeof(struct net_reactor)); // 创建epoll,作为IO多路复用器 reactor->epollfd=epoll_create(1); if(reactor->epollfd==-1){ printf("create epfd in %s error %s\n", __func__, strerror(errno)); return REACTOR_CREATE_EPOLL_FAILED; } // 创建事件集 struct socket_item *items=(struct socket_item *)malloc(MAX_SOCKET_ITEMS*sizeof(struct socket_item)); if(items==NULL) { printf("create socket_item in %s error %s\n", __func__, strerror(errno)); close(reactor->epollfd); return REACTOR_MALLOC_MEMORY_FAILED; } memset(items,0,MAX_SOCKET_ITEMS*sizeof(struct socket_item)); // 创建事件内存块 struct event_block *block=(struct event_block *)malloc(sizeof(struct event_block)); if(block==NULL) { printf("create block in %s error %s\n", __func__, strerror(errno)); free(items); close(reactor->epollfd); return REACTOR_MALLOC_MEMORY_FAILED; } memset(block,0,sizeof(struct event_block)); block->items=items; block->next=NULL; reactor->block_count=1; reactor->start_block=block; reactor->finish_reactor=0; return REACTOR_SUCCESS; } // 3. int init_socket(short port) { int fd=socket(AF_INET,SOCK_STREAM,0); if(fd==-1) { printf("create socket in %s error %s\n", __func__, strerror(errno)); return -1; } int ret; // nonblock int flag = fcntl(fd, F_GETFL, 0); flag |= O_NONBLOCK; ret=fcntl(fd, F_SETFL, flag); if (ret == -1) { printf("fcntl O_NONBLOCK in %s error %s\n", __func__, strerror(errno)); close(fd); return -1; } // 绑定 struct sockaddr_in server; memset(&server, 0, sizeof(server)); server.sin_addr.s_addr=htonl(INADDR_ANY); server.sin_family=AF_INET; server.sin_port=htons(port); ret=bind(fd,(struct sockaddr*)&server,sizeof(server)); if(ret==-1) { printf("bind() in %s error %s\n", __func__, strerror(errno)); close(fd); return -1; } // 监听 ret=listen(fd,LISTEN_BLK_SIZE); if(ret==-1) { printf("listen failed : %s\n", strerror(errno)); close(fd); return -1; } printf("listen server port : %d\n", port); return fd; } // 4. 实现Reactor动态扩容功能 static int reactor_resize(struct net_reactor *reactor) { if(reactor==NULL) return REACTOR_NULL; if(reactor->start_block==NULL) return REACTOR_NULL; // 找到链表末端 struct event_block *cur_block=reactor->start_block; while(cur_block->next!=NULL) { cur_block=cur_block->next; } // 创建事件集 struct socket_item *items=(struct socket_item *)malloc(MAX_SOCKET_ITEMS*sizeof(struct socket_item)); if(items==NULL) { printf("create socket_item in %s error %s\n", __func__, strerror(errno)); return REACTOR_MALLOC_MEMORY_FAILED; } memset(items,0,MAX_SOCKET_ITEMS*sizeof(struct socket_item)); // 创建事件内存块 struct event_block *block=(struct event_block *)malloc(sizeof(struct event_block)); if(block==NULL) { printf("create block in %s error %s\n", __func__, strerror(errno)); free(items); return REACTOR_MALLOC_MEMORY_FAILED; } memset(block,0,sizeof(struct event_block)); block->next=NULL; block->items=items; cur_block->next=block; reactor->block_count++; return REACTOR_SUCCESS; } // 5. 实现Reactor索引功能 static struct socket_item *reactor_index(struct net_reactor *reactor,int socketfd) { if(reactor==NULL) return NULL; if(reactor->start_block==NULL) return NULL; // fd所在block序号 int block_id=socketfd/MAX_SOCKET_ITEMS; // block序号不存在时自动扩容 while(block_id>=reactor->block_count) { if(reactor_resize(reactor)<0) { printf("reactor_resize in %s error %s\n", __func__, strerror(errno)); return NULL; } } // 找到fd对应block的位置 struct event_block *cur_block=reactor->start_block; int i=0; while(i++ !=block_id && cur_block!=NULL) { cur_block=cur_block->next; } return &cur_block->items[socketfd%MAX_SOCKET_ITEMS]; } // 6. 实现设置事件信息功能 static void reactor_event_set(int fd,struct socket_item *sockevent,NCALLBACK callback,void *arg) { sockevent->arg=arg; sockevent->callback=callback; sockevent->event=0; sockevent->fd=fd; } // 7. 实现设置IO事件监听功能 static int reactor_event_add(int epollfd,int events,struct socket_item *sockevent) { struct epoll_event ep_events={0,{0}}; ep_events.data.ptr=sockevent; ep_events.events=events; sockevent->event=events; // 判断,设置epfd的操作模式 int options; if(sockevent->status==1) { options=EPOLL_CTL_MOD; } else{ options=EPOLL_CTL_ADD; sockevent->status=1; } if(epoll_ctl(epollfd,options,sockevent->fd,&ep_events)<0) { printf("event add failed [fd=%d], events[%d]\n", sockevent->fd, events); printf("event add failed in %s error %s\n", __func__, strerror(errno)); return -1; } return 0; } // 8. 实现IO事件移除功能 static int reactor_event_del(int epollfd,struct socket_item *sockevent) { if(sockevent->status!=1) return -1; struct epoll_event ep_events={0,{0}}; ep_events.data.ptr=sockevent; sockevent->status=0; // 移除fd的监听 if(epoll_ctl(epollfd, EPOLL_CTL_DEL,sockevent->fd, &ep_events)<0) { printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno)); return -1; } return 0; } // 9. 实现Reactor事件监听功能 int reactor_add_listener(struct net_reactor *reactor,int sockfd,NCALLBACK *acceptor) { if(reactor==NULL) return REACTOR_NULL; if(reactor->start_block==NULL) return REACTOR_NULL; // 找到fd对应的event地址 struct socket_item *item=reactor_index(reactor,sockfd); if(item==NULL) { printf("reactor_index failed in %s error %s\n", __func__, strerror(errno)); return REACTOR_ADD_LISTEN_FAILED; } reactor_event_set(sockfd,item,acceptor,reactor); if(reactor_event_add(reactor->epollfd,EPOLLIN,item)<0) { return REACTOR_ADD_LISTEN_FAILED; } printf("add listen fd = %d\n",sockfd); return REACTOR_SUCCESS; } // 10:实现recv回调函数 static int callback_recv(int fd, int events, void *arg) { struct net_reactor *reactor=(struct net_reactor *)arg; if(reactor==NULL) return REACTOR_NULL; // 找到fd对应的event地址 struct socket_item *item=reactor_index(reactor,fd); if(item==NULL) { printf("callback_recv in %s error %s\n", __func__, strerror(errno)); return REACTOR_MALLOC_MEMORY_FAILED; } // 接收数据 int ret= recv(fd,item->read_buffer,BUFFER_LENGTH,0); // 暂时移除监听 if(reactor_event_del(reactor->epollfd, item)<0) { printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno)); //return REACTOR_EVENT_DEL_FAILED; } if(ret>0) { item->read_length+=ret; printf("recv [%d]:%s\n", fd, item->read_buffer); // demo memcpy(item->write_buffer,item->read_buffer,ret); item->write_buffer[ret]='\0'; item->write_length=ret; reactor_event_set(fd,item,callback_send,reactor); if(reactor_event_add(reactor->epollfd,EPOLLOUT,item)<0) { printf("reactor_event_add failed in %s error %s\n", __func__, strerror(errno)); //return REACTOR_ADD_LISTEN_FAILED; } } else if(ret==0) { printf("recv_cb --> disconnect\n"); free(item->read_buffer); free(item->write_buffer); close(item->fd); } else { if(errno==EAGAIN || errno==EWOULDBLOCK) { // 表示没有数据可读。这时可以继续等待数据到来,或者关闭套接字。 } else if (errno == ECONNRESET) { // reactor_event_del(reactor->epollfd, item); free(item->read_buffer); free(item->write_buffer); close(item->fd); } printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno)); } return ret; } // 11:实现send回调函数 static int callback_send(int fd, int events, void *arg) { struct net_reactor *reactor=(struct net_reactor *)arg; if(reactor==NULL) { return REACTOR_NULL; } // 找到fd对应的event地址 struct socket_item *item=reactor_index(reactor,fd); if(item==NULL) { printf("callback_recv in %s error %s\n", __func__, strerror(errno)); return REACTOR_MALLOC_MEMORY_FAILED; } int ret=send(fd,item->write_buffer,item->write_length,0); // 暂时移除监听 if(reactor_event_del(reactor->epollfd, item)<0) { printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno)); //return REACTOR_EVENT_DEL_FAILED; } if (ret > 0) { printf("send[fd=%d], [%d]%s\n", fd, ret, item->write_buffer); reactor_event_set(fd, item, callback_recv, reactor); reactor_event_add(reactor->epollfd, EPOLLIN, item); } else { free(item->read_buffer); free(item->write_buffer); close(fd); printf("send[fd=%d] error %s\n", fd, strerror(errno)); } return ret; } // 12. 实现accept回调函数 int callback_accept(int fd, int events, void *arg) { struct net_reactor *reactor=(struct net_reactor *)arg; if(reactor==NULL) { return REACTOR_NULL; } struct sockaddr_in client; socklen_t len=sizeof(client); int connectfd=accept(fd,(struct sockaddr*)&client,&len); if(connectfd<0) { printf("accept failed in %s error %s\n", __func__, strerror(errno)); return REACTOR_ACCEPT_FAILED; } // 设置非阻塞 int flag = fcntl(connectfd, F_GETFL, 0); flag |= O_NONBLOCK; int ret=fcntl(connectfd, F_SETFL, flag); if (ret == -1) { printf("fcntl O_NONBLOCK in %s error %s\n", __func__, strerror(errno)); close(connectfd); return REACTOR_FCNTL_FAILED; } // 找到fd对应的event地址 struct socket_item *item=reactor_index(reactor,connectfd); if(item==NULL) { printf("reactor_index in %s error %s\n", __func__, strerror(errno)); close(connectfd); return REACTOR_MALLOC_MEMORY_FAILED; } // 设置fd的事件信息 reactor_event_set(connectfd,item,callback_recv,reactor); // 添加事件到epoll监听 if(reactor_event_add(reactor->epollfd,EPOLLIN,item)<0) { close(connectfd); return REACTOR_ADD_LISTEN_FAILED; } // 为fd分配好读写缓冲区 item->read_buffer=(char *)malloc(BUFFER_LENGTH * sizeof(char)); if(item->read_buffer==NULL) { printf("mallc in %s error %s\n", __func__, strerror(errno)); close(connectfd); return REACTOR_MALLOC_MEMORY_FAILED; } memset(item->read_buffer,0,BUFFER_LENGTH * sizeof(char)); item->read_length=0; item->write_buffer=(char *)malloc(BUFFER_LENGTH * sizeof(char)); if(item->write_buffer==NULL) { printf("mallc in %s error %s\n", __func__, strerror(errno)); close(connectfd); free(item->read_buffer); return REACTOR_MALLOC_MEMORY_FAILED; } memset(item->write_buffer,0,BUFFER_LENGTH * sizeof(char)); item->write_length=0; printf("new connect [%s:%d], pos[%d]\n",inet_ntoa(client.sin_addr), ntohs(client.sin_port), connectfd); return REACTOR_SUCCESS; } // 13:实现reactor运行函数 int reactor_run(struct net_reactor *reactor) { if(reactor==NULL) return REACTOR_NULL; if(reactor->start_block==NULL) return REACTOR_NULL; if(reactor->epollfd<0) return REACTOR_CREATE_EPOLL_FAILED; struct epoll_event events[MAX_EPOLL_EVENTS + 1]; while(!reactor->finish_reactor) { int nready=epoll_wait(reactor->epollfd,events,MAX_EPOLL_EVENTS,1000); if(nready<0) { printf("epoll wait error\n"); continue; } int i=0; for(i=0;ievent&EPOLLIN)) { // 处理可读事件 item->callback(item->fd, events[i].events, item->arg); } if((events[i].events & EPOLLOUT) && (item->event&EPOLLOUT)) { // 处理可读事件 item->callback(item->fd, events[i].events, item->arg); } } } printf("Clearing memory......\n"); reactor_destory(reactor); printf("finish reactor\n"); return REACTOR_SUCCESS; } // 14:实现reactor销毁功能 int reactor_destory(struct net_reactor *reactor) { // 关闭epoll close(reactor->epollfd); struct event_block *blk= reactor->start_block; struct event_block *next; while (blk != NULL) { next = blk->next; // 释放内存块 for(int i=0;iitems[i].read_buffer!=NULL) { free(blk->items[i].read_buffer); blk->items[i].read_buffer=NULL; } if(blk->items[i].write_buffer!=NULL) { free(blk->items[i].write_buffer); blk->items[i].write_buffer=NULL; } } free(blk->items); free(blk); blk = next; } return REACTOR_SUCCESS; } main.c #include #include #include #include "reactor.h" #define SERVER_PORT 9703 #define PORT_COUNT 2 int main(int argc,char argv) { struct net_reactor *reactor=(struct net_reactor *)malloc(sizeof(struct net_reactor)); if(reactor==NULL) { perror("malloc struct net_reactor failed!\n"); return REACTOR_MALLOC_MEMORY_FAILED; } if(init_reactor(reactor)<0) { free(reactor); return REACTOR_NULL; } unsigned short port = SERVER_PORT; int sockfds[PORT_COUNT]={0}; int i=0; for(i=0;i0) { if(reactor_add_listener(reactor,sockfds[i],callback_accept)<0) printf("reactor_add_listener failed in %s : %d\n",__func__,__LINE__); } else { printf("init_socket failed in %s : %d\n",__func__,__LINE__); } } reactor_run(reactor); // 销毁 reactor //reactor_destory(reactor); reactor->finish_reactor=1; // 关闭socket集 for(i=0;i hard nofile 1024 <用户名> soft nofile 1024 另外,为了体现reactor的性能,需要将一些没必要的打印关掉,因为打印会影响性能。 客户端测试脚本代码: #include #include #include #include #include #include #include #include #include #include #include #include #define MAX_BUFFER 128 #define MAX_EPOLLSIZE (384*1024) #define MAX_PORT 1 #define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) int isContinue = 0; static int ntySetNonblock(int fd) { int flags; flags = fcntl(fd, F_GETFL, 0); if (flags < 0) return flags; flags |= O_NONBLOCK; if (fcntl(fd, F_SETFL, flags) < 0) return -1; return 0; } static int ntySetReUseAddr(int fd) { int reuse = 1; return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse)); } int main(int argc, char argv) { if (argc <= 2) { printf("Usage: %s ip port\n", argv[0]); exit(0); } const char *ip = argv[1]; int port = atoi(argv[2]); int connections = 0; char buffer[128] = {0}; int i = 0, index = 0; struct epoll_event events[MAX_EPOLLSIZE]; int epoll_fd = epoll_create(MAX_EPOLLSIZE); strcpy(buffer, " Data From MulClient\n"); struct sockaddr_in addr; memset(&addr, 0, sizeof(struct sockaddr_in)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = inet_addr(ip); struct timeval tv_begin; gettimeofday(&tv_begin, NULL); while (1) { if (++index >= MAX_PORT) index = 0; struct epoll_event ev; int sockfd = 0; if (connections < 340000 && !isContinue) { sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd == -1) { perror("socket"); goto err; } //ntySetReUseAddr(sockfd); addr.sin_port = htons(port+index); if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) { perror("connect"); goto err; } ntySetNonblock(sockfd); ntySetReUseAddr(sockfd); sprintf(buffer, "Hello Server: client --> %d\n", connections); send(sockfd, buffer, strlen(buffer), 0); ev.data.fd = sockfd; ev.events = EPOLLIN | EPOLLOUT; epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev); connections ++; } //connections ++; if (connections % 1000 == 999 || connections >= 340000) { struct timeval tv_cur; memcpy(&tv_cur, &tv_begin, sizeof(struct timeval)); gettimeofday(&tv_begin, NULL); int time_used = TIME_SUB_MS(tv_begin, tv_cur); printf("connections: %d, sockfd:%d, time_used:%d\n", connections, sockfd, time_used); int nfds = epoll_wait(epoll_fd, events, connections, 100); for (i = 0;i < nfds;i ++) { int clientfd = events[i].data.fd; if (events[i].events & EPOLLOUT) { sprintf(buffer, "data from %d\n", clientfd); send(sockfd, buffer, strlen(buffer), 0); } else if (events[i].events & EPOLLIN) { char rBuffer[MAX_BUFFER] = {0}; ssize_t length = recv(sockfd, rBuffer, MAX_BUFFER, 0); if (length > 0) { printf(" RecvBuffer:%s\n", rBuffer); if (!strcmp(rBuffer, "quit")) { isContinue = 0; } } else if (length == 0) { printf(" Disconnect clientfd:%d\n", clientfd); connections --; close(clientfd); } else { if (errno == EINTR) continue; printf(" Error clientfd:%d, errno:%d\n", clientfd, errno); close(clientfd); } } else { printf(" clientfd:%d, errno:%d\n", clientfd, errno); close(clientfd); } } } usleep(500); } return 0; err: printf("error : %s\n", strerror(errno)); return 0; } 小结 至此,我们最终实现了支持高并发的服务器程序,但是,这个服务器程序有些局限性,我们还要继续改善、优化。在改进之前,需要开发一个后台日志模块,这是服务器程序必须的,所有,下一个章节将开发一个高效的后台日志模块。
香港云服务器租用推荐
服务器租用资讯
·广东云服务有限公司怎么样
·广东云服务器怎么样
·广东锐讯网络有限公司怎么样
·广东佛山的蜗牛怎么那么大
·广东单位电话主机号怎么填写
·管家婆 花生壳怎么用
·官网域名过期要怎么办
·官网邮箱一般怎么命名
·官网网站被篡改怎么办
服务器租用推荐
·美国服务器租用
·台湾服务器租用
·香港云服务器租用
·香港裸金属服务器
·香港高防服务器租用
·香港服务器租用特价