帮助文档
专业提供香港服务器、香港云服务器、香港高防服务器租用、香港云主机、台湾服务器、美国服务器、美国云服务器vps租用、韩国高防服务器租用、新加坡服务器、日本服务器租用 一站式全球网络解决方案提供商!专业运营维护IDC数据中心,提供高质量的服务器托管,服务器机房租用,服务器机柜租用,IDC机房机柜租用等服务,稳定、安全、高性能的云端计算服务,实时满足您的多样性业务需求。 香港大带宽稳定可靠,高级工程师提供基于服务器硬件、操作系统、网络、应用环境、安全的免费技术支持。
服务器资讯 / 香港服务器租用 / 香港VPS租用 / 香港云服务器 / 美国服务器租用 / 台湾服务器租用 / 日本服务器租用 / 官方公告 / 帮助文档
[学习分享]----sylar服务器框架源码阅读--IO协程调度模块
发布时间:2024-03-05 09:28:10   分类:帮助文档
[学习分享]----sylar服务器框架源码阅读--IO协程调度模块 sylar作者在本站的地址为 这里,也可以查看 作者主页,也有视频教程可以 点击这里。此外,也可以看一下赵路强大佬的关于sylar协程模块的博客 点击这里,我本人在阅读sylar源码的时候也是参考了赵路强大佬的解析 可以点击这里。 各位看官也可以加我的qq和我讨论2511010742 文章目录 IO 协程调度epoll简介1. epoll API2. epoll 结构 源码分析 IO 协程调度 sylar大人在协程调度模块中封装了epoll,对于每一个需要监听的文件描述符fd,都支持可读和可写事件。这部分操作是十分复杂的,需要读者对协程调度模块和epoll模型十分了解,接下来我会尽我所能向大家介绍清楚这部分内容。在协程调度模块中,当没有任务执行时就会在idle状态下忙等待,在本节中就利用了idle状态去做一些需要阻塞等待的任务,比如epoll_wait等。下面,我先介绍一下epoll的相关信息。 epoll简介 epoll 是 Linux 系统提供的一种事件通知机制,主要用于处理大量文件描述符的 I/O 事件。它是一种高效的 I/O 多路复用机制,相比于传统的 select 和 poll,epoll 在处理大量连接时有更好的性能。 1. epoll API 1.1 创建 epoll 实例 int epfd = epoll_create(int size); 这个函数用处是创建一个 epoll 实例,size 本意为希望监听的文件描述符的数量,不用太在意它。 1.2 控制 epoll 上的事件 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); epfd 是 epoll 实例的文件描述符。op 表示操作类型,有三个值: EPOLL_CTL_ADD:添加一个文件描述符到 epoll 实例中。 EPOLL_CTL_MOD:修改 epoll 实例上的文件描述符的事件。 EPOLL_CTL_DEL:从 epoll 实例中删除一个文件描述符。fd 是要操作的文件描述符。event 是一个 struct epoll_event 结构体,用于描述事件类型和携带的数据。 1.3 等待事件发生 int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); epfd 是 epoll 实例的文件描述符。events 是一个数组,用于存储发生的事件。maxevents 表示 events 数组的大小,即最多可以存储多少个事件。timeout 表示等待事件发生的超时时间,单位是毫秒。传入 -1 表示一直等待,传入 0 表示立即返回,大于 0 表示等待指定的时间。 2. epoll 结构 2.1 eventpoll epoll 在内核中主要使用两个结构体:struct eventpoll 和 struct epitem。这两个结构体用于管理 epoll 实例中的文件描述符和事件。eventpoll 里成员有很多,我们讲一些常用的。 struct eventpoll { spinlock_t lock; struct mutex mtx; struct file *file; struct wait_queue_head *wq; struct wait_queue_entry wait; wait_queue_t poll_wait; struct list_head rdllist; struct list_head ovflist; struct rb_root_cached rbr; struct list_head poll_siblings; struct list_head fasync_list; unsigned int ovflist_length; struct ep_pqueue poll_table; struct user_struct *user; unsigned long gen; wait_queue_head_t proc_wait; struct wakeup_source __rcu *ws; unsigned int user_size; int wakeup_pipe; }; 主要字段包括: lock: 用于实现对 eventpoll 结构的自旋锁,确保多线程并发安全。mtx: 用于实现互斥锁,确保对 eventpoll 结构的互斥访问。file: 与 epoll 实例关联的文件结构。wq: 等待队列头,用于进程等待事件发生。poll_table: 保存文件描述符到 epitem 的映射表,是一个指针数组。rdllist: 就绪链表,用于存储就绪的事件项 (epitem)。rbr: 红黑树根节点,用于快速查找文件描述符对应的 epitem。其他字段用于管理等待队列、异步事件、用户信息等。 2.2 epitem struct epitem 表示 epoll 实例中的一个事件项,它包含了与文件描述符相关的信息和事件状态。 struct epitem { struct rb_node rbn; struct list_head rdllink; struct list_head fllink; struct file *file; struct eventpoll *ep; struct wait_queue_head wq; struct epoll_event event; unsigned long last_wakeup; spinlock_t lock; }; 主要字段包括: rbn: 用于将 epitem 插入到 struct eventpoll 的红黑树中,以实现快速查找。rdllink: 用于将 epitem 插入到 struct eventpoll 的就绪链表中,以等待用户程序处理。fllink: 用于将 epitem 插入到 struct eventpoll 的就绪链表中的尾部。file: 关联的文件结构。ep: 指向 struct eventpoll 的指针。wq: 等待队列头,用于进程等待事件发生。event: 存储文件描述符的事件类型和数据。last_wakeup: 记录上次唤醒的时间戳,用于避免多次重复唤醒。lock: 用于实现对 epitem 结构的自旋锁,确保多线程并发安全。 这两个结构体是 epoll 内核实现中的核心数据结构,通过它们实现了高效的事件管理和处理机制。struct eventpoll 用于维护 epoll 实例的状态,而 struct epitem 用于表示每个文件描述符的事件状态。 epoll_event 结构体用于描述文件描述符上的事件,它是在 epoll 操作中用到的关键数据结构。以下是 epoll_event 结构体的定义: struct epoll_event { uint32_t events; // 事件类型,可以是 EPOLLIN、EPOLLOUT、EPOLLERR 等 epoll_data_t data; // 用户数据,可以携带额外信息 }; events:表示事件的类型,可以是以下几个宏的组合: EPOLLIN:表示对应的文件描述符可以读取(Readable)。EPOLLOUT:表示对应的文件描述符可以写入(Writable)。EPOLLRDHUP:表示对端断开连接,或者是写半关闭状态(可读、对端关闭写)。EPOLLHUP:表示发生挂起事件,通常表示连接被挂断或发生错误。EPOLLERR:表示发生错误,需要通过 errno 获取具体错误信息。 data:是一个联合体,用于携带额外的信息。epoll_data_t 的定义如下: typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; epoll_data 可以用于携带用户数据,具体使用取决于应用的需要。如果用户程序希望在事件发生时携带一些额外的信息,可以通过设置 data 字段。sylar就使用了其中的 ptr 指针来保存事件上下文信息。 首先,通过 epoll_create 创建一个 epoll 实例,得到一个文件描述符,然后,通过 epoll_ctl 将需要监听的文件描述符注册到 epoll 实例中,并指定关注的事件类型。这个过程会将我们添加的文件描述符添加到 epoll 句柄的红黑树中,然后执行 epoll_wait 函数则会阻塞等待这些文件描述符所关心的事件发生,当有关注的事件发生时,epoll_wait 返回,需要处理的事件都会保存在 events 数组里,遍历并处理这些就绪的事件。 为了高效地管理已经就绪的事件项,epoll 使用双向链表维护一个就绪事件链表。当文件描述符上的事件发生时,对应的事件项会被从红黑树中找到,然后加入到就绪链表中等待用户程序处理。用户程序调用 epoll_wait 进行阻塞等待事件的发生。内核检查红黑树上的文件描述符,如果有就绪的文件描述符,将相应的事件项加入到就绪链表中。epoll_wait 返回时,将就绪链表上的事件项传递给用户程序,用户程序可以遍历链表处理已经就绪的事件。 简单介绍完之后,让我们进入今天的主题,读一下 sylar 这部分源码。 源码分析 对于IO协程调度来说,我们只关心它的 fd、事件类型和对应的回调函数,其中 epoll 负责 fd 和事件之间的联系,回调函数则是需要协程调度去调度执行。sylar 建立了一个 FdContext 结构来保存对应的文件描述符,以及事件信息和回调函数。并将其保存在epoll_event的私有数据指针data.ptr中,当 wait 之后,遍历就绪事件就可以拿到这些信息,从而去执行相应的操作。 调度器在 idle 时会 epoll_wait 所有已经注册的 fd,返回时就可以拿到对应的 FdContext。然后将对应事件的回调函数加入到调度器的任务列表中,在 idle 退出后就可以调度这些回调函数了。下面我们看一下头文件信息。 class IOManager : public Scheduler, public TimerManager { public: typedef std::shared_ptr ptr; typedef RWMutex RWMutexType; // IO事件 enum Event { // 无事件 NONE = 0x0, // 读事件 READ = 0x1, // 写事件 WRITE = 0x4, }; private: // Socket事件上下文类 struct FdContext { typedef Mutex MutexType; // 事件上下文类 struct EventContext { // 这个事件执行的调度器 Scheduler* scheduler = nullptr; // 事件协程 Fiber::ptr fiber; // 事件回调函数 std::function cb; }; // 获取事件上下文类 EventContext& getContext(Event event); // 重新设置事件上下文 void resetContext(EventContext& ctx); // 触发事件 void triggerEvent(Event event); // 定义一读一写事件 EventContext read, write; // 事件关联的句柄 int fd = 0; // 当前描述符所关注事件 Event events = NONE; // 事件的锁 MutexType mutex; }; public: IOManager(size_t threads = 1, bool use_caller = true, const std::string& name = ""); ~IOManager(); / * @brief 添加事件 * @param[in] fd socket句柄 * @param[in] event 事件类型 * @param[in] cb 事件回调函数 * @return 添加成功返回0,失败返回-1 */ int addEvent(int fd, Event event, std::function cb = nullptr); / * @brief 删除事件 * @param[in] fd socket句柄 * @param[in] event 事件类型 * @attention 不会触发事件 */ bool delEvent(int fd, Event event); // 取消 fd上的某个事件 bool cancelEvent(int fd, Event event); // 取消 fd上的所有事件 bool cancelAll(int fd); // 返回当前IOManager static IOManager* GetThis(); protected: void tickle() override; bool stopping() override; void idle() override; void onTimerInsertedAtFront() override; void contextResize(size_t size); / * @brief 判断是否可以停止 * @param[out] timeout 最近要出发的定时器事件间隔 * @return 返回是否可以停止 */ bool stopping(uint64_t& timeout); private: // epoll文件句柄 int m_epfd = 0; // pipe 句柄 int m_tickleFds[2]; // 当前等待执行的事件总数 std::atomic m_pendingEventCount = {0}; // IOManager的锁 RWMutexType m_mutex; // socket事件上下文的容器 std::vector m_fdContexts; }; 可以看到,这里对于每一个文件描述符都关联了相关的事件,以及回调。换句话说就是每一个文件描述符 fd 都可以关注三种事件类型,读写以及无事件,每种事件都可以注册一个专属的回调函数,并且还支持删除和取消事件。 下面来阅读一下 sylar 关于这部分的函数定义,我们先来看一下主体部分。 IOManager::IOManager(size_t threads, bool use_caller, const std::string& name) :Scheduler(threads, use_caller, name) { std::cout << "[DEBUG] IOManager create" << std::endl; // 创建 epoll实例 m_epfd = epoll_create(5000); //SYLAR_ASSERT(m_epfd > 0); // 初始化管道 /* 这里 sylar使用管道提醒其他线程有任务来了 */ int rt = pipe(m_tickleFds); //SYLAR_ASSERT(!rt); // 为管道读端注册可读事件 epoll_event event; memset(&event, 0, sizeof(epoll_event)); // 监听事件,边沿触发 event.events = EPOLLIN | EPOLLET; event.data.fd = m_tickleFds[0]; rt = fcntl(m_tickleFds[0], F_SETFL, O_NONBLOCK); //SYLAR_ASSERT(!rt); rt = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_tickleFds[0], &event); //SYLAR_ASSERT(!rt); contextResize(32); // 直接开启调度 start(); } sylar 使用管道来通知有任务要调度 void IOManager::tickle() { std::cout << "[INFO] IOManager::tickle" << std::endl; if(!hasIdleThreads()) { return; } // 向管道写段写入数据 int rt = write(m_tickleFds[1], "T", 1); //SYLAR_ASSERT(rt == 1); } 接下来是 idle 协程的重写了 void IOManager::idle() { std::cout << "[DEBUG] IOManager::idle" << std::endl; const uint64_t MAX_EVENTS = 256; // 动态分配一个包含MAX_EVENTS个epoll_event的数组,并初始化为零 epoll_event* events = new epoll_event[MAX_EVENTS](); // 使用std::shared_ptr来管理这个动态分配的数组的内存 std::shared_ptr shared_events(events, [](epoll_event* ptr){ // lambda表达式,用于在shared_ptr释放内存时调用 // 删除动态分配的epoll_event数组 delete[] ptr; }); while(true) { uint64_t next_timeout = 0; if(stopping(next_timeout)) { break; } int rt = 0; do { static const int MAX_TIMEOUT = 3000; if(next_timeout != ~0ull) { next_timeout = (int)next_timeout > MAX_TIMEOUT ? MAX_TIMEOUT : next_timeout; } else { next_timeout = MAX_TIMEOUT; } rt = epoll_wait(m_epfd, events, MAX_EVENTS, (int)next_timeout); if(rt < 0 && errno == EINTR) { // 在中断时不执行任何操作 } else { break; } } while(true); std::vector > cbs; // 填充过期的回调 listExpiredCb(cbs); if(!cbs.empty()) { schedule(cbs.begin(), cbs.end()); cbs.clear(); } // 上面这一部分关于定时器的内容目前不必理会 for(int i = 0; i < rt; ++i) { epoll_event& event = events[i]; if(event.data.fd == m_tickleFds[0]) { uint8_t dummy[256]; while(read(m_tickleFds[0], dummy, sizeof(dummy)) > 0); continue; // 处理一个特殊情况:如果事件对应于tickle管道的读取端 (m_tickleFds),则从中读取以清空管道。 } // 拿到对应 fd的上下文信息 FdContext* fd_ctx = (FdContext*)event.data.ptr; FdContext::MutexType::Lock lock(fd_ctx->mutex); if(event.events & (EPOLLERR | EPOLLHUP)) { // 如果事件是EPOLLERR | EPOLLHUP,就更新事件类型信息 event.events |= (EPOLLIN | EPOLLOUT) & fd_ctx->events; } // 定义真实发生的事件 int real_events = NONE; if(event.events & EPOLLIN) { real_events |= READ; } if(event.events & EPOLLOUT) { real_events |= WRITE; } // 以上,确定实际发生的事件 if((fd_ctx->events & real_events) == NONE) { continue; } /* 下面这一行首先计算剩余的事件。fd_ctx->events 表示原始的关注事件集合, 而 real_events 表示实际发生的事件集合。 ~real_events 对 real_events 取反,即将所有位取反,然后使用按位与操作 &, 将原始关注事件集合中对应实际发生事件的位清零。 这样就得到了剩余的未发生事件的集合,存储在 left_events 变量中。 */ int left_events = (fd_ctx->events & ~real_events); int op = left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; event.events = EPOLLET | left_events; /* 接下来,根据剩余的未发生事件集合 left_events 的情况, 确定是要修改(EPOLL_CTL_MOD)还是删除(EPOLL_CTL_DEL)事件。 如果 left_events 不为零,表示仍然有关注的事件,那么操作类型 op 就是 EPOLL_CTL_MOD; 否则,操作类型 op 就是 EPOLL_CTL_DEL。 最后,将 event.events 设置为 EPOLLET | left_events, 将事件设置为边缘触发模式(EPOLLET)以及剩余的未发生事件, 用于后续调用 epoll_ctl 进行修改或删除监听。 */ int rt2 = epoll_ctl(m_epfd, op, fd_ctx->fd, &event); if(rt2) { std::cout << "[ERROR] epoll_ctl(" << m_epfd << ", " << (EpollCtlOp)op << ", " << fd_ctx->fd << ", " << (EPOLL_EVENTS)event.events << "):" << rt2 << " (" << errno << ") (" << strerror(errno) << ")"; continue; } if(real_events & READ) { fd_ctx->triggerEvent(READ); --m_pendingEventCount; } if(real_events & WRITE) { fd_ctx->triggerEvent(WRITE); --m_pendingEventCount; } // 触发相关事件 } // 退出 idle协程 Fiber::ptr cur = Fiber::GetThis(); auto raw_ptr = cur.get(); cur.reset(); raw_ptr->swapOut(); //raw_ptr->back(); } std::cout << "[DEBUG] IOManager::idle over" << std::endl; } 接下来就是剩下的部分了,我会给出全部的代码,并尽我所能给出详细的注释。 // 获得当前事件上下文 IOManager::FdContext::EventContext& IOManager::FdContext::getContext(IOManager::Event event) { switch(event) { case IOManager::READ: return read; case IOManager::WRITE: return write; default: //SYLAR_ASSERT2(false, "getContext"); ; } // 一般是使用无效参数调用函数抛出异常 throw std::invalid_argument("getContext invalid event"); } // 重置事件上下文信息 void IOManager::FdContext::resetContext(EventContext& ctx) { ctx.scheduler = nullptr; ctx.fiber.reset(); ctx.cb = nullptr; } // 触发这些事件的回调函数 void IOManager::FdContext::triggerEvent(IOManager::Event event) { //SYLAR_ASSERT(events & event); events = (Event)(events & ~event); EventContext& ctx = getContext(event); // 将对应的回调或协程加入调度器 if(ctx.cb) { ctx.scheduler->schedule(&ctx.cb); } else { ctx.scheduler->schedule(&ctx.fiber); } ctx.scheduler = nullptr; return; } IOManager::IOManager(size_t threads, bool use_caller, const std::string& name) :Scheduler(threads, use_caller, name) { std::cout << "[DEBUG] IOManager create" << std::endl; m_epfd = epoll_create(5000); //SYLAR_ASSERT(m_epfd > 0); int rt = pipe(m_tickleFds); //SYLAR_ASSERT(!rt); epoll_event event; memset(&event, 0, sizeof(epoll_event)); // 监听事件,边沿触发 event.events = EPOLLIN | EPOLLET; event.data.fd = m_tickleFds[0]; rt = fcntl(m_tickleFds[0], F_SETFL, O_NONBLOCK); //SYLAR_ASSERT(!rt); rt = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_tickleFds[0], &event); //SYLAR_ASSERT(!rt); contextResize(32); start(); } IOManager::~IOManager() { std::cout << "[INFO] ~IOManager" << std::endl; stop(); close(m_epfd); close(m_tickleFds[0]); close(m_tickleFds[1]); for(size_t i = 0; i < m_fdContexts.size(); ++i) { if(m_fdContexts[i]) { delete m_fdContexts[i]; } } } // 重置 fd上下文容器 void IOManager::contextResize(size_t size) { m_fdContexts.resize(size); for(size_t i = 0; i < m_fdContexts.size(); ++i) { if(!m_fdContexts[i]) { m_fdContexts[i] = new FdContext; m_fdContexts[i]->fd = i; } } } // 向某个文件描述符上添加事件 int IOManager::addEvent(int fd, Event event, std::function cb) { FdContext* fd_ctx = nullptr; RWMutexType::ReadLock lock(m_mutex); // 如果 m_fdContexts大小不够的话,就扩容 if((int)m_fdContexts.size() > fd) { fd_ctx = m_fdContexts[fd]; lock.unlock(); } else { lock.unlock(); RWMutexType::WriteLock lock2(m_mutex); contextResize(fd * 1.5); fd_ctx = m_fdContexts[fd]; } FdContext::MutexType::Lock lock2(fd_ctx->mutex); // 确定是添加事件(EPOLL_CTL_ADD)还是修改事件(EPOLL_CTL_MOD)。 // 如果 fd_ctx->events 不为零,说明文件描述符已经关注了其他事件,需要进行修改。 int op = fd_ctx->events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD; epoll_event epevent; // 边缘触发 + 原有的事件 + 新加入的事件 epevent.events = EPOLLET | fd_ctx->events | event; // data.ptr 设置为 fd_ctx,用于在事件发生时追溯到对应的文件描述符上下文。 epevent.data.ptr = fd_ctx; int rt = epoll_ctl(m_epfd, op, fd, &epevent); if(rt) { std::cout << "[ERROR] epoll_ctl(" << m_epfd << ", " << (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):" << rt << " (" << errno << ") (" << strerror(errno) << ") fd_ctx->events=" << (EPOLL_EVENTS)fd_ctx->events; return -1; } // 更新 IOManager 中待处理事件的计数,并更新文件描述符上下文的关注事件集合。 ++m_pendingEventCount; fd_ctx->events = (Event)(fd_ctx->events | event); FdContext::EventContext& event_ctx = fd_ctx->getContext(event); // 设置调度器 event_ctx.scheduler = Scheduler::GetThis(); if(cb) { event_ctx.cb.swap(cb); } else { event_ctx.fiber = Fiber::GetThis(); } return 0; } // 删除事件 bool IOManager::delEvent(int fd, Event event) { RWMutexType::ReadLock lock(m_mutex); if((int)m_fdContexts.size() <= fd) { return false; } FdContext* fd_ctx = m_fdContexts[fd]; lock.unlock(); FdContext::MutexType::Lock lock2(fd_ctx->mutex); if(!(fd_ctx->events & event)) { return false; } // 计算新的关注事件集合,通过将指定事件从原有集合中去除。 Event new_events = (Event)(fd_ctx->events & ~event); int op = new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; epoll_event epevent; epevent.events = EPOLLET | new_events; epevent.data.ptr = fd_ctx; int rt = epoll_ctl(m_epfd, op, fd, &epevent); if(rt) { std::cout << "[ERROR] epoll_ctl(" << m_epfd << ", " << (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):" << rt << " (" << errno << ") (" << strerror(errno) << ")"; return false; } --m_pendingEventCount; fd_ctx->events = new_events; FdContext::EventContext& event_ctx = fd_ctx->getContext(event); fd_ctx->resetContext(event_ctx); // 获取特定事件类型的上下文信息,并重置该上下文信息。重置的目的是清除可能存在的回调函数和协程等相关信息。 return true; } // 取消 fd的某个事件 bool IOManager::cancelEvent(int fd, Event event) { RWMutexType::ReadLock lock(m_mutex); if((int)m_fdContexts.size() <= fd) { return false; } FdContext* fd_ctx = m_fdContexts[fd]; lock.unlock(); FdContext::MutexType::Lock lock2(fd_ctx->mutex); if(!(fd_ctx->events & event)) { return false; } Event new_events = (Event)(fd_ctx->events & ~event); int op = new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; epoll_event epevent; epevent.events = EPOLLET | new_events; epevent.data.ptr = fd_ctx; int rt = epoll_ctl(m_epfd, op, fd, &epevent); if(rt) { std::cout << "[ERROR] epoll_ctl(" << m_epfd << ", " << (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):" << rt << " (" << errno << ") (" << strerror(errno) << ")"; return false; } // 触发事件,执行相应的回调函数或唤醒关联的协程。 fd_ctx->triggerEvent(event); --m_pendingEventCount; return true; } // 取消某个描述符上的所有事件 bool IOManager::cancelAll(int fd) { RWMutexType::ReadLock lock(m_mutex); if((int)m_fdContexts.size() <= fd) { return false; } FdContext* fd_ctx = m_fdContexts[fd]; lock.unlock(); FdContext::MutexType::Lock lock2(fd_ctx->mutex); if(!(fd_ctx->events)) { return false; } int op = EPOLL_CTL_DEL; epoll_event epevent; epevent.events = 0; epevent.data.ptr = fd_ctx; int rt = epoll_ctl(m_epfd, op, fd, &epevent); if(rt) { std::cout << "[ERROR] epoll_ctl(" << m_epfd << ", " << (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):" << rt << " (" << errno << ") (" << strerror(errno) << ")"; return false; } if(fd_ctx->events & READ) { fd_ctx->triggerEvent(READ); --m_pendingEventCount; } if(fd_ctx->events & WRITE) { fd_ctx->triggerEvent(WRITE); --m_pendingEventCount; } // 如果文件描述符上下文关注了读或写事件,则触发相应的事件,并更新 IOManager 中待处理事件的计数。 //SYLAR_ASSERT(fd_ctx->events == 0); return true; } IOManager* IOManager::GetThis() { // dynamic_cast 是 C++ 中的一个运算符,用于在运行时执行安全的类型转换。 // 它主要用于进行基类和派生类之间的安全转型。 return dynamic_cast (Scheduler::GetThis()); } void IOManager::tickle() { std::cout << "[INFO] IOManager::tickle" << std::endl; if(!hasIdleThreads()) { return; } int rt = write(m_tickleFds[1], "T", 1); //SYLAR_ASSERT(rt == 1); } bool IOManager::stopping(uint64_t& timeout) { timeout = getNextTimer(); return timeout == ~0ull && m_pendingEventCount == 0 && Scheduler::stopping(); } bool IOManager::stopping() { uint64_t timeout = 0; return stopping(timeout); } void IOManager::idle() { std::cout << "[DEBUG] IOManager::idle" << std::endl; const uint64_t MAX_EVENTS = 256; // 动态分配一个包含MAX_EVENTS个epoll_event的数组,并初始化为零 epoll_event* events = new epoll_event[MAX_EVENTS](); // 使用std::shared_ptr来管理这个动态分配的数组的内存 std::shared_ptr shared_events(events, [](epoll_event* ptr){ // lambda表达式,用于在shared_ptr释放内存时调用 // 删除动态分配的epoll_event数组 delete[] ptr; }); while(true) { uint64_t next_timeout = 0; // if(SYLAR_UNLIKELY(stopping(next_timeout))) { // SYLAR_LOG_INFO(g_logger) << "name=" << getName() // << " idle stopping exit"; // break; // } if(stopping(next_timeout)) { break; } int rt = 0; do { static const int MAX_TIMEOUT = 3000; if(next_timeout != ~0ull) { next_timeout = (int)next_timeout > MAX_TIMEOUT ? MAX_TIMEOUT : next_timeout; } else { next_timeout = MAX_TIMEOUT; } rt = epoll_wait(m_epfd, events, MAX_EVENTS, (int)next_timeout); if(rt < 0 && errno == EINTR) { // 在中断时不执行任何操作 } else { break; } } while(true); std::vector > cbs; // 填充过期的回调 listExpiredCb(cbs); if(!cbs.empty()) { schedule(cbs.begin(), cbs.end()); cbs.clear(); } for(int i = 0; i < rt; ++i) { epoll_event& event = events[i]; if(event.data.fd == m_tickleFds[0]) { uint8_t dummy[256]; while(read(m_tickleFds[0], dummy, sizeof(dummy)) > 0); continue; // 处理一个特殊情况:如果事件对应于tickle管道的读取端 (m_tickleFds),则从中读取以清空管道。 } FdContext* fd_ctx = (FdContext*)event.data.ptr; FdContext::MutexType::Lock lock(fd_ctx->mutex); if(event.events & (EPOLLERR | EPOLLHUP)) { event.events |= (EPOLLIN | EPOLLOUT) & fd_ctx->events; } int real_events = NONE; if(event.events & EPOLLIN) { real_events |= READ; } if(event.events & EPOLLOUT) { real_events |= WRITE; } // 以上,确定实际发生的事件 if((fd_ctx->events & real_events) == NONE) { continue; } /* 下面这一行首先计算剩余的事件。fd_ctx->events 表示原始的关注事件集合, 而 real_events 表示实际发生的事件集合。 ~real_events 对 real_events 取反,即将所有位取反,然后使用按位与操作 &, 将原始关注事件集合中对应实际发生事件的位清零。 这样就得到了剩余的未发生事件的集合,存储在 left_events 变量中。 */ int left_events = (fd_ctx->events & ~real_events); int op = left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; event.events = EPOLLET | left_events; /* 接下来,根据剩余的未发生事件集合 left_events 的情况, 确定是要修改(EPOLL_CTL_MOD)还是删除(EPOLL_CTL_DEL)事件。 如果 left_events 不为零,表示仍然有关注的事件,那么操作类型 op 就是 EPOLL_CTL_MOD; 否则,操作类型 op 就是 EPOLL_CTL_DEL。 最后,将 event.events 设置为 EPOLLET | left_events, 将事件设置为边缘触发模式(EPOLLET)以及剩余的未发生事件, 用于后续调用 epoll_ctl 进行修改或删除监听。 */ int rt2 = epoll_ctl(m_epfd, op, fd_ctx->fd, &event); if(rt2) { std::cout << "[ERROR] epoll_ctl(" << m_epfd << ", " << (EpollCtlOp)op << ", " << fd_ctx->fd << ", " << (EPOLL_EVENTS)event.events << "):" << rt2 << " (" << errno << ") (" << strerror(errno) << ")"; continue; } //SYLAR_LOG_INFO(g_logger) << " fd=" << fd_ctx->fd << " events=" << fd_ctx->events // << " real_events=" << real_events; if(real_events & READ) { fd_ctx->triggerEvent(READ); --m_pendingEventCount; } if(real_events & WRITE) { fd_ctx->triggerEvent(WRITE); --m_pendingEventCount; } // 触发相关事件 } Fiber::ptr cur = Fiber::GetThis(); auto raw_ptr = cur.get(); cur.reset(); raw_ptr->swapOut(); //raw_ptr->back(); } std::cout << "[DEBUG] IOManager::idle over" << std::endl; } void IOManager::onTimerInsertedAtFront() { tickle(); } 总的来说这部分内容还是很抽象的,我的个人能力有限,只能表达出这些。关于这部分内容,我也跑了一些测试。测试效果也如预想一样,这次时间也有点些许的匆忙,这次测试结果,我会在下一节定时器中一起呈上来。
香港云服务器租用推荐
服务器租用资讯
·广东云服务有限公司怎么样
·广东云服务器怎么样
·广东锐讯网络有限公司怎么样
·广东佛山的蜗牛怎么那么大
·广东单位电话主机号怎么填写
·管家婆 花生壳怎么用
·官网域名过期要怎么办
·官网邮箱一般怎么命名
·官网网站被篡改怎么办
服务器租用推荐
·美国服务器租用
·台湾服务器租用
·香港云服务器租用
·香港裸金属服务器
·香港高防服务器租用
·香港服务器租用特价