本文 的 原文 地址
原始的内容,请参考 本文 的 原文 地址
本文 的 原文 地址
本文作者:
- 第一作者 老架构师 肖恩(肖恩 是尼恩团队 高级架构师,负责写此文的第一稿,初稿 )
- 第二作者 老架构师 尼恩 (45岁老架构师, 负责 提升此文的 技术高度,让大家有一种 俯视 技术、俯瞰技术、 技术自由 的感觉)
Redis 命令执行过程分析
一条Redis命令的执行流程,本质是"连接建立→处理阶段→返回结果"3个环节 串联,每个环节都由事件机制驱动。
处理阶段 包含2个 子阶段:读取命令→ 解析执行
理解这个过程,能帮助 大家 深入地掌握Redis的性能优化(比如避免内存溢出)、故障排查(比如 慢查询排查、耗时命令定位排查)等实践技能。
Redis命令执行的三大阶段
一条命令从执行到返回数据,主要涉及三个阶段,具体如下:
- 第一阶段:建立连接阶段:
完成 socket 连接建立,并创建 client 对象;
- 第二阶段:处理阶段:
从 socket 读取数据到输入缓冲区,解析获得命令后执行,再将返回值存入输出缓冲区;
- 第三阶段:数据返回阶段:
将输出缓冲区的返回值写入 socket 并返回给客户端,最后关闭 client。
这三个阶段通过事件机制串联。
Redis 启动时会先注册 socket 连接建立事件处理器:
- 客户端请求建立 socket 连接时,触发对应处理器,完成连接建立后注册 socket 读取事件处理器;
- 客户端发送命令时,读取事件处理器被触发,执行处理阶段逻辑后注册 socket 写事件处理器;
- 写事件处理器被触发时,将返回值写回 socket。
Redis命令执行的三个阶段,通过 反应器模式 事件机制 驱动和串联,
接下来的内容 非常重要, 就是: 反应器模式 事件机制
45岁老架构师尼恩提示: 这个是面试的核心重点。
反应器模式(Reactor Pattern)事件机制
反应器模式是一种事件驱动(Event-Driven)的设计模式。
核心目标是高效处理多并发 I/O 操作。
其通过一个 "反应器"(Reactor)组件统一管理事件的注册、监听和分发,将 I/O 事件(如连接建立、数据可读 / 可写)分发给对应的处理器(Handler)处理,避免传统多线程模型中线程上下文切换的开销,提升系统吞吐量。
Reactor Pattern 核心组件
- 事件(Event):触发处理的信号,如 "连接建立"、"数据可读"、"定时任务触发" 等。
- 反应器(Reactor):事件的 "总调度中心",负责监听事件(如 I/O 事件、定时事件),并将事件分发给对应的处理器。
- 事件多路分发器(Event Multiplexer):底层依赖操作系统的 I/O 多路复用机制(如 select、poll、epoll、kqueue),同时监听多个 I/O 句柄(如 socket),当事件触发时通知反应器。
- 事件处理器(Handler):负责具体的事件处理逻辑(如读取数据、解析请求、发送响应),与反应器解耦。
Redis 单线程 反应器实现
Redis 通过单线程高效处理大量客户端的网络请求和定时任务。
Redis 的反应器模式以事件循环(Event Loop) 为核心,单线程循环处理两类事件:文件事件(网络 I/O 事件) 和时间事件(定时任务)。
(1)Redis 事件循环流程(非常简单)
伪代码如下:
while (1) {
- 时间事件距离计算: 计算当前距离下一个时间事件的阻塞时间(避免无意义等待);
- IO事件阻塞等待: 调用I/O多路复用器,阻塞等待IO事件(超时时间为步骤1的结果);
- 处理IO事件:处理所有已就绪的IO事件,按事件类型分发给对应 handler
- 处理时间事件:处理所有已到期的时间事件;
}
更加细致的Redis 事件循环流程 循环:
调用 epoll_wait 等待事件 → 返回就绪事件 → 按事件类型分发给对应 handler。
Redis 三大事件类型,以及对应的处理器如下:
- ACCEPT(新链接事件) → acceptTcpHandler → 建立连接并注册 READ 事件;
- READ (io读就绪事件)→ readQueryFromClient → 读取-解析-执行命令 → 准备响应 → 注册 WRITE 事件;
- WRITE (io写就绪事件)→ sendReplyToClient → 发送完取消写事件。
(2)关键组件实现
- 事件多路分发器:
Redis 会根据操作系统自动选择最优的 I/O 多路复用器(如 Linux 用 epoll,macOS 用 kqueue),封装为统一的aeApi接口,负责监听 socket 的读 / 写事件。
- IO事件处理器:按照IO事件类型,调用对应的处理器,完成事件处理。
- 时间事件处理器:处理定时任务(如过期键清理、AOF 日志刷盘),通过链表存储时间事件,每次事件循环检查并执行已到期的任务。
Redis 的IO事件 (/文件事件)按事件类型分为三类处理器 (handler):
- 连接应答处理器:acceptTcpHandler , 处理客户端的新连接请求(对应 socket 的 "可读" 事件,触发accept);
- 命令请求处理器:readQueryFromClient , 处理客户端发送的命令(读取数据、解析命令);
- 命令回复处理器:sendReplyToClient , 将命令执行结果返回给客户端(处理 socket 的 "可写" 事件)。
(3)特点
- 单线程模型:网络I/O + 命令执行 + 响应返回, 整个事件循环由一个线程执行,避免锁竞争,内存操作原子性天然保障,简化设计;
- 事件优先级:文件事件优先于时间事件(时间事件仅在文件事件处理完毕后执行);
- 高效性:通过 I/O 多路复用器批量处理就绪事件,单线程即可支撑数万并发连接。
关于单线程版本的 反应器实现 ,请参见尼恩的《Java高并发核心编程卷1》 ,此书对单线程版本的 反应器有通俗易懂的系统化介绍。
Netty 多线程 反应器实现
Netty 是基于 Java NIO 的高性能网络框架,其反应器模式采用多线程架构,通过 "主从 Reactor" 模型实现连接处理与 I/O 读写的分离,支撑更高的并发场景。
Netty 的反应器模式以 EventLoopGroup(事件循环组) 为核心,通过分工明确的多线程处理网络事件。
(1)主从 Reactor 模型
- Boss Group(主 Reactor): 负责监听客户端的连接请求(处理OP_ACCEPT事件),建立连接后将 socketChannel 注册到 Worker Group。
- Worker Group(从 Reactor): 负责处理已建立连接的 I/O 事件(OP_READ/OP_WRITE),并通过 ChannelPipeline 中的处理器(Handler)处理业务逻辑。
(2)关键组件实现
- EventLoopGroup(事件循环组) : 包含一个或者多个 EventLoop
- EventLoop:每个 EventLoop 对应一个线程,循环执行事件处理(类似 Redis 的事件循环),包含一个 Selector(Java NIO 的 I/O 多路复用器),负责监听注册到其上的 Channel 的 I/O 事件。
- ChannelPipeline:事件处理的 "责任链",包含多个 ChannelHandler,负责解析数据(如编解码)、处理业务逻辑(如请求响应),支持动态添加 / 移除处理器,灵活性极高。
- 事件分发:当 Selector 检测到 I/O 事件后,EventLoop 会将事件封装为 ChannelEvent,按顺序在 ChannelPipeline 中传递,由对应的 Handler 处理。
(3)Netty事件循环流程(比redis 复杂得多)
Boss 循环监听 OP_ACCEPT事件 → accept → 注册 Channel 到 Worker → Worker 循环监听 OP_READ/OP_WRITE → Pipeline 顺序触发 Handler → 读写完成。
这里比redis 复杂得多, 具体请参见尼恩 讲的 Netty源码视频。
(3)特点
- 多线程模型:通过 Boss/Worker 分组,分离连接建立与 I/O 处理,充分利用多核 CPU;
- 异步非阻塞:所有 I/O 操作均为异步,通过 Future/Promise 机制处理结果,避免线程阻塞;
- 高度可扩展:ChannelPipeline 支持灵活的处理器组合,适配不同协议(如 HTTP、TCP、WebSocket)。
关于多线程版本的 反应器实现 ,请参见尼恩的《Java高并发核心编程卷1》 ,此书对多线程版本的 反应器有通俗易懂的系统化介绍。
区别:Redis 是一个小号的 Netty
反应器模式的核心是 "事件驱动 + I/O 多路复用",但 Redis 和 Netty 因场景不同选择了差异化实现:
- Redis 以单线程简化设计,聚焦高效处理内存型请求;
- Netty 以多线程主从模型提升并发能力,支撑复杂网络通信。
两者均通过反应器模式最大化了 I/O 处理效率,是各自领域高性能的关键设计。
维度Redis(单线程 Reactor)Netty(主从多线程 Reactor)线程数1 主线程完成 accept+read+write+cmdBoss(1)+Worker(N)+可选业务线程池Reactor 数量单 Reactor主 Reactor + 多 Sub-Reactor并发瓶颈单核 CPU,长命令阻塞Worker 可水平扩展,长任务可下沉业务线程池编程模型纯 C 函数回调,无 Handler 链Pipeline + ChannelHandler 责任链锁机制无锁Worker 之间无锁,Handler 共享数据需同步适用场景极高 QPS 且命令极快(缓存)通用网络框架,长/短连接、大文件、RPC 等Redis 用极简单线程 Reactor换取极致低延迟;Netty 用主从多线程 Reactor换取高并发与可扩展性。
redis,是一个小号的 netty。
接下来,我们逐一分析Redis 反应器的 各个步骤的具体原理和代码实现。
Redis启动时监听socket
Redis 服务器启动时会调用 initServer 方法,主要完成三件事:
- 建立事件机制 eventLoop、
- 注册周期时间事件处理器(如serverCron),用于定期执行后台操作:清理过期键、统计信息更新等
- 注册监听 socket 的IO事件处理器(监听连接建立事件,处理函数为 acceptTcpHandler)。
- void initServer(void) { // server.c
- ....
- /**
- * 创建Redis的事件循环器eventLoop,用于管理所有事件(连接、读写等)
- */
- server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
- /* 打开用于接收用户命令的TCP监听socket */
- if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
- exit(1);
- /**
- * 注册周期时间事件处理器(如serverCron)
- * 用于定期执行后台操作:清理过期键、统计信息更新等
- */
- if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
- serverPanic("Can't create event loop timers.");
- exit(1);
- }
- /**
- * 为所有监听的socket注册文件事件处理器
- * 监听"可读事件"(客户端发起连接请求时触发)
- * 绑定处理函数acceptTcpHandler,负责建立连接
- */
- for (j = 0; j < server.ipfd_count; j++) {
- if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
- acceptTcpHandler,NULL) == AE_ERR)
- {
- serverPanic(
- "Unrecoverable error creating server.ipfd file event.");
- }
- }
- ....
- }
复制代码 Redis 的命令执行全程依赖 initServer 中创建的 aeEventLoop 事件循环器。
当 socket 发生对应事件(如连接请求、数据到达)时,aeEventLoop 会自动进行事件分发,调用预先注册的处理器。
第一阶段:建立连接阶段
当客户端向 Redis 建立 socket时,aeEventLoop 会把事件分发到 acceptTcpHandler函数 处理。
acceptTcpHandler 为每个链接创建一个 Client 对象,并创建相应IO事件来监听socket的可读事件,并指定事件处理函数。
acceptTcpHandler 函数会首先调用 anetTcpAccept方法,它底层会调用 socket 的 accept 方法,也就是接受客户端来的建立连接请求,然后调用 acceptCommonHandler方法, 创建 client 对象,并注册 socket 读事件处理器。
连接处理核心代码
- // 客户端建立连接时的事件处理函数(networking.c)
- void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
- ....
- // 接受客户端连接(底层调用socket的accept)
- // anetTcpAccept最终在anet.c的anetGenericAccept中调用accept
- cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
- if (cfd == ANET_ERR) {
- if (errno != EWOULDBLOCK)
- serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr);
- return;
- }
- serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
- /**
- * 处理连接建立后的逻辑:创建client、检查连接数等
- */
- acceptCommonHandler(cfd,0,cip);
- }
- // 连接建立后的通用处理(networking.c)
- static void acceptCommonHandler(int fd, int flags, char *ip) {
- client *c;
- // 创建client对象(代表一个客户端连接)
- c = createClient(fd);
- // 检查是否超过最大客户端连接数(配置文件中的maxclients)
- if (listLength(server.clients) > server.maxclients) {
- char *err = "-ERR max number of clients reached\r\n";
- if (write(c->fd,err,strlen(err)) == -1) {
- // 写入失败不处理,后续会关闭连接
- }
- server.stat_rejected_conn++;
- freeClient(c); // 释放client并关闭连接
- return;
- }
- .... // 处理无密码时的默认保护状态
- // 更新连接数统计
- server.stat_numconnections++;
- c->flags |= flags;
- }
复制代码 client对象创建
createClient 方法会初始化 client 的属性(如输入/输出缓冲区),配置 socket 为非阻塞模式,设置 NODELAY 和 SOKEEPALIVE标志位来关闭 Nagle 算法并且启动 socket 存活检查机制,并注册读事件处理器(当客户端发送数据时触发)。- client *createClient(int fd) {
- client *c = zmalloc(sizeof(client));
- // fd为-1时用于特殊场景(如执行lua脚本)
- if (fd != -1) {
- // 配置socket:非阻塞模式(避免IO阻塞)、关闭Nagle算法(减少延迟)、开启保活机制
- anetNonBlock(NULL,fd);
- anetEnableTcpNoDelay(NULL,fd);
- if (server.tcpkeepalive)
- anetKeepAlive(NULL,fd,server.tcpkeepalive);
- /**
- * 向eventLoop注册读事件处理器
- * 当客户端通过socket发送数据时,调用readQueryFromClient读取数据
- */
- if (aeCreateFileEvent(server.el,fd,AE_READABLE,
- readQueryFromClient, c) == AE_ERR)
- {
- close(fd);
- zfree(c);
- return NULL;
- }
- }
- // 初始化客户端默认选中的数据库(默认第0个)
- selectDb(c,0);
- uint64_t client_id;
- atomicGetIncr(server.next_client_id,client_id,1);
- c->id = client_id;
- c->fd = fd;
- .... // 初始化其他属性(输入缓冲区querybuf、输出缓冲区等)
- return c;
- }
复制代码 client 对象包含输入缓冲区(存储客户端发送的数据)和输出缓冲区(存储Redis的响应数据),按类型分为
- 普通客户端、
- 从客户端(主从复制用)、
- 订阅客户端(发布订阅用),
不同类型的缓冲区大小配置不同。
第二阶段:命令处理阶段
读取socket数据到输入缓冲区
客户端发送命令后,socket 触发读事件TriggerRead ,这个事件的CallRead readQueryFromClient 会将数据读入 client 的输入缓冲区 querybuf,并检查缓冲区大小是否超过限制。
- 若为普通客户端,直接处理缓冲区数据;
- 若为主从复制中的主客户端,还需同步命令到从节点。
- // 读取客户端发送的数据到输入缓冲区(networking.c)
- void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
- client *c = (client*) privdata;
- ....
- // 为输入缓冲区预留空间
- c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
- // 从socket读取数据到querybuf(输入缓冲区)
- nread = read(fd, c->querybuf+qblen, readlen);
- if (nread == -1) {
- .... // 读取错误,释放client
- } else if (nread == 0) {
- // 客户端主动关闭连接
- serverLog(LL_VERBOSE, "Client closed connection");
- freeClient(c);
- return;
- } else if (c->flags & CLIENT_MASTER) {
- /* 若client是主从复制中的主节点,将数据存入pending_querybuf用于同步到从节点 */
- c->pending_querybuf = sdscatlen(c->pending_querybuf, c->querybuf+qblen,nread);
- }
- // 更新输入缓冲区的长度
- sdsIncrLen(c->querybuf,nread);
- c->lastinteraction = server.unixtime;
- if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
- server.stat_net_input_bytes += nread;
- // 检查输入缓冲区是否超过配置的最大限制(client-query-buffer-limit)
- if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
- sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
- bytes = sdscatrepr(bytes,c->querybuf,64);
- serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
- sdsfree(ci);
- sdsfree(bytes);
- freeClient(c); // 关闭客户端连接
- return;
- }
- if (!(c->flags & CLIENT_MASTER)) {
- // 处理普通客户端的输入缓冲区
- processInputBuffer(c);
- } else {
- // 处理主从复制中的主节点客户端
- size_t prev_offset = c->reploff;
- processInputBuffer(c);
- // 若同步偏移量变化,通知从节点更新
- size_t applied = c->reploff - prev_offset;
- if (applied) {
- replicationFeedSlavesFromMasterStream(server.slaves, c->pending_querybuf, applied);
- sdsrange(c->pending_querybuf,applied,-1);
- }
- }
- }
复制代码 解析获取命令
processInputBuffer 会解析输入缓冲区的数据,根据命令格式调用不同方法解析,最终得到命令参数 argv 和参数个数 argc,再调用 processCommand 执行命令。
执行成功后,如果是主从客户端,还需要更新同步偏移量 reploff 属性
命令格式 主要有:
- 单行命令 PROTO_REQ_INLINE
- 批量命令 PROTO_REQ_MULTIBULK
- void processInputBuffer(client *c) { // networking.c
- server.current_client = c;
- /* 循环处理输入缓冲区中的所有数据 */
- while(sdslen(c->querybuf)) {
- .... // 处理client的状态(如是否阻塞、是否在事务中)
- /* 判断命令格式类型(telnet和redis-cli发送的命令格式不同) */
- if (!c->reqtype) {
- if (c->querybuf[0] == '*') {
- c->reqtype = PROTO_REQ_MULTIBULK; // 批量命令格式(如*2\r\n$3\r\nSET\r\n$5\r\nhello\r\n)
- } else {
- c->reqtype = PROTO_REQ_INLINE; // 单行命令格式(如SET hello world)
- }
- }
- /**
- * 解析输入缓冲区数据,得到命令参数argv和参数个数argc
- */
- if (c->reqtype == PROTO_REQ_INLINE) {
- if (processInlineBuffer(c) != C_OK) break; // 解析失败则退出循环
- } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
- if (processMultibulkBuffer(c) != C_OK) break;
- } else {
- serverPanic("Unknown request type");
- }
- /* 参数个数为0时,重置client以接收下一条命令 */
- if (c->argc == 0) {
- resetClient(c);
- } else {
- // 执行解析得到的命令
- if (processCommand(c) == C_OK) {
- if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
- // 若为从节点的主客户端,更新同步偏移量
- c->reploff = c->read_reploff - sdslen(c->querybuf);
- }
- // 非阻塞状态下,重置client以接收新命令
- if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
- resetClient(c);
- }
- }
- }
- server.current_client = NULL;
- }
复制代码 执行命令 processCommand方法
processCommand 是命令执行的核心逻辑,主要分为三步:
- 首先是调用 lookupCommand 方法获得对应的 redisCommand;
- 接着是检测当前 Redis 是否可以执行该命令;
- 最后是调用 call 方法真正执行命令。
(1) 如果命令名称为 quit,则直接返回,并且设置客户端标志位。
(2) 根据 argv[0] 查找对应的 redisCommand,所有的命令都存储在命令字典 redisCommandTable 中,根据命令名称可以获取对应的命令。
(3) 进行用户权限校验。
(4) 如果是集群模式,处理集群重定向。当命令发送者是 master 或者 命令没有任何 key 的参数时可以不重定向。
(5) 预防 maxmemory 情况,先尝试回收一下,如果不行,则返回异常。
(6) 当此服务器是 master 时:aof 持久化失败时,或上一次 bgsave 执行错误,且配置 bgsave 参数和 stopwritesonbgsaveerr;禁止执行写命令。
(7) 当此服务器时master时:如果配置了 replminslavestowrite,当slave数目小于时,禁止执行写命令。
(8) 当时只读slave时,除了 master 的不接受其他写命令。
(9) 当客户端正在订阅频道时,只会执行部分命令。
(10) 服务器为slave,但是没有连接 master 时,只会执行带有 CMD_STALE 标志的命令,如 info 等
(11) 正在加载数据库时,只会执行带有 CMD_LOADING 标志的命令,其余都会被拒绝。
(12) 当服务器因为执行lua脚本阻塞时,只会执行部分命令,其余都会拒绝
(13) 如果是事务命令,则开启事务,命令进入等待队列;否则调用call直接执行命令。- int processCommand(client *c) {
- // 1. 处理QUIT命令(返回OK并标记关闭)
- if (!strcasecmp(c->argv[0]->ptr,"quit")) {
- addReply(c,shared.ok);
- c->flags |= CLIENT_CLOSE_AFTER_REPLY;
- return C_ERR;
- }
- /**
- * 2. 根据命令名称查找对应的redisCommand结构体
- * 所有命令存储在redisCommandTable字典中(如{"get":getCommand, "set":setCommand})
- */
- c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
- if (!c->cmd) {
- // 处理未知命令(返回错误)
- addReplyErrorFormat(c,"unknown command `%s`", (char*)c->argv[0]->ptr);
- return C_OK;
- } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) {
- // 检查参数个数是否匹配(arity为命令定义的参数要求)
- addReplyErrorFormat(c,"wrong number of arguments for '%s' command", c->cmd->name);
- return C_OK;
- }
- // 3. 检查用户认证(若配置requirepass且未认证,仅允许AUTH命令)
- if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
- {
- flagTransaction(c);
- addReply(c,shared.noautherr);
- return C_OK;
- }
- /**
- * 4. 集群模式下处理命令重定向
- * 若命令需在其他节点执行,返回重定向信息(如-MOVED)
- */
- if (server.cluster_enabled &&
- !(c->flags & CLIENT_MASTER) &&
- !(c->flags & CLIENT_LUA &&
- server.lua_caller->flags & CLIENT_MASTER) &&
- !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
- c->cmd->proc != execCommand))
- {
- int hashslot;
- int error_code;
- clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
- &hashslot,&error_code);
- if (n == NULL || n != server.cluster->myself) {
- if (c->cmd->proc == execCommand) {
- discardTransaction(c);
- } else {
- flagTransaction(c);
- }
- clusterRedirectClient(c,n,hashslot,error_code);
- return C_OK;
- }
- }
- // 5. 处理maxmemory限制(尝试回收内存,失败则返回错误)
- if (server.maxmemory) {
- int retval = freeMemoryIfNeeded();
- if (retval == C_ERR) {
- addReply(c, shared.oomerr);
- return C_OK;
- }
- }
- /**
- * 6. 主节点特殊检查:
- * - AOF持久化失败或bgsave错误时,禁止写命令
- * - 从节点数量不足时(repl-min-slaves-to-write),禁止写命令
- */
- if (((server.stop_writes_on_bgsave_err &&
- server.saveparamslen > 0 &&
- server.lastbgsave_status == C_ERR) ||
- server.aof_last_write_status == C_ERR) &&
- server.masterhost == NULL &&
- (c->cmd->flags & CMD_WRITE ||
- c->cmd->proc == pingCommand)) {
- addReplySds(c, sdscatprintf(sdsempty(),
- "-MISCONF Errors writing to the AOF file: %s\r\n",
- server.aof_last_write_errstr ? server.aof_last_write_errstr : "unknown error"));
- return C_OK;
- }
- /**
- * 7. 从节点特殊检查:
- * - 只读从节点拒绝非主节点的写命令
- * - 未连接主节点时,仅允许带CMD_STALE标志的命令(如INFO)
- */
- if (server.masterhost && server.repl_slave_ro &&
- !(c->flags & CLIENT_MASTER) &&
- c->cmd->flags & CMD_WRITE) {
- addReply(c,shared.readonlyerr);
- return C_OK;
- }
- /**
- * 8. 其他检查:
- * - 订阅状态的客户端仅允许特定命令(PING、SUBSCRIBE等)
- * - 加载数据库时仅允许带CMD_LOADING标志的命令
- * - Lua脚本阻塞时仅允许特定命令(AUTH、SHUTDOWN等)
- */
- if (c->flags & CLIENT_PUBSUB &&
- c->cmd->proc != pingCommand &&
- c->cmd->proc != subscribeCommand &&
- c->cmd->proc != unsubscribeCommand &&
- c->cmd->proc != psubscribeCommand &&
- c->cmd->proc != punsubscribeCommand) {
- addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
- return C_OK;
- }
- /**
- * 9. 执行命令:
- * - 事务中(CLIENT_MULTI)的命令入队等待EXEC
- * - 非事务命令直接调用call执行
- */
- if (c->flags & CLIENT_MULTI &&
- c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
- c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
- {
- // 事务命令入队
- queueMultiCommand(c);
- addReply(c,shared.queued);
- } else {
- // 非事务命令直接执行
- call(c,CMD_CALL_FULL);
- c->woff = server.master_repl_offset;
- if (listLength(server.ready_keys))
- handleClientsBlockedOnLists();
- }
- return C_OK;
- }
- // 所有Redis命令的定义(部分示例)
- struct redisCommand redisCommandTable[] = {
- {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
- {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
- {"hmset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
- .... // 其他命令
- };
复制代码 call方法
call 方法负责实际执行命令,包括通知监视器、调用命令处理函数、记录慢查询日志、命令传播(同步到AOF和从节点)等。
- 如果有监视器 monitor,则需要将命令发送给监视器。
- 调用 redisCommand 的proc 方法,执行对应具体的命令逻辑。
- 如果开启了 CMDCALLSLOWLOG,则需要记录慢查询日志
- 如果开启了 CMDCALLSTATS,则需要记录一些统计信息
- 如果开启了 CMDCALLPROPAGATE,则当 dirty大于0时,需要调用 propagate 方法来进行命令传播。
命令传播就是将命令写入 repl-backlog-buffer 缓冲中,并发送给各个从服务器中。
call方法关键源码:- // 执行命令的具体实现
- void call(client *c, int flags) {
- /**
- * dirty:记录数据库修改次数
- * start/duration:记录命令执行的开始时间和耗时(微秒)
- */
- long long dirty, start, duration;
- int client_old_flags = c->flags;
- /**
- * 若有监视器(monitor),将命令发送给监视器
- * 排除从AOF加载的命令和特定管理员命令
- */
- if (listLength(server.monitors) &&
- !server.loading &&
- !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
- {
- replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
- }
- ....
- /* 执行命令 */
- dirty = server.dirty;
- start = ustime();
- // 调用命令的处理函数(如setCommand、getCommand)
- c->cmd->proc(c);
- duration = ustime()-start;
- dirty = server.dirty-dirty;
- if (dirty < 0) dirty = 0;
- .... // Lua脚本的特殊处理
- /**
- * 记录慢查询日志(若命令耗时超过slowlog-log-slower-than)
- */
- if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
- char *latency_event = (c->cmd->flags & CMD_FAST) ?
- "fast-command" : "command";
- latencyAddSampleIfNeeded(latency_event,duration/1000);
- slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
- }
- /**
- * 更新命令统计信息
- */
- if (flags & CMD_CALL_STATS) {
- c->lastcmd->microseconds += duration;
- c->lastcmd->calls++;
- }
- /**
- * 命令传播:若修改了数据库(dirty>0),同步到AOF和从节点
- */
- if (flags & CMD_CALL_PROPAGATE &&
- (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
- {
- int propagate_flags = PROPAGATE_NONE;
- /**
- * dirty大于0时,需要广播命令给slave和aof
- */
- if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
- ....
- /**
- * 广播命令,写如aof,发送命令到slave
- * 也就是传说中的传播命令
- */
- if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
- propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
- }
- ....
- }
复制代码 命令传播会将命令写入 repl-backlog-buffer 缓冲区,并同步到所有从节点,保证主从数据一致性;同时写入AOF文件(若开启),实现持久化。
第三阶段:数据返回阶段
命令执行完成后,结果会被存入Client的输出缓冲区(buf或reply链表)。
当输出缓冲区有数据时,Redis会注册"写事件处理器",将结果通过socket写回客户端。
命令结果写入输出缓冲区
命令执行完成后都会通过 addReply 方法将结果写入输出缓冲区,等待返回给客户端。addReply 的核心逻辑是:准备写入环境,然后将结果写入输出缓冲区(固定缓冲区或链表)。
- void addReply(client *c, robj *obj) {
- // 准备客户端写入环境(判断是否需要返回结果、加入等待队列等)
- if (prepareClientToWrite(c) != C_OK) return;
- if (sdsEncodedObject(obj)) {
- // 若对象是SDS编码,先尝试写入固定缓冲区
- if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
- // 缓冲区满了,写入响应链表
- _addReplyObjectToList(c,obj);
- } else if (obj->encoding == OBJ_ENCODING_INT) {
- // 整数编码的特殊优化(直接转换为字符串写入)
- char buf[32];
- size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
- if (_addReplyToBuffer(c,buf,len) != C_OK)
- _addReplyObjectToList(c,obj);
- } else {
- serverPanic("Wrong obj->encoding in addReply()");
- }
- }
复制代码 prepareClientToWrite 方法主要负责判断客户端是否需要返回结果,并将客户端加入等待写入队列(clients_pending_write):- int prepareClientToWrite(client *c) {
- // Lua脚本或模块的客户端,直接允许写入
- if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
- // 客户端要求不返回结果(如REPLY OFF),直接拒绝
- if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
- // 主从复制中的主节点客户端,不需要返回结果
- if ((c->flags & CLIENT_MASTER) &&
- !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
- // AOF加载时的临时客户端,不需要返回结果
- if (c->fd <= 0) return C_ERR;
- // 若客户端不在等待写入队列,加入队列
- if (!clientHasPendingReplies(c) &&
- !(c->flags & CLIENT_PENDING_WRITE) &&
- (c->replstate == REPL_STATE_NONE ||
- (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
- {
- c->flags |= CLIENT_PENDING_WRITE;
- listAddNodeHead(server.clients_pending_write,c);
- }
- return C_OK;
- }
复制代码 Redis 的输出缓冲区由两部分组成:
- 固定缓冲区(buf):小数据直接写入,速度快;
- 响应链表(reply):大数据或缓冲区满时,以节点形式存入链表。
这种设计的好处是:平衡性能和内存效率,小数据用缓冲区减少分配开销,大数据用链表避免缓冲区溢出。
命令返回值从输出缓冲区写入 socket
输出缓冲区中的数据不会立即写入 socket,而是等待 Redis 事件循环处理。具体来说,事件循环每次执行前会调用 beforeSleep 方法,该方法会通过 handleClientsWithPendingWrites 处理等待写入队列中的客户端。
下面的 aeMain 方法就是 Redis 事件循环的主逻辑,可以看到每次循环时都会调用 beforesleep 方法。- void aeMain(aeEventLoop *eventLoop) { // ae.c
- eventLoop->stop = 0;
- while (!eventLoop->stop) {
- /* 如果有需要在事件处理前执行的函数,那么执行它 */
- if (eventLoop->beforesleep != NULL)
- eventLoop->beforesleep(eventLoop);
- /* 开始处理事件*/
- aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
- }
- }
复制代码 beforeSleep 函数会调用 handleClientsWithPendingWrites 函数来处理 clientspendingwrite 列表。- // 处理等待写入队列中的客户端
- int handleClientsWithPendingWrites(void) {
- listIter li;
- listNode *ln;
- int processed = listLength(server.clients_pending_write); // 统计待处理客户端数
- listRewind(server.clients_pending_write,&li);
-
- while((ln = listNext(&li))) {
- client *c = listNodeValue(ln);
- c->flags &= ~CLIENT_PENDING_WRITE; // 移除等待标记
- listDelNode(server.clients_pending_write,ln); // 从队列中移除
-
- // 尝试将输出缓冲区数据写入socket
- if (writeToClient(c->fd,c,0) == C_ERR) continue;
-
- // 若数据未写完,注册写事件处理器等待下次写入
- if (clientHasPendingReplies(c)) {
- int ae_flags = AE_WRITABLE;
- // AOF实时同步时需要设置屏障确保顺序
- if (server.aof_state == AOF_ON &&
- server.aof_fsync == AOF_FSYNC_ALWAYS)
- {
- ae_flags |= AE_BARRIER;
- }
- // 注册写事件处理器sendReplyToClient
- if (aeCreateFileEvent(server.el, c->fd, ae_flags,
- sendReplyToClient, c) == AE_ERR)
- {
- freeClientAsync(c);
- }
- }
- }
- return processed;
- }
- // 实际写入socket的方法
- int writeToClient(int fd, client *c, int handler_installed) {
- ssize_t nwritten = 0, totwritten = 0;
- size_t objlen;
- sds o;
- // 循环写入所有待返回数据
- while(clientHasPendingReplies(c)) {
- if (c->bufpos > 0) {
- // 先写固定缓冲区中的数据
- nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
- if (nwritten <= 0) break; // 写入失败或需重试
- c->sentlen += nwritten;
- totwritten += nwritten;
- // 缓冲区数据写完,重置缓冲区
- if ((int)c->sentlen == c->bufpos) {
- c->bufpos = 0;
- c->sentlen = 0;
- }
- } else {
- // 缓冲区为空,从响应链表取数据
- o = listNodeValue(listFirst(c->reply));
- objlen = sdslen(o);
- if (objlen == 0) {
- listDelNode(c->reply,listFirst(c->reply));
- continue;
- }
- // 写链表中的数据
- nwritten = write(fd, o + c->sentlen, objlen - c->sentlen);
- if (nwritten <= 0) break;
- c->sentlen += nwritten;
- totwritten += nwritten;
- // 链表节点数据写完,移除节点
- if (c->sentlen == objlen) {
- listDelNode(c->reply,listFirst(c->reply));
- c->sentlen = 0;
- c->reply_bytes -= objlen;
- }
- }
- // 限制单次写入字节数,避免阻塞事件循环
- if (totwritten > NET_MAX_WRITES_PER_EVENT &&
- (server.maxmemory == 0 ||
- zmalloc_used_memory() < server.maxmemory) &&
- !(c->flags & CLIENT_SLAVE)) break;
- }
- server.stat_net_output_bytes += totwritten; // 统计输出字节数
- if (nwritten == -1) {
- // 写入错误,关闭客户端
- if (errno != EAGAIN) {
- serverLog(LL_VERBOSE,"Error writing to client: %s", strerror(errno));
- freeClient(c);
- }
- return C_ERR;
- }
- // 所有数据写完,清理资源
- if (!clientHasPendingReplies(c)) {
- c->sentlen = 0;
- if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
- // 若客户端标记了"回复后关闭",则关闭连接
- if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
- freeClient(c);
- return C_ERR;
- }
- }
- return C_OK;
- }
复制代码 简单来说,Redis 会先尝试直接将输出缓冲区的数据写入 socket,若一次写不完,则注册写事件处理器,等待操作系统通知 socket 可写时再次写入,直到所有数据发送完成。这种方式既保证了效率,又避免了阻塞事件循环。
set 命令的处理方法: setCommand
前面我们提到,processCommand 方法会从输入缓冲区解析出对应的 redisCommand,然后调用 call 方法执行该命令的 proc 方法。
不同命令的 proc 方法各不相同,比如 set 命令对应的 proc 是 setCommand 方法,get 命令对应的是 getCommand 方法。
这种通过命令找到对应处理函数的方式,和 Java 中的多态思想很相似。
call方法逻辑如下图:
call方法大致源码:- void call(client *c, int flags) {
- ....
- // 调用命令对应的处理函数(如setCommand、getCommand)
- c->cmd->proc(c);
- ....
- }
- // redisCommand结构体定义
- struct redisCommand {
- char *name; // 命令名称
- redisCommandProc *proc; // 命令处理函数
- .... // 其他属性(参数个数、标志位等)
- };
- // 命令处理函数的类型定义
- typedef void redisCommandProc(client *c);
- // 所有Redis命令的注册表(部分示例)
- struct redisCommand redisCommandTable[] = {
- {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0}, // get命令对应getCommand
- {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0}, // set命令对应setCommand
- {"hmset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
- .... // 其他命令
- };
复制代码 setCommand 会先判断命令是否携带 nx、xx、ex、px 等可选参数,然后调用 setGenericCommand 完成核心逻辑。
我们直接来看 setGenericCommand 方法的处理流程:
setGenericCommand 方法的处理逻辑如下所示:
- 首先判断 set 的类型是 setnx 还是 setxx,如果是 nx 并且 key 已经存在则直接返回;如果是 xx 并且 key 不存在则直接返回。
- 调用 setKey 方法将键值添加到对应的 Redis 数据库中。
- 如果有过期时间,则调用 setExpire 将设置过期时间
- 进行键空间通知
- 返回对应的值给客户端。
setGenericCommand 的具体实现代码如下:- // t_string.c
- void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
- long long milliseconds = 0;
- /**
- * 处理过期时间参数:如果存在expire(如ex/px),先解析为毫秒
- */
- if (expire) {
- // 将expire(robj类型)转换为整数,存储到milliseconds
- if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
- return;
- // 过期时间不能小于等于0,否则返回错误
- if (milliseconds <= 0) {
- addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
- return;
- }
- // 如果单位是秒(UNIT_SECONDS),转换为毫秒
- if (unit == UNIT_SECONDS) milliseconds *= 1000;
- }
- /**
- * 处理NX和XX参数:
- * - NX(只在key不存在时设置):如果key已存在,返回abort_reply
- * - XX(只在key存在时设置):如果key不存在,返回abort_reply
- * lookupKeyWrite用于在数据库中查找key(写操作场景)
- */
- if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
- (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
- {
- addReply(c, abort_reply ? abort_reply : shared.nullbulk);
- return;
- }
- /**
- * 将键值对存入数据库的dict哈希表
- */
- setKey(c->db,key,val);
- server.dirty++; // 记录数据库修改次数(用于持久化和复制)
- /**
- * 如果有过期时间,将过期时间存入数据库的expires哈希表
- */
- if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
- /**
- * 发送键空间通知(如"set"事件,供订阅者感知)
- */
- notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
- if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
- "expire",key,c->db->id);
- /**
- * 向客户端返回结果(如"OK"或自定义回复)
- */
- addReply(c, ok_reply ? ok_reply : shared.ok);
- }
复制代码 简单来说,setKey 就是把键值对存入 redisDb 的 dict 哈希表,setExpire 则把键和过期时间存入 expires 哈希表,两者共同完成 set 命令的核心存储逻辑。redisDb 的 dict 哈希表如下图:
get 命令的处理方法: getCommand
。。。。。。
由于平台篇幅限制, 此处省略了 1000字,剩下的内容,请参见原文地址
原始的内容,请参考 本文 的 原文 地址
本文 的 原文 地址
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |