找回密码
 立即注册
首页 业界区 安全 Redis 命令执行过程分析,彻底解析 Redis 底层原理(图 ...

Redis 命令执行过程分析,彻底解析 Redis 底层原理(图解+秒懂+史上最全)

伯绮梦 7 小时前
本文 的 原文 地址

原始的内容,请参考 本文 的 原文 地址

本文 的 原文 地址
本文作者:

  • 第一作者  老架构师 肖恩(肖恩  是尼恩团队 高级架构师,负责写此文的第一稿,初稿 )
  • 第二作者   老架构师 尼恩   (45岁老架构师, 负责  提升此文的 技术高度,让大家有一种  俯视 技术、俯瞰技术、 技术自由  的感觉
Redis 命令执行过程分析

一条Redis命令的执行流程,本质是"连接建立→处理阶段→返回结果"3个环节 串联,每个环节都由事件机制驱动。
处理阶段 包含2个 子阶段:读取命令→  解析执行
理解这个过程,能帮助 大家  深入地掌握Redis的性能优化(比如避免内存溢出)、故障排查(比如 慢查询排查、耗时命令定位排查)等实践技能。
Redis命令执行的三大阶段

一条命令从执行到返回数据,主要涉及三个阶段,具体如下:

  • 第一阶段:建立连接阶段
    完成 socket 连接建立,并创建 client 对象;
  • 第二阶段:处理阶段
    从 socket 读取数据到输入缓冲区,解析获得命令后执行,再将返回值存入输出缓冲区;
  • 第三阶段:数据返回阶段
    将输出缓冲区的返回值写入 socket 并返回给客户端,最后关闭 client。
1.png

这三个阶段通过事件机制串联。
Redis 启动时会先注册 socket 连接建立事件处理器:

  • 客户端请求建立 socket 连接时,触发对应处理器,完成连接建立后注册 socket 读取事件处理器;
  • 客户端发送命令时,读取事件处理器被触发,执行处理阶段逻辑后注册 socket 写事件处理器;
  • 写事件处理器被触发时,将返回值写回 socket。
2.png

Redis命令执行的三个阶段,通过 反应器模式  事件机制  驱动和串联,
接下来的内容 非常重要, 就是: 反应器模式  事件机制
45岁老架构师尼恩提示: 这个是面试的核心重点。
反应器模式(Reactor Pattern)事件机制

反应器模式是一种事件驱动(Event-Driven)的设计模式
核心目标是高效处理多并发 I/O 操作
其通过一个 "反应器"(Reactor)组件统一管理事件的注册、监听和分发,将 I/O 事件(如连接建立、数据可读 / 可写)分发给对应的处理器(Handler)处理,避免传统多线程模型中线程上下文切换的开销,提升系统吞吐量。
Reactor Pattern 核心组件


  • 事件(Event):触发处理的信号,如 "连接建立"、"数据可读"、"定时任务触发" 等。
  • 反应器(Reactor):事件的 "总调度中心",负责监听事件(如 I/O 事件、定时事件),并将事件分发给对应的处理器。
  • 事件多路分发器(Event Multiplexer):底层依赖操作系统的 I/O 多路复用机制(如 select、poll、epoll、kqueue),同时监听多个 I/O 句柄(如 socket),当事件触发时通知反应器。
  • 事件处理器(Handler):负责具体的事件处理逻辑(如读取数据、解析请求、发送响应),与反应器解耦。
Redis 单线程 反应器实现

Redis  通过单线程高效处理大量客户端的网络请求和定时任务。
Redis 的反应器模式以事件循环(Event Loop) 为核心,单线程循环处理两类事件:文件事件(网络 I/O 事件)时间事件(定时任务)
(1)Redis  事件循环流程(非常简单)

伪代码如下:
while (1) {

  • 时间事件距离计算: 计算当前距离下一个时间事件的阻塞时间(避免无意义等待);
  • IO事件阻塞等待: 调用I/O多路复用器,阻塞等待IO事件(超时时间为步骤1的结果);
  • 处理IO事件:处理所有已就绪的IO事件,按事件类型分发给对应 handler
  • 处理时间事件:处理所有已到期的时间事件;
}
更加细致的Redis  事件循环流程 循环:
调用 epoll_wait 等待事件 → 返回就绪事件 → 按事件类型分发给对应 handler。
Redis   三大事件类型,以及对应的处理器如下:

  • ACCEPT(新链接事件) → acceptTcpHandler → 建立连接并注册 READ 事件;
  • READ (io读就绪事件)→ readQueryFromClient → 读取-解析-执行命令 → 准备响应 → 注册 WRITE 事件;
  • WRITE (io写就绪事件)→ sendReplyToClient → 发送完取消写事件。
(2)关键组件实现


  • 事件多路分发器
    Redis 会根据操作系统自动选择最优的 I/O 多路复用器(如 Linux 用 epoll,macOS 用 kqueue),封装为统一的aeApi接口,负责监听 socket 的读 / 写事件。
  • IO事件处理器:按照IO事件类型,调用对应的处理器,完成事件处理。
  • 时间事件处理器:处理定时任务(如过期键清理、AOF 日志刷盘),通过链表存储时间事件,每次事件循环检查并执行已到期的任务。
Redis   的IO事件 (/文件事件)按事件类型分为三类处理器 (handler):

  • 连接应答处理器:acceptTcpHandler  , 处理客户端的新连接请求(对应 socket 的 "可读" 事件,触发accept);
  • 命令请求处理器:readQueryFromClient , 处理客户端发送的命令(读取数据、解析命令);
  • 命令回复处理器:sendReplyToClient , 将命令执行结果返回给客户端(处理 socket 的 "可写" 事件)。
(3)特点


  • 单线程模型网络I/O + 命令执行 + 响应返回,  整个事件循环由一个线程执行,避免锁竞争,内存操作原子性天然保障,简化设计;
  • 事件优先级:文件事件优先于时间事件(时间事件仅在文件事件处理完毕后执行);
  • 高效性:通过 I/O 多路复用器批量处理就绪事件,单线程即可支撑数万并发连接。
关于单线程版本的  反应器实现 ,请参见尼恩的《Java高并发核心编程卷1》 ,此书对单线程版本的  反应器有通俗易懂的系统化介绍。
Netty 多线程 反应器实现

Netty 是基于 Java NIO 的高性能网络框架,其反应器模式采用多线程架构,通过 "主从 Reactor" 模型实现连接处理与 I/O 读写的分离,支撑更高的并发场景。
Netty 的反应器模式以 EventLoopGroup(事件循环组) 为核心,通过分工明确的多线程处理网络事件。
(1)主从 Reactor 模型


  • Boss Group(主 Reactor): 负责监听客户端的连接请求(处理OP_ACCEPT事件),建立连接后将 socketChannel 注册到 Worker Group。
  • Worker Group(从 Reactor): 负责处理已建立连接的 I/O 事件(OP_READ/OP_WRITE),并通过 ChannelPipeline 中的处理器(Handler)处理业务逻辑。
(2)关键组件实现


  • EventLoopGroup(事件循环组) : 包含一个或者多个 EventLoop
  • EventLoop:每个 EventLoop 对应一个线程,循环执行事件处理(类似 Redis 的事件循环),包含一个 Selector(Java NIO 的 I/O 多路复用器),负责监听注册到其上的 Channel 的 I/O 事件。
  • ChannelPipeline:事件处理的 "责任链",包含多个 ChannelHandler,负责解析数据(如编解码)、处理业务逻辑(如请求响应),支持动态添加 / 移除处理器,灵活性极高。
  • 事件分发:当 Selector 检测到 I/O 事件后,EventLoop 会将事件封装为 ChannelEvent,按顺序在 ChannelPipeline 中传递,由对应的 Handler 处理。
(3)Netty事件循环流程(比redis 复杂得多)

Boss 循环监听 OP_ACCEPT事件  → accept → 注册 Channel 到 Worker → Worker 循环监听 OP_READ/OP_WRITE →  Pipeline 顺序触发 Handler → 读写完成。
这里比redis 复杂得多, 具体请参见尼恩 讲的 Netty源码视频。
(3)特点


  • 多线程模型:通过 Boss/Worker 分组,分离连接建立与 I/O 处理,充分利用多核 CPU;
  • 异步非阻塞:所有 I/O 操作均为异步,通过 Future/Promise 机制处理结果,避免线程阻塞;
  • 高度可扩展:ChannelPipeline 支持灵活的处理器组合,适配不同协议(如 HTTP、TCP、WebSocket)。
关于多线程版本的  反应器实现 ,请参见尼恩的《Java高并发核心编程卷1》 ,此书对多线程版本的  反应器有通俗易懂的系统化介绍。
区别:Redis  是一个小号的 Netty

反应器模式的核心是 "事件驱动 + I/O 多路复用",但 Redis 和 Netty 因场景不同选择了差异化实现:

  • Redis 以单线程简化设计,聚焦高效处理内存型请求;
  • Netty 以多线程主从模型提升并发能力,支撑复杂网络通信。
两者均通过反应器模式最大化了 I/O 处理效率,是各自领域高性能的关键设计。
维度Redis(单线程 Reactor)Netty(主从多线程 Reactor)线程数1 主线程完成 accept+read+write+cmdBoss(1)+Worker(N)+可选业务线程池Reactor 数量单 Reactor主 Reactor + 多 Sub-Reactor并发瓶颈单核 CPU,长命令阻塞Worker 可水平扩展,长任务可下沉业务线程池编程模型纯 C 函数回调,无 Handler 链Pipeline + ChannelHandler 责任链锁机制无锁Worker 之间无锁,Handler 共享数据需同步适用场景极高 QPS 且命令极快(缓存)通用网络框架,长/短连接、大文件、RPC 等Redis 用极简单线程 Reactor换取极致低延迟;Netty 用主从多线程 Reactor换取高并发与可扩展性。
redis,是一个小号的 netty。
接下来,我们逐一分析Redis  反应器的 各个步骤的具体原理和代码实现。
Redis启动时监听socket

Redis 服务器启动时会调用 initServer 方法,主要完成三件事:

  • 建立事件机制 eventLoop、
  • 注册周期时间事件处理器(如serverCron),用于定期执行后台操作:清理过期键、统计信息更新等
  • 注册监听 socket 的IO事件处理器(监听连接建立事件,处理函数为 acceptTcpHandler)。
3.png
  1. void initServer(void) { // server.c
  2.     ....
  3.     /**
  4.      * 创建Redis的事件循环器eventLoop,用于管理所有事件(连接、读写等)
  5.      */
  6.     server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
  7.     /* 打开用于接收用户命令的TCP监听socket */
  8.     if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
  9.         exit(1);
  10.     /**
  11.      * 注册周期时间事件处理器(如serverCron)
  12.      * 用于定期执行后台操作:清理过期键、统计信息更新等
  13.      */
  14.     if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
  15.         serverPanic("Can't create event loop timers.");
  16.         exit(1);
  17.     }
  18.     /**
  19.      * 为所有监听的socket注册文件事件处理器
  20.      * 监听"可读事件"(客户端发起连接请求时触发)
  21.      * 绑定处理函数acceptTcpHandler,负责建立连接
  22.      */
  23.     for (j = 0; j < server.ipfd_count; j++) {
  24.         if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
  25.             acceptTcpHandler,NULL) == AE_ERR)
  26.             {
  27.                 serverPanic(
  28.                     "Unrecoverable error creating server.ipfd file event.");
  29.             }
  30.     }
  31.     ....
  32. }
复制代码
Redis 的命令执行全程依赖 initServer 中创建的 aeEventLoop 事件循环器。
当 socket 发生对应事件(如连接请求、数据到达)时,aeEventLoop 会自动进行事件分发,调用预先注册的处理器。
4.png

第一阶段:建立连接阶段

当客户端向 Redis 建立 socket时,aeEventLoop 会把事件分发到  acceptTcpHandler函数 处理。
acceptTcpHandler  为每个链接创建一个 Client 对象,并创建相应IO事件来监听socket的可读事件,并指定事件处理函数。
acceptTcpHandler 函数会首先调用 anetTcpAccept方法,它底层会调用 socket 的 accept 方法,也就是接受客户端来的建立连接请求,然后调用 acceptCommonHandler方法, 创建 client 对象,并注册 socket 读事件处理器。
5.png

连接处理核心代码
  1. // 客户端建立连接时的事件处理函数(networking.c)
  2. void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
  3.     ....
  4.     // 接受客户端连接(底层调用socket的accept)
  5.     // anetTcpAccept最终在anet.c的anetGenericAccept中调用accept
  6.     cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
  7.     if (cfd == ANET_ERR) {
  8.         if (errno != EWOULDBLOCK)
  9.             serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr);
  10.         return;
  11.     }
  12.     serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
  13.     /**
  14.      * 处理连接建立后的逻辑:创建client、检查连接数等
  15.      */
  16.     acceptCommonHandler(cfd,0,cip);
  17. }
  18. // 连接建立后的通用处理(networking.c)
  19. static void acceptCommonHandler(int fd, int flags, char *ip) {
  20.     client *c;
  21.     // 创建client对象(代表一个客户端连接)
  22.     c = createClient(fd);
  23.     // 检查是否超过最大客户端连接数(配置文件中的maxclients)
  24.     if (listLength(server.clients) > server.maxclients) {
  25.         char *err = "-ERR max number of clients reached\r\n";
  26.         if (write(c->fd,err,strlen(err)) == -1) {
  27.             // 写入失败不处理,后续会关闭连接
  28.         }
  29.         server.stat_rejected_conn++;
  30.         freeClient(c); // 释放client并关闭连接
  31.         return;
  32.     }
  33.     .... // 处理无密码时的默认保护状态
  34.     // 更新连接数统计
  35.     server.stat_numconnections++;
  36.     c->flags |= flags;
  37. }
复制代码
client对象创建

createClient 方法会初始化 client 的属性(如输入/输出缓冲区),配置 socket 为非阻塞模式,设置 NODELAY 和 SOKEEPALIVE标志位来关闭 Nagle 算法并且启动 socket 存活检查机制,并注册读事件处理器(当客户端发送数据时触发)。
  1. client *createClient(int fd) {
  2.     client *c = zmalloc(sizeof(client));
  3.     // fd为-1时用于特殊场景(如执行lua脚本)
  4.     if (fd != -1) {
  5.         // 配置socket:非阻塞模式(避免IO阻塞)、关闭Nagle算法(减少延迟)、开启保活机制
  6.         anetNonBlock(NULL,fd);
  7.         anetEnableTcpNoDelay(NULL,fd);
  8.         if (server.tcpkeepalive)
  9.             anetKeepAlive(NULL,fd,server.tcpkeepalive);
  10.         /**
  11.          * 向eventLoop注册读事件处理器
  12.          * 当客户端通过socket发送数据时,调用readQueryFromClient读取数据
  13.          */
  14.         if (aeCreateFileEvent(server.el,fd,AE_READABLE,
  15.             readQueryFromClient, c) == AE_ERR)
  16.         {
  17.             close(fd);
  18.             zfree(c);
  19.             return NULL;
  20.         }
  21.     }
  22.     // 初始化客户端默认选中的数据库(默认第0个)
  23.     selectDb(c,0);
  24.     uint64_t client_id;
  25.     atomicGetIncr(server.next_client_id,client_id,1);
  26.     c->id = client_id;
  27.     c->fd = fd;
  28.     .... // 初始化其他属性(输入缓冲区querybuf、输出缓冲区等)
  29.     return c;
  30. }
复制代码
client 对象包含输入缓冲区(存储客户端发送的数据)和输出缓冲区(存储Redis的响应数据),按类型分为

  • 普通客户端、
  • 从客户端(主从复制用)、
  • 订阅客户端(发布订阅用),
不同类型的缓冲区大小配置不同。
6.png

第二阶段:命令处理阶段

读取socket数据到输入缓冲区

客户端发送命令后,socket 触发读事件TriggerRead ,这个事件的CallRead  readQueryFromClient 会将数据读入 client 的输入缓冲区 querybuf,并检查缓冲区大小是否超过限制。

  • 若为普通客户端,直接处理缓冲区数据;
  • 若为主从复制中的主客户端,还需同步命令到从节点。
7.png
  1. // 读取客户端发送的数据到输入缓冲区(networking.c)
  2. void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
  3.     client *c = (client*) privdata;
  4.     ....
  5.     // 为输入缓冲区预留空间
  6.     c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
  7.     // 从socket读取数据到querybuf(输入缓冲区)
  8.     nread = read(fd, c->querybuf+qblen, readlen);
  9.     if (nread == -1) {
  10.         .... // 读取错误,释放client
  11.     } else if (nread == 0) {
  12.         // 客户端主动关闭连接
  13.         serverLog(LL_VERBOSE, "Client closed connection");
  14.         freeClient(c);
  15.         return;
  16.     } else if (c->flags & CLIENT_MASTER) {
  17.         /* 若client是主从复制中的主节点,将数据存入pending_querybuf用于同步到从节点 */
  18.         c->pending_querybuf = sdscatlen(c->pending_querybuf, c->querybuf+qblen,nread);
  19.     }
  20.     // 更新输入缓冲区的长度
  21.     sdsIncrLen(c->querybuf,nread);
  22.     c->lastinteraction = server.unixtime;
  23.     if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
  24.     server.stat_net_input_bytes += nread;
  25.     // 检查输入缓冲区是否超过配置的最大限制(client-query-buffer-limit)
  26.     if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
  27.         sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
  28.         bytes = sdscatrepr(bytes,c->querybuf,64);
  29.         serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
  30.         sdsfree(ci);
  31.         sdsfree(bytes);
  32.         freeClient(c); // 关闭客户端连接
  33.         return;
  34.     }
  35.     if (!(c->flags & CLIENT_MASTER)) {
  36.         // 处理普通客户端的输入缓冲区
  37.         processInputBuffer(c);
  38.     } else {
  39.         // 处理主从复制中的主节点客户端
  40.         size_t prev_offset = c->reploff;
  41.         processInputBuffer(c);
  42.         // 若同步偏移量变化,通知从节点更新
  43.         size_t applied = c->reploff - prev_offset;
  44.         if (applied) {
  45.             replicationFeedSlavesFromMasterStream(server.slaves, c->pending_querybuf, applied);
  46.             sdsrange(c->pending_querybuf,applied,-1);
  47.         }
  48.     }
  49. }
复制代码
解析获取命令

processInputBuffer 会解析输入缓冲区的数据,根据命令格式调用不同方法解析,最终得到命令参数 argv 和参数个数 argc,再调用 processCommand 执行命令。
执行成功后,如果是主从客户端,还需要更新同步偏移量 reploff 属性
8.png

命令格式 主要有:

  • 单行命令 PROTO_REQ_INLINE
  • 批量命令 PROTO_REQ_MULTIBULK
  1. void processInputBuffer(client *c) { // networking.c
  2.     server.current_client = c;
  3.     /* 循环处理输入缓冲区中的所有数据 */
  4.     while(sdslen(c->querybuf)) {
  5.         .... // 处理client的状态(如是否阻塞、是否在事务中)
  6.         /* 判断命令格式类型(telnet和redis-cli发送的命令格式不同) */
  7.         if (!c->reqtype) {
  8.             if (c->querybuf[0] == '*') {
  9.                 c->reqtype = PROTO_REQ_MULTIBULK; // 批量命令格式(如*2\r\n$3\r\nSET\r\n$5\r\nhello\r\n)
  10.             } else {
  11.                 c->reqtype = PROTO_REQ_INLINE; // 单行命令格式(如SET hello world)
  12.             }
  13.         }
  14.         /**
  15.          * 解析输入缓冲区数据,得到命令参数argv和参数个数argc
  16.          */
  17.         if (c->reqtype == PROTO_REQ_INLINE) {
  18.             if (processInlineBuffer(c) != C_OK) break; // 解析失败则退出循环
  19.         } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
  20.             if (processMultibulkBuffer(c) != C_OK) break;
  21.         } else {
  22.             serverPanic("Unknown request type");
  23.         }
  24.         /* 参数个数为0时,重置client以接收下一条命令 */
  25.         if (c->argc == 0) {
  26.             resetClient(c);
  27.         } else {
  28.             // 执行解析得到的命令
  29.             if (processCommand(c) == C_OK) {
  30.                 if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
  31.                     // 若为从节点的主客户端,更新同步偏移量
  32.                     c->reploff = c->read_reploff - sdslen(c->querybuf);
  33.                 }
  34.                 // 非阻塞状态下,重置client以接收新命令
  35.                 if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
  36.                     resetClient(c);
  37.             }
  38.         }
  39.     }
  40.     server.current_client = NULL;
  41. }
复制代码
执行命令 processCommand方法

processCommand 是命令执行的核心逻辑,主要分为三步:

  • 首先是调用 lookupCommand 方法获得对应的 redisCommand;
  • 接着是检测当前 Redis 是否可以执行该命令;
  • 最后是调用 call 方法真正执行命令。
9.png

(1) 如果命令名称为 quit,则直接返回,并且设置客户端标志位。
(2) 根据 argv[0] 查找对应的 redisCommand,所有的命令都存储在命令字典 redisCommandTable 中,根据命令名称可以获取对应的命令。
(3) 进行用户权限校验。
(4) 如果是集群模式,处理集群重定向。当命令发送者是 master 或者 命令没有任何 key 的参数时可以不重定向。
(5) 预防 maxmemory 情况,先尝试回收一下,如果不行,则返回异常。
(6) 当此服务器是 master 时:aof 持久化失败时,或上一次 bgsave 执行错误,且配置 bgsave 参数和 stopwritesonbgsaveerr;禁止执行写命令。
(7) 当此服务器时master时:如果配置了 replminslavestowrite,当slave数目小于时,禁止执行写命令。
(8) 当时只读slave时,除了 master 的不接受其他写命令。
(9) 当客户端正在订阅频道时,只会执行部分命令。
(10) 服务器为slave,但是没有连接 master 时,只会执行带有 CMD_STALE 标志的命令,如 info 等
(11) 正在加载数据库时,只会执行带有 CMD_LOADING 标志的命令,其余都会被拒绝。
(12) 当服务器因为执行lua脚本阻塞时,只会执行部分命令,其余都会拒绝
(13) 如果是事务命令,则开启事务,命令进入等待队列;否则调用call直接执行命令。
  1. int processCommand(client *c) {
  2.     // 1. 处理QUIT命令(返回OK并标记关闭)
  3.     if (!strcasecmp(c->argv[0]->ptr,"quit")) {
  4.         addReply(c,shared.ok);
  5.         c->flags |= CLIENT_CLOSE_AFTER_REPLY;
  6.         return C_ERR;
  7.     }
  8.     /**
  9.      * 2. 根据命令名称查找对应的redisCommand结构体
  10.      * 所有命令存储在redisCommandTable字典中(如{"get":getCommand, "set":setCommand})
  11.      */
  12.     c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
  13.     if (!c->cmd) {
  14.         // 处理未知命令(返回错误)
  15.         addReplyErrorFormat(c,"unknown command `%s`", (char*)c->argv[0]->ptr);
  16.         return C_OK;
  17.     } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) {
  18.         // 检查参数个数是否匹配(arity为命令定义的参数要求)
  19.         addReplyErrorFormat(c,"wrong number of arguments for '%s' command", c->cmd->name);
  20.         return C_OK;
  21.     }
  22.     // 3. 检查用户认证(若配置requirepass且未认证,仅允许AUTH命令)
  23.     if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
  24.     {
  25.         flagTransaction(c);
  26.         addReply(c,shared.noautherr);
  27.         return C_OK;
  28.     }
  29.     /**
  30.      * 4. 集群模式下处理命令重定向
  31.      * 若命令需在其他节点执行,返回重定向信息(如-MOVED)
  32.      */
  33.     if (server.cluster_enabled &&
  34.         !(c->flags & CLIENT_MASTER) &&
  35.         !(c->flags & CLIENT_LUA &&
  36.           server.lua_caller->flags & CLIENT_MASTER) &&
  37.         !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
  38.           c->cmd->proc != execCommand))
  39.     {
  40.         int hashslot;
  41.         int error_code;
  42.         clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
  43.                                         &hashslot,&error_code);
  44.         if (n == NULL || n != server.cluster->myself) {
  45.             if (c->cmd->proc == execCommand) {
  46.                 discardTransaction(c);
  47.             } else {
  48.                 flagTransaction(c);
  49.             }
  50.             clusterRedirectClient(c,n,hashslot,error_code);
  51.             return C_OK;
  52.         }
  53.     }
  54.     // 5. 处理maxmemory限制(尝试回收内存,失败则返回错误)
  55.     if (server.maxmemory) {
  56.         int retval = freeMemoryIfNeeded();
  57.         if (retval == C_ERR) {
  58.             addReply(c, shared.oomerr);
  59.             return C_OK;
  60.         }
  61.     }
  62.     /**
  63.      * 6. 主节点特殊检查:
  64.      * - AOF持久化失败或bgsave错误时,禁止写命令
  65.      * - 从节点数量不足时(repl-min-slaves-to-write),禁止写命令
  66.      */
  67.     if (((server.stop_writes_on_bgsave_err &&
  68.           server.saveparamslen > 0 &&
  69.           server.lastbgsave_status == C_ERR) ||
  70.           server.aof_last_write_status == C_ERR) &&
  71.         server.masterhost == NULL &&
  72.         (c->cmd->flags & CMD_WRITE ||
  73.          c->cmd->proc == pingCommand)) {
  74.         addReplySds(c, sdscatprintf(sdsempty(),
  75.             "-MISCONF Errors writing to the AOF file: %s\r\n",
  76.             server.aof_last_write_errstr ? server.aof_last_write_errstr : "unknown error"));
  77.         return C_OK;
  78.     }
  79.     /**
  80.      * 7. 从节点特殊检查:
  81.      * - 只读从节点拒绝非主节点的写命令
  82.      * - 未连接主节点时,仅允许带CMD_STALE标志的命令(如INFO)
  83.      */
  84.     if (server.masterhost && server.repl_slave_ro &&
  85.         !(c->flags & CLIENT_MASTER) &&
  86.         c->cmd->flags & CMD_WRITE) {
  87.         addReply(c,shared.readonlyerr);
  88.         return C_OK;
  89.     }
  90.     /**
  91.      * 8. 其他检查:
  92.      * - 订阅状态的客户端仅允许特定命令(PING、SUBSCRIBE等)
  93.      * - 加载数据库时仅允许带CMD_LOADING标志的命令
  94.      * - Lua脚本阻塞时仅允许特定命令(AUTH、SHUTDOWN等)
  95.      */
  96.     if (c->flags & CLIENT_PUBSUB &&
  97.         c->cmd->proc != pingCommand &&
  98.         c->cmd->proc != subscribeCommand &&
  99.         c->cmd->proc != unsubscribeCommand &&
  100.         c->cmd->proc != psubscribeCommand &&
  101.         c->cmd->proc != punsubscribeCommand) {
  102.         addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
  103.         return C_OK;
  104.     }
  105.     /**
  106.      * 9. 执行命令:
  107.      * - 事务中(CLIENT_MULTI)的命令入队等待EXEC
  108.      * - 非事务命令直接调用call执行
  109.      */
  110.     if (c->flags & CLIENT_MULTI &&
  111.         c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
  112.         c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
  113.     {
  114.         // 事务命令入队
  115.         queueMultiCommand(c);
  116.         addReply(c,shared.queued);
  117.     } else {
  118.         // 非事务命令直接执行
  119.         call(c,CMD_CALL_FULL);
  120.         c->woff = server.master_repl_offset;
  121.         if (listLength(server.ready_keys))
  122.             handleClientsBlockedOnLists();
  123.     }
  124.     return C_OK;
  125. }
  126. // 所有Redis命令的定义(部分示例)
  127. struct redisCommand redisCommandTable[] = {
  128.     {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
  129.     {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
  130.     {"hmset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
  131.     .... // 其他命令
  132. };
复制代码
call方法

call 方法负责实际执行命令,包括通知监视器、调用命令处理函数、记录慢查询日志、命令传播(同步到AOF和从节点)等。
10.png


  • 如果有监视器 monitor,则需要将命令发送给监视器。
  • 调用 redisCommand 的proc 方法,执行对应具体的命令逻辑。
  • 如果开启了 CMDCALLSLOWLOG,则需要记录慢查询日志
  • 如果开启了 CMDCALLSTATS,则需要记录一些统计信息
  • 如果开启了 CMDCALLPROPAGATE,则当 dirty大于0时,需要调用 propagate 方法来进行命令传播。
命令传播就是将命令写入 repl-backlog-buffer 缓冲中,并发送给各个从服务器中。
11.png

call方法关键源码:
  1. // 执行命令的具体实现
  2. void call(client *c, int flags) {
  3.     /**
  4.      * dirty:记录数据库修改次数
  5.      * start/duration:记录命令执行的开始时间和耗时(微秒)
  6.      */
  7.     long long dirty, start, duration;
  8.     int client_old_flags = c->flags;
  9.     /**
  10.      * 若有监视器(monitor),将命令发送给监视器
  11.      * 排除从AOF加载的命令和特定管理员命令
  12.      */
  13.     if (listLength(server.monitors) &&
  14.         !server.loading &&
  15.         !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
  16.     {
  17.         replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
  18.     }
  19.     ....
  20.     /* 执行命令 */
  21.     dirty = server.dirty;
  22.     start = ustime();
  23.     // 调用命令的处理函数(如setCommand、getCommand)
  24.     c->cmd->proc(c);
  25.     duration = ustime()-start;
  26.     dirty = server.dirty-dirty;
  27.     if (dirty < 0) dirty = 0;
  28.     .... // Lua脚本的特殊处理
  29.     /**
  30.      * 记录慢查询日志(若命令耗时超过slowlog-log-slower-than)
  31.      */
  32.     if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
  33.         char *latency_event = (c->cmd->flags & CMD_FAST) ?
  34.                               "fast-command" : "command";
  35.         latencyAddSampleIfNeeded(latency_event,duration/1000);
  36.         slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
  37.     }
  38.     /**
  39.      * 更新命令统计信息
  40.      */
  41.     if (flags & CMD_CALL_STATS) {
  42.         c->lastcmd->microseconds += duration;
  43.         c->lastcmd->calls++;
  44.     }
  45.     /**
  46.      * 命令传播:若修改了数据库(dirty>0),同步到AOF和从节点
  47.      */
  48.     if (flags & CMD_CALL_PROPAGATE &&
  49.         (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
  50.     {
  51.         int propagate_flags = PROPAGATE_NONE;
  52.         /**
  53.          * dirty大于0时,需要广播命令给slave和aof
  54.          */        
  55.         if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
  56.         ....
  57.         /**
  58.          * 广播命令,写如aof,发送命令到slave
  59.          * 也就是传说中的传播命令
  60.          */            
  61.         if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
  62.             propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
  63.     }
  64.     ....
  65. }
复制代码
命令传播会将命令写入 repl-backlog-buffer 缓冲区,并同步到所有从节点,保证主从数据一致性;同时写入AOF文件(若开启),实现持久化。
第三阶段:数据返回阶段

命令执行完成后,结果会被存入Client的输出缓冲区(buf或reply链表)。
当输出缓冲区有数据时,Redis会注册"写事件处理器",将结果通过socket写回客户端。
命令结果写入输出缓冲区

命令执行完成后都会通过 addReply 方法将结果写入输出缓冲区,等待返回给客户端。addReply 的核心逻辑是:准备写入环境,然后将结果写入输出缓冲区(固定缓冲区或链表)。
12.png
  1. void addReply(client *c, robj *obj) {
  2.     // 准备客户端写入环境(判断是否需要返回结果、加入等待队列等)
  3.     if (prepareClientToWrite(c) != C_OK) return;
  4.     if (sdsEncodedObject(obj)) {
  5.         // 若对象是SDS编码,先尝试写入固定缓冲区
  6.         if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
  7.             // 缓冲区满了,写入响应链表
  8.             _addReplyObjectToList(c,obj);
  9.     } else if (obj->encoding == OBJ_ENCODING_INT) {
  10.         // 整数编码的特殊优化(直接转换为字符串写入)
  11.         char buf[32];
  12.         size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
  13.         if (_addReplyToBuffer(c,buf,len) != C_OK)
  14.             _addReplyObjectToList(c,obj);
  15.     } else {
  16.         serverPanic("Wrong obj->encoding in addReply()");
  17.     }
  18. }
复制代码
prepareClientToWrite 方法主要负责判断客户端是否需要返回结果,并将客户端加入等待写入队列(clients_pending_write):
  1. int prepareClientToWrite(client *c) {
  2.     // Lua脚本或模块的客户端,直接允许写入
  3.     if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
  4.     // 客户端要求不返回结果(如REPLY OFF),直接拒绝
  5.     if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
  6.     // 主从复制中的主节点客户端,不需要返回结果
  7.     if ((c->flags & CLIENT_MASTER) &&
  8.         !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
  9.     // AOF加载时的临时客户端,不需要返回结果
  10.     if (c->fd <= 0) return C_ERR;
  11.     // 若客户端不在等待写入队列,加入队列
  12.     if (!clientHasPendingReplies(c) &&
  13.         !(c->flags & CLIENT_PENDING_WRITE) &&
  14.         (c->replstate == REPL_STATE_NONE ||
  15.          (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
  16.     {
  17.         c->flags |= CLIENT_PENDING_WRITE;
  18.         listAddNodeHead(server.clients_pending_write,c);
  19.     }
  20.     return C_OK;
  21. }
复制代码
Redis 的输出缓冲区由两部分组成:

  • 固定缓冲区(buf):小数据直接写入,速度快;
  • 响应链表(reply):大数据或缓冲区满时,以节点形式存入链表。
这种设计的好处是:平衡性能和内存效率,小数据用缓冲区减少分配开销,大数据用链表避免缓冲区溢出。
13.png

命令返回值从输出缓冲区写入 socket

输出缓冲区中的数据不会立即写入 socket,而是等待 Redis 事件循环处理。具体来说,事件循环每次执行前会调用 beforeSleep 方法,该方法会通过 handleClientsWithPendingWrites 处理等待写入队列中的客户端。
14.png

下面的 aeMain 方法就是 Redis 事件循环的主逻辑,可以看到每次循环时都会调用 beforesleep 方法。
  1. void aeMain(aeEventLoop *eventLoop) { // ae.c
  2.     eventLoop->stop = 0;
  3.     while (!eventLoop->stop) {
  4.         /* 如果有需要在事件处理前执行的函数,那么执行它 */
  5.         if (eventLoop->beforesleep != NULL)
  6.             eventLoop->beforesleep(eventLoop);
  7.         /* 开始处理事件*/
  8.         aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
  9.     }
  10. }
复制代码
beforeSleep 函数会调用 handleClientsWithPendingWrites 函数来处理 clientspendingwrite 列表。
  1. // 处理等待写入队列中的客户端
  2. int handleClientsWithPendingWrites(void) {
  3.     listIter li;
  4.     listNode *ln;
  5.     int processed = listLength(server.clients_pending_write);  // 统计待处理客户端数
  6.     listRewind(server.clients_pending_write,&li);
  7.    
  8.     while((ln = listNext(&li))) {
  9.         client *c = listNodeValue(ln);
  10.         c->flags &= ~CLIENT_PENDING_WRITE;  // 移除等待标记
  11.         listDelNode(server.clients_pending_write,ln);  // 从队列中移除
  12.         
  13.         // 尝试将输出缓冲区数据写入socket
  14.         if (writeToClient(c->fd,c,0) == C_ERR) continue;
  15.         
  16.         // 若数据未写完,注册写事件处理器等待下次写入
  17.         if (clientHasPendingReplies(c)) {
  18.             int ae_flags = AE_WRITABLE;
  19.             // AOF实时同步时需要设置屏障确保顺序
  20.             if (server.aof_state == AOF_ON &&
  21.                 server.aof_fsync == AOF_FSYNC_ALWAYS)
  22.             {
  23.                 ae_flags |= AE_BARRIER;
  24.             }
  25.             // 注册写事件处理器sendReplyToClient
  26.             if (aeCreateFileEvent(server.el, c->fd, ae_flags,
  27.                 sendReplyToClient, c) == AE_ERR)
  28.             {
  29.                     freeClientAsync(c);
  30.             }
  31.         }
  32.     }
  33.     return processed;
  34. }
  35. // 实际写入socket的方法
  36. int writeToClient(int fd, client *c, int handler_installed) {
  37.     ssize_t nwritten = 0, totwritten = 0;
  38.     size_t objlen;
  39.     sds o;
  40.     // 循环写入所有待返回数据
  41.     while(clientHasPendingReplies(c)) {
  42.         if (c->bufpos > 0) {
  43.             // 先写固定缓冲区中的数据
  44.             nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
  45.             if (nwritten <= 0) break;  // 写入失败或需重试
  46.             c->sentlen += nwritten;
  47.             totwritten += nwritten;
  48.             // 缓冲区数据写完,重置缓冲区
  49.             if ((int)c->sentlen == c->bufpos) {
  50.                 c->bufpos = 0;
  51.                 c->sentlen = 0;
  52.             }
  53.         } else {
  54.             // 缓冲区为空,从响应链表取数据
  55.             o = listNodeValue(listFirst(c->reply));
  56.             objlen = sdslen(o);
  57.             if (objlen == 0) {
  58.                 listDelNode(c->reply,listFirst(c->reply));
  59.                 continue;
  60.             }
  61.             // 写链表中的数据
  62.             nwritten = write(fd, o + c->sentlen, objlen - c->sentlen);
  63.             if (nwritten <= 0) break;
  64.             c->sentlen += nwritten;
  65.             totwritten += nwritten;
  66.             // 链表节点数据写完,移除节点
  67.             if (c->sentlen == objlen) {
  68.                 listDelNode(c->reply,listFirst(c->reply));
  69.                 c->sentlen = 0;
  70.                 c->reply_bytes -= objlen;
  71.             }
  72.         }
  73.         // 限制单次写入字节数,避免阻塞事件循环
  74.         if (totwritten > NET_MAX_WRITES_PER_EVENT &&
  75.             (server.maxmemory == 0 ||
  76.              zmalloc_used_memory() < server.maxmemory) &&
  77.             !(c->flags & CLIENT_SLAVE)) break;
  78.     }
  79.     server.stat_net_output_bytes += totwritten;  // 统计输出字节数
  80.     if (nwritten == -1) {
  81.         // 写入错误,关闭客户端
  82.         if (errno != EAGAIN) {
  83.             serverLog(LL_VERBOSE,"Error writing to client: %s", strerror(errno));
  84.             freeClient(c);
  85.         }
  86.         return C_ERR;
  87.     }
  88.     // 所有数据写完,清理资源
  89.     if (!clientHasPendingReplies(c)) {
  90.         c->sentlen = 0;
  91.         if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
  92.         // 若客户端标记了"回复后关闭",则关闭连接
  93.         if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
  94.             freeClient(c);
  95.             return C_ERR;
  96.         }
  97.     }
  98.     return C_OK;
  99. }
复制代码
简单来说,Redis 会先尝试直接将输出缓冲区的数据写入 socket,若一次写不完,则注册写事件处理器,等待操作系统通知 socket 可写时再次写入,直到所有数据发送完成。这种方式既保证了效率,又避免了阻塞事件循环。
set 命令的处理方法: setCommand

前面我们提到,processCommand 方法会从输入缓冲区解析出对应的 redisCommand,然后调用 call 方法执行该命令的 proc 方法。
不同命令的 proc 方法各不相同,比如 set 命令对应的 proc 是 setCommand 方法,get 命令对应的是 getCommand 方法。
这种通过命令找到对应处理函数的方式,和 Java 中的多态思想很相似。
call方法逻辑如下图:
15.png

call方法大致源码:
  1. void call(client *c, int flags) {
  2.     ....
  3.     // 调用命令对应的处理函数(如setCommand、getCommand)
  4.     c->cmd->proc(c);
  5.     ....
  6. }
  7. // redisCommand结构体定义
  8. struct redisCommand {
  9.     char *name;  // 命令名称
  10.     redisCommandProc *proc;  // 命令处理函数
  11.     .... // 其他属性(参数个数、标志位等)
  12. };
  13. // 命令处理函数的类型定义
  14. typedef void redisCommandProc(client *c);
  15. // 所有Redis命令的注册表(部分示例)
  16. struct redisCommand redisCommandTable[] = {
  17.     {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},  // get命令对应getCommand
  18.     {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},  // set命令对应setCommand
  19.     {"hmset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
  20.     .... // 其他命令
  21. };
复制代码
setCommand 会先判断命令是否携带 nx、xx、ex、px 等可选参数,然后调用 setGenericCommand 完成核心逻辑。
我们直接来看 setGenericCommand 方法的处理流程:
16.png

setGenericCommand 方法的处理逻辑如下所示:

  • 首先判断 set 的类型是 setnx 还是 setxx,如果是 nx 并且 key 已经存在则直接返回;如果是 xx 并且 key 不存在则直接返回。
  • 调用 setKey 方法将键值添加到对应的 Redis 数据库中。
  • 如果有过期时间,则调用 setExpire 将设置过期时间
  • 进行键空间通知
  • 返回对应的值给客户端。
setGenericCommand 的具体实现代码如下:
  1. // t_string.c
  2. void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
  3.     long long milliseconds = 0;
  4.     /**
  5.      * 处理过期时间参数:如果存在expire(如ex/px),先解析为毫秒
  6.      */
  7.     if (expire) {
  8.         // 将expire(robj类型)转换为整数,存储到milliseconds
  9.         if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
  10.             return;
  11.         // 过期时间不能小于等于0,否则返回错误
  12.         if (milliseconds <= 0) {
  13.             addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
  14.             return;
  15.         }
  16.         // 如果单位是秒(UNIT_SECONDS),转换为毫秒
  17.         if (unit == UNIT_SECONDS) milliseconds *= 1000;
  18.     }
  19.     /**
  20.      * 处理NX和XX参数:
  21.      * - NX(只在key不存在时设置):如果key已存在,返回abort_reply
  22.      * - XX(只在key存在时设置):如果key不存在,返回abort_reply
  23.      * lookupKeyWrite用于在数据库中查找key(写操作场景)
  24.      */
  25.     if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
  26.         (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
  27.     {
  28.         addReply(c, abort_reply ? abort_reply : shared.nullbulk);
  29.         return;
  30.     }
  31.     /**
  32.      * 将键值对存入数据库的dict哈希表
  33.      */
  34.     setKey(c->db,key,val);
  35.     server.dirty++;  // 记录数据库修改次数(用于持久化和复制)
  36.     /**
  37.      * 如果有过期时间,将过期时间存入数据库的expires哈希表
  38.      */
  39.     if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
  40.     /**
  41.      * 发送键空间通知(如"set"事件,供订阅者感知)
  42.      */
  43.     notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
  44.     if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
  45.         "expire",key,c->db->id);
  46.     /**
  47.      * 向客户端返回结果(如"OK"或自定义回复)
  48.      */
  49.     addReply(c, ok_reply ? ok_reply : shared.ok);
  50. }
复制代码
简单来说,setKey 就是把键值对存入 redisDb 的 dict 哈希表,setExpire 则把键和过期时间存入 expires 哈希表,两者共同完成 set 命令的核心存储逻辑。redisDb 的 dict 哈希表如下图:
17.png

get 命令的处理方法: getCommand

。。。。。。
由于平台篇幅限制, 此处省略了 1000字,剩下的内容,请参见原文地址
原始的内容,请参考 本文 的 原文 地址
本文 的 原文 地址

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册