Redis 源码分析
Redis 是一个 in-memory data structures server, 在互联网应用非常广泛,常见的使用场景:
- 作为其他系统的缓存,减少其他系统的负载 (特别是数据库)
- 缓存临时数据,比如一个分布式部署的网站,流量巨大,同一用户的不同请求可能会发送到不同 Web 服务器,可以用 Redis 保存全局的 session/permission info/&c.
在高性能的前提下,我觉得 Redis 的接口设计是它能从 KV 系统中脱颖而出的关键原因,Redis 支持 hash table, set, list 等数据结构,定位是一个远程数据结构,使得它有更多的使用场景,对用户更加友好。
类似 S3, Redis API 被称为 Redis 协议,很多 KV 系统都支持 Redis 协议,比如阿里的 Tair。
这篇文章记录我阅读源码的顺序,梳理了源码,希望能帮助在看 Redis 源码的工程师。
Util
lzf[.h/_c.c]
实现 lzf 压缩算法,RDB 持久化时默认会使用 lzf 压缩 key&value.
zmalloc.[h/c]
是 malloc wrapper, 会记录使用的堆内存量。
sds.[h/c]
实现变长字符串类型,可以认为是简化的std::string
, 很多 buffer 的类型是sds
,分配内存的策略:
- 创建对象时,根据指定的长度申请内存
- 变长,剩余长度不足时,会重新申请
(len + add_len) * 2
的内存
anet.[h/c]
封装socket
相关的系统调用,方便使用:
anetTcpServer
:socket->bind->listenanetAccept
: acceptanetRead
:read
的最多读 n bytes -> 刚好读 n bytesanetWrite
:write
的最多写 n bytes -> 刚好写 n bytes
Async Event Library
Redis 基于事件驱动,用单线程执行主流程(处理请求),还有少量线程执行异步任务,比如关闭文件,回收内存,在 Redis 6.0 使用多个线程(可配置)处理 I/O.
Redis 的事件驱动库本质是 IO 多路复用 + 执行回调函数,代码在ae.[h/c]
(事件处理层), ae_epoll.c/ae_kqueue.c/ae_select.c
(解耦 IO 多路复用和具体系统调用)。
异步事件库实现是一个数据结构aeEventLoop
,维护一个 epoll 实例 + (fd, 监听状态,处理函数) 的集合。
typedef struct aeApiState {
int epfd; /* epoll fd, `epoll_create`返回值 */
struct epoll_event events[AE_SETSIZE]; /* 存储`epoll_wait`结果 */
} aeApiState;
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData; /* event loop 内部没有用到,类似 golang's context
} aeFileEvent;
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc; /* 删除时执行 */
void *clientData;
struct aeTimeEvent *next;
} aeTimeEvent;
typedef struct aeEventLoop {
int maxfd;
long long timeEventNextId;
aeFileEvent events[AE_SETSIZE]; /* Registered events */
aeFiredEvent fired[AE_SETSIZE]; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
/* void *apidata 指向 aeApiState 对象 */
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
AE_SETSIZE
值很重要,决定了 max clients count.
aeEventLoop
需要维护的 fd 数 = max clients count + 少量内部 fd (listen fd, epoll fd, log file, rdb/aof file, &c),所以设置AE_SETSIZE = 1024 * 10
(10k).
在这个 commit 改为配置值,redis.conf
配置max_clients
(默认值是 10000), 服务启动时设置AE_SETSIZE = max_clients + 32
,支持运行时动态设置。
由于 linux 进程默认最大打开文件数是 1024,服务启动时需要修改这个数值 (使用 glibc’s setrlimit
).
aeEventLoop
维护2种类型的事件:
-
文件事件
- (listen fd, readable,
acceptHandler
) - (client/slave fd, readable,
readQueryFromClient
) - (client/slave fd, writable,
sendReplyToClient
)
- (listen fd, readable,
-
时间事件,其实是周期性执行的任务
redis 把多个任务合并成一个,周期性执行:aof/rdb persistence, remove expired keys.
这类型的事件不是由 fd 达到希望的状态后触发,本身没有对应的 fd,但是因为 Redis 希望主流程是单线程,所以把它和文件事件维护在一起,实现是
aeEventLoop
单独维护一个aeTimeEvent
链表。
aeEventLoop
的接口也很清晰, create/add event/delete event/delete,具体如下:
/* 初始化相关数据,调用 epoll_create */
aeEventLoop *aeCreateEventLoop(void);
void aeDeleteEventLoop(aeEventLoop *eventLoop);
void aeStop(aeEventLoop *eventLoop);
/* create/delete event 实现: 维护相关数据,调用 epoll_ctl */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
int aeWait(int fd, int mask, long long milliseconds);
void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
redis server 在启动时创建一个aeEventLoop
对象,添加 listen fd 和一个周期性任务,初始化完成后,调用aeMain
,由aeEventLoop
监听 listen fd, 接收 client fd,调用对应的处理函数,不断 add/delete client fd.
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
还有一个点,我们前面一直说 epoll 实现 IO 多路复用,准确来说,是会根据操作系统选择最合适的系统调用。
/* from ae.c */
/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
代码直接用宏 include “*.c”,可能是 C 特色,C++ 就不建议这样了。
实现细节建议直接看代码,代码写得很清晰。
监听
Server 初始化时调用anetTcpServer
创建一个 listen fd,并把 (fd, readable, acceptHandler
) 加到 event loop,初始化完成后,调用aeMain
监听 fd 状态。
读写流程
读写链路有3种 IO 事件,把链路分成3个部分,由 event loop 触发执行:
- accept client id:接受新连接
- read: 读取请求 -> 解析命令 -> 执行命令 -> 记录结果
- write: 发送回复
新请求连接
有新连接的请求时,listen fd 可读,listen fd 的回调函数acceptHandler
会被调用:
anetAccept
获取 client fdcreateClient
创建 redis client 对象-
if (server.maxclients && listLength(server.clients) > server.maxclients) { char *err = "-ERR max number of clients reached\r\n"; if (write(c->fd,err,strlen(err)) == -1) { /* Nothing to do, Just to avoid the warning... */ } freeClient(c); }
这里2和3,为什么不先判断 client 数,再创建 redis client?
因为 server 不想让 client 一直等待,server 想要:当 client 数超过限制,server 返回 err message 并关闭 client。
所以不论判断的结果如何,这个 redis client 都需要创建,所以先执行2,再执行3,更加合理。
redisClient
维护一条连接的上下文,包括读取请求的 buffer,发送回复的 buffer,执行的命令等。
redisClient *createClient(int fd)
的实现:
- 设置 socket 为非阻塞(O_NONBLOCK),tcp 禁用 Nagle(TCP_NODELAY)
- 初始化
redisClient
- server’s event loop 加入 (fd, readable,
readQueryFromClient
)
现在 client fd 进入监听状态,等待下一轮epoll_wait
轮询。
epoll_wait
每一轮轮询,如果 listen fd 可读,会调用一次accept
获取一个 client fd,如果同时有大量 client fd 在等待accept
,需要多次轮询才能处理完,优化:如果 listen fd 可读,调用多次accept
,直到没有 client fd。
读取请求
对于 (fd, readable, readQueryFromCLient
),当收到请求后,readQueryFromClient
会被调用,从 socket’s buffer 读取数据到 redis client’s querybuf
,注意 syscall read 的返回值:
/* Return the number read, -1 for errors or 0 for EOF. */
/* 非阻塞读,所以当 nread = -1 时,需要判断 errno 是不是 EAGAIN */
nread = read(fd, buf, REDIS_IOBUF_LEN);
if (nread == -1) {
if (errno == EAGAIN) {
nread = 0;
} else {
redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
freeClient(c);
return;
}
} else if (nread == 0) {
redisLog(REDIS_VERBOSE, "Client closed connection");
freeClient(c);
return;
}
if (nread) {
c->querybuf = sdscatlen(c->querybuf, buf, nread);
c->lastinteraction = time(NULL);
} else {
/* 非阻塞读,读不到数据,直接返回. */
return;
}
if (!(c->flags & REDIS_BLOCKED))
processInputBuffer(c);
解析命令
processInputBuffer
从 client’s querybuf
解析命令,把解析的结果存储到 client 的argc
和argv
参数,回收querybuf
对应的内存。
因为接收的数据不一定刚好是完整命令,client 需要维护命令相关的状态,主要有:
querybuf
存储接收的数据- 命令是以 RESP array 的格式,
multibulk
表示 ARRAY 的长度 bulklen
表示目前接收到的 RESP string 的长度
解析细节看processInputBuffer
。
执行命令
processInputBuffer
每次解析命令后,会调用processCommand
,如果解析的结果是完整的命令,会执行命令。
有一个全局变量cmdTable
记录每个命令的名字,对应的处理逻辑,元数据:
typedef void redisCommandProc(redisClient *c);
struct redisCommand {
char *name;
redisCommandProc *proc;
int arity; /* 负数:cmd 参数至少要 -arity 个;正数:cmd 参数必须是 arity 个. */
int flags;
/* 还有4个成员,和 vm 相关,具体看代码*/
};
static struct redisCommand cmdTable[] = {
{"get",getCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
{"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0},
{"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0},
{"append",appendCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1},
{"substr",substrCommand,4,REDIS_CMD_INLINE,NULL,1,1,1},
{"del",delCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}
/* 还有很多,具体看代码 */
};
processCommand
从cmdTable
查找对应的命令,并调用cmd->proc(c)
。
cmd->proc(c)
的实现是读写 server 维护的数据结构,比如 string kv table, hash table, set, &c.
redisClient
有一个成员redisDb *db
,在redisClient
初始化时,把db
指向 server’s db
。
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */
dict *io_keys; /* Keys with clients waiting for VM I/O */
int id;
} redisDb;
cmd->proc(c)
把读写的结果放到 c 的成员list *reply
中。
如果放之前listLength(c->reply) == 0
,还需要把 (c->fd, writable, sendReplyToClient
) 注册到 event loop。
发送回复
sendReplyToClient
把list *reply
写到 socket’s buffer,但是我们不知道 socket’s buffer 现在可写的大小,只能调用 write
syscall,根据write
的返回值才知道。
所以会出现一个 reply 只写入一部分的场景,client 维护变量sentlen
,表示 reply 队列的第一个 reply 目前写入的长度 (写完的 reply 会删除)。
当所有 reply 都写完,需要把 (fd, writable, sendReplyToClient
) 从 event loop 中移除。
Server’s DB 设计
redisServer
维护多种信息,其中成员redisDb *db
维护存储的数据。
redisDb *db
是一个redisDb
类型的数组,因为redisServer
支持多个数据库,数据库个数由配置值dbnum
决定,默认是16。
我觉得redisDb
的设计会是:
struct redisDb {
Dict<string, string> default; // string kv table.
Dict<string, Dict*> hash_tables;
Dict<string, LinkedList *> linked_lists;
// others
};
redis 作者的设计是:
struct redisDb {
Dict<robj*, robj*> dict;
};
把所有 Dict 合并成一个,通过robj->type
判断 value 的数据类型。
redisDb
具体定义是:
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */
dict *io_keys; /* Keys with clients waiting for VM I/O */
int id;
} redisDb;
数据结构
各种数据结构的代码相对独立,直接看代码就行了。