前言
上一篇《基于 epoll 的协程调度器》谈到如何基于 epoll 构建一个事件驱动的协程调度器,没有使用三方库的原因主要是为了避免引入额外复杂度,不过只演示 Linux 未免对非 Unix 平台的小伙伴有所不公,为此本文基于 Windows 的完成端口 (IO Completion Port:IOCP) 构建相同能力的 demo。
文章仍然遵守之前的创作原则:
* 选取合适的 demo 是头等大事
* 以协程为目标,涉及到的新语法会简单说明,不涉及的不旁征博引
* 若语法的原理非常简单,也会简单展开讲讲,有利于透过现象看本质,用起来更得心应手
上一篇文章里不光引入了基于事件的调度器,还说明了如何开启多文件并行、await_suspend 与试读的关系、singalfd 用于完美退出等话题,如果没有这些内容铺垫,看本文时会有很多地方难以理解,还没看过的小伙伴,墙裂建议先看那篇。
工具还是之前介绍过的 Compile Explorer,这里不再用到 C++ Insights ,主要是它不支持 Windows 平台,其实 Compiler Explorer 也只是编译,运行的话还是不太行,因为它的环境不支持像文件、网络之类的异步 IO,需要用户自行搭建开发环境。
基于完成端口的 IO 多路复用
上文中提到了 Unix 系统中多路复用接口的发展历程:分别经历了 select -> poll -> epoll/kqueue,Windows 则通过完成端口一统江山,其实它俩调用方式差不太多:
| epoll | IOCP | 初始化 | epoll_create | CreateIoCompletionPort | 关联句柄 | epoll_ctl | CreateIoCompletionPort | 等待并获取下一个事件 | epoll_wait | GetQueuedCompletionStatus | 投递事件 | n/a (self pipe trick) | PostQueuedCompletionStatus | 销毁 | close | CloseHandle | 而在可等待对象上,IOCP 则丰富的多:
* 文件 I/O 事件
* 文件系统变更
* 套接字(Socket)事件
* 命名管道(Named Pipe)事件
* 设备 I/O 事件
* 定时器事件(结合 Waitable Timer)
这方面能与它相提并论的恐怕只有 kqueue 了。有了上面的铺垫再参考之前 epoll 的实现,直接上 demo 源码:
[code]#include #include #include #include #include #include #include #include struct Task { struct promise_type { Task get_return_object() { return {}; } std::suspend_never initial_suspend() { return {}; } std::suspend_never final_suspend() noexcept { return {}; } void return_void() {} void unhandled_exception() { std::terminate(); } };};class IocpScheduler {private: HANDLE iocp_handle; std::unordered_map io_handles;public: IocpScheduler() { iocp_handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); if (iocp_handle == NULL) { throw std::runtime_error("CreateIoCompletionPort failed"); } } ~IocpScheduler() { CloseHandle(iocp_handle); } void register_io(HANDLE file_handle, std::coroutine_handle handle) { if (io_handles.find(file_handle) == io_handles.end()) { io_handles[file_handle] = handle; if (CreateIoCompletionPort(file_handle, iocp_handle, (ULONG_PTR)file_handle, 0) == NULL) { throw std::runtime_error("CreateIoCompletionPort failed to associate file handle"); } } } void run() { while (true) { DWORD bytes_transferred = 0; ULONG_PTR completion_key = 0; LPOVERLAPPED overlapped = nullptr; BOOL success = GetQueuedCompletionStatus( iocp_handle, &bytes_transferred, &completion_key, &overlapped, INFINITE); if (completion_key != 0) { HANDLE ready_handle = (HANDLE)completion_key; if (auto it = io_handles.find(ready_handle); it != io_handles.end()) { it->second.resume(); } } } }};struct AsyncReadAwaiter { IocpScheduler& sched; HANDLE file_handle; std::unique_ptr buffer; DWORD buffer_size; OVERLAPPED overlapped; DWORD bytes_read; AsyncReadAwaiter(IocpScheduler& s, HANDLE file, DWORD size) : sched(s), file_handle(file), buffer_size(size), bytes_read(0) { buffer = std::make_unique(size); ZeroMemory(&overlapped, sizeof(OVERLAPPED)); } bool await_ready() const { return false; } void await_suspend(std::coroutine_handle h) { sched.register_io(file_handle, h); if (!ReadFile(file_handle, buffer.get(), buffer_size, &bytes_read, &overlapped)) { DWORD error = GetLastError(); if (error != ERROR_IO_PENDING) { std::stringstream ss; ss |