找回密码
 立即注册
首页 业界区 业界 基于 IOCP 的协程调度器——零基础深入浅出 C++20 协程 ...

基于 IOCP 的协程调度器——零基础深入浅出 C++20 协程

簑威龙 2025-9-22 10:40:03
前言

上一篇《基于 epoll 的协程调度器》谈到如何基于 epoll 构建一个事件驱动的协程调度器,没有使用三方库的原因主要是为了避免引入额外复杂度,不过只演示 Linux 未免对非 Unix 平台的小伙伴有所不公,为此本文基于 Windows 的完成端口 (IO Completion Port:IOCP) 构建相同能力的 demo。
文章仍然遵守之前的创作原则:
* 选取合适的 demo 是头等大事
* 以协程为目标,涉及到的新语法会简单说明,不涉及的不旁征博引
* 若语法的原理非常简单,也会简单展开讲讲,有利于透过现象看本质,用起来更得心应手
上一篇文章里不光引入了基于事件的调度器,还说明了如何开启多文件并行、await_suspend 与试读的关系、singalfd 用于完美退出等话题,如果没有这些内容铺垫,看本文时会有很多地方难以理解,还没看过的小伙伴,墙裂建议先看那篇。
工具还是之前介绍过的 Compile Explorer,这里不再用到 C++ Insights ,主要是它不支持 Windows 平台,其实 Compiler Explorer 也只是编译,运行的话还是不太行,因为它的环境不支持像文件、网络之类的异步 IO,需要用户自行搭建开发环境。
基于完成端口的 IO 多路复用

上文中提到了 Unix 系统中多路复用接口的发展历程:分别经历了 select -> poll -> epoll/kqueue,Windows 则通过完成端口一统江山,其实它俩调用方式差不太多:
 epollIOCP
初始化epoll_createCreateIoCompletionPort
关联句柄epoll_ctlCreateIoCompletionPort
等待并获取下一个事件epoll_waitGetQueuedCompletionStatus
投递事件n/a (self pipe trick)PostQueuedCompletionStatus
销毁closeCloseHandle
而在可等待对象上,IOCP 则丰富的多:
* 文件 I/O 事件​​
* 文件系统变更
* 套接字(Socket)事件​​
* 命名管道(Named Pipe)事件​​
* 设备 I/O 事件​​
* 定时器事件(结合 Waitable Timer)​​
这方面能与它相提并论的恐怕只有 kqueue 了。有了上面的铺垫再参考之前 epoll 的实现,直接上 demo 源码:
[code]#include #include #include #include #include #include #include #include struct Task {    struct promise_type {        Task get_return_object() { return {}; }        std::suspend_never initial_suspend() { return {}; }        std::suspend_never final_suspend() noexcept { return {}; }        void return_void() {}        void unhandled_exception() { std::terminate(); }    };};class IocpScheduler {private:    HANDLE iocp_handle;    std::unordered_map io_handles;public:    IocpScheduler() {        iocp_handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);        if (iocp_handle == NULL) {            throw std::runtime_error("CreateIoCompletionPort failed");        }    }    ~IocpScheduler() {        CloseHandle(iocp_handle);    }    void register_io(HANDLE file_handle, std::coroutine_handle handle) {        if (io_handles.find(file_handle) == io_handles.end()) {            io_handles[file_handle] = handle;            if (CreateIoCompletionPort(file_handle, iocp_handle, (ULONG_PTR)file_handle, 0) == NULL) {                throw std::runtime_error("CreateIoCompletionPort failed to associate file handle");            }        }    }    void run() {        while (true) {            DWORD bytes_transferred = 0;            ULONG_PTR completion_key = 0;            LPOVERLAPPED overlapped = nullptr;            BOOL success = GetQueuedCompletionStatus(                iocp_handle,                &bytes_transferred,                &completion_key,                &overlapped,                INFINITE);            if (completion_key != 0) {                HANDLE ready_handle = (HANDLE)completion_key;                if (auto it = io_handles.find(ready_handle); it != io_handles.end()) {                    it->second.resume();                }            }        }    }};struct AsyncReadAwaiter {    IocpScheduler& sched;    HANDLE file_handle;    std::unique_ptr buffer;    DWORD buffer_size;    OVERLAPPED overlapped;    DWORD bytes_read;    AsyncReadAwaiter(IocpScheduler& s, HANDLE file, DWORD size)        : sched(s), file_handle(file), buffer_size(size), bytes_read(0) {        buffer = std::make_unique(size);        ZeroMemory(&overlapped, sizeof(OVERLAPPED));    }    bool await_ready() const {        return false;    }    void await_suspend(std::coroutine_handle h) {        sched.register_io(file_handle, h);                if (!ReadFile(file_handle, buffer.get(), buffer_size, &bytes_read, &overlapped)) {            DWORD error = GetLastError();            if (error != ERROR_IO_PENDING) {                std::stringstream ss;                ss

相关推荐

您需要登录后才可以回帖 登录 | 立即注册