fukua95

Redis 源码分析

Redis 是(国内?)互联网最常用的系统之一,而且很多公司有自己的定制版。

在高性能的前提下,我觉得 Redis 的接口设计是它能从 KV 系统中脱颖而出的原因之一,Redis 支持 hash table, set, list 等数据结构,定位是一个远程数据结构,使 Redis 有更多使用场景。

类似 S3, Redis API 被称为 Redis 协议,很多 KV 系统都支持 Redis 协议。

这篇笔记梳理了 Redis 源码,希望能帮助在看 Redis 源码的人。

Util

lzf[.h/_c.c]实现 lzf 压缩算法,RDB 持久化时默认会使用 lzf 压缩 key&value。

zmalloc.[h/c] malloc wrapper, 会记录使用的内存量。

sds.[h/c]实现变长字符串类型,可以认为是简化的 std::string, 很多 buffer 的类型用 sds,分配内存空间的策略:

  1. 创建对象时,根据指定的长度申请内存
  2. 变长,剩余长度不足时,会重新申请 (len + add_len) * 2 的内存

anet.[h/c]封装socket相关的系统调用,方便使用:

  • anetTcpServer:socket->bind->listen
  • anetAccept: accept
  • anetRead: read的最多读 n bytes -> 刚好读 n bytes
  • anetWrite: write的最多写 n bytes -> 刚好写 n bytes

Async Event Library

Redis 是基于事件驱动 + 单线程处理请求的模型,本质是 IO 多路复用 + 单线程执行回调函数,代码在ae.[h/c](事件处理层), ae_epoll.c/ae_kqueue.c/ae_select.c(解耦 IO 多路复用和具体系统)。

Redis 用单线程执行主流程(处理请求),还有少量其他线程执行异步任务,比如关闭文件,回收内存,在 Redis 6.0 使用多个线程(可配置)处理 I/O.

异步事件库实现是一个数据结构aeEventLoop,维护一个 epoll 实例 + (fd, 监听状态,处理函数) 的集合。

typedef struct aeApiState {
    int epfd;  /* epoll fd, `epoll_create`返回值 */
    struct epoll_event events[AE_SETSIZE];  /* 存储`epoll_wait`结果 */
} aeApiState;

typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData; /* event loop 内部没有用到,类似 golang's context 
} aeFileEvent;

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;  /* 删除时执行 */
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;

typedef struct aeEventLoop {
    int maxfd;
    long long timeEventNextId;
    aeFileEvent events[AE_SETSIZE]; /* Registered events */
    aeFiredEvent fired[AE_SETSIZE]; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    /* void *apidata 指向 aeApiState 对象 */
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

AE_SETSIZE值很重要,决定了 max clients count.

aeEventLoop需要维护的 fd 数 = max clients count + 少量内部 fd (listen fd, epoll fd, log file, rdb/aof file, &c).

所以设置AE_SETSIZE = 1024 * 10(10k).

在这个 commit 改为配置值,redis.conf配置max_clients(默认值是 10000), 服务启动时设置AE_SETSIZE = max_clients + 32,支持运行时动态设置。

由于 linux 进程默认最大打开文件数是 1024,服务启动时需要修改这个数值 (使用 glibc’s setrlimit).

aeEventLoop维护2种类型的事件:

  1. 文件事件

    • (listen fd, readable, acceptHandler)
    • (client/slave fd, readable, readQueryFromClient)
    • (client/slave fd, writable, sendReplyToClient)
  2. 时间事件,其实是周期性执行的任务

    redis 把多个任务合并成一个,周期性执行:aof/rdb persistence, remove expired keys.

    这类型的事件不是由 fd 达到希望的状态后触发,本身没有对应的 fd,但是因为 Redis 希望主流程是单线程,所以把它和文件事件维护在一起,实现时,aeEventLoop单独维护一个aeTimeEvent链表。

aeEventLoop的接口也很清晰, create/add event/delete event/delete,具体如下:

/* 初始化相关数据,调用 epoll_create */
aeEventLoop *aeCreateEventLoop(void);
void aeDeleteEventLoop(aeEventLoop *eventLoop);
void aeStop(aeEventLoop *eventLoop);
/* create/delete event 实现: 维护相关数据,调用 epoll_ctl */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
int aeWait(int fd, int mask, long long milliseconds);
void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);

redis server 在启动时创建一个aeEventLoop对象,添加 listen fd 和一个周期性任务,初始化完成后,调用aeMain,由aeEventLoop监听 listen fd, 接收 client fd,调用对应的处理函数,不断 add/delete client fd.

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

还有一个点,我们前面一直说 epoll 实现 IO 多路复用,准确来说,是会根据操作系统选择最合适的系统调用。

/* from ae.c */

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

代码直接用宏 include “*.c”,可能是 C 特色,C++ 就不建议这样了。

实现细节建议直接看代码,代码写得很清晰。

监听

Server 初始化时调用anetTcpServer创建一个 listen fd,并把 (fd, readable, acceptHandler) 加到 event loop,初始化完成后,调用aeMain监听 fd 状态。

读写流程

读写链路有3种 IO 事件,把链路分成3个部分,由 event loop 触发执行:

  1. accept client id:接受新连接
  2. read: 读取请求 -> 解析命令 -> 执行命令 -> 记录结果
  3. write: 发送回复

新请求连接

有新连接的请求时,listen fd 可读,listen fd 的回调函数acceptHandler会被调用:

  1. anetAccept获取 client fd
  2. createClient 创建 redis client 对象
  3. if (server.maxclients && listLength(server.clients) > server.maxclients) {
      char *err = "-ERR max number of clients reached\r\n";
      if (write(c->fd,err,strlen(err)) == -1) {
        /* Nothing to do, Just to avoid the warning... */
      }
      freeClient(c);
    }
    

这里2和3,为什么不先判断 client 数,再创建 redis client?

因为 server 不想让 client 一直等待,server 想要:当 client 数超过限制,server 发送 err message 并关闭 client。

所以不论判断的结果如何,这个 redis client 都需要创建,所以先执行2,再执行3,更加合理。

redisClient维护一条连接的上下文,包括读取请求的 buffer,发送回复的 buffer,执行的命令等。

redisClient *createClient(int fd)的实现:

  1. 设置 socket 为非阻塞(O_NONBLOCK),tcp 禁用 Nagle(TCP_NODELAY)
  2. 初始化 redisClient
  3. server’s event loop 加入 (fd, readable, readQueryFromClient)

现在 client fd 进入监听状态,等待下一轮epoll_wait轮询。

epoll_wait每一轮轮询,如果 listen fd 可读,会调用一次accept获取一个 client fd,如果同时有大量 client fd 在等待accept,就需要多次轮询才能处理完,可以考虑优化:如果 listen fd 可读,调用多次accept,直到没有 client fd。

读取请求

对于 (fd, readable, readQueryFromCLient),当收到请求后,readQueryFromClient会被调用,从 socket’s buffer 读取数据到 redis client’s querybuf,注意 syscall read 的返回值:

/* Return the number read, -1 for errors or 0 for EOF. */
/* 非阻塞读,所以当 nread = -1 时,需要判断 errno 是不是 EAGAIN */
nread = read(fd, buf, REDIS_IOBUF_LEN);
if (nread == -1) {
  if (errno == EAGAIN) {
    nread = 0;
  } else {
    redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
    freeClient(c);
    return;
  }
} else if (nread == 0) {
  redisLog(REDIS_VERBOSE, "Client closed connection");
  freeClient(c);
  return;
}
if (nread) {
  c->querybuf = sdscatlen(c->querybuf, buf, nread);
  c->lastinteraction = time(NULL);
} else {
  /* 非阻塞读,读不到数据,直接返回. */
  return;
}

if (!(c->flags & REDIS_BLOCKED))
  processInputBuffer(c);

解析命令

processInputBuffer从 client’s querybuf解析命令,把解析的结果存储到 client 的argcargv参数,回收querybuf对应的内存。

因为接收的数据不一定刚好是完整命令,client 需要维护命令相关的状态,主要有:

  • querybuf存储接收的数据
  • 命令是以 RESP array 的格式,multibulk表示 ARRAY 的长度
  • bulklen表示目前接收到的 RESP string 的长度

解析细节看processInputBuffer

执行命令

processInputBuffer每次解析命令后,会调用processCommand,如果解析的结果是完整的命令,会执行命令。

有一个全局变量cmdTable记录每个命令的名字,对应的处理逻辑,元数据:

typedef void redisCommandProc(redisClient *c);
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;  /* 负数:cmd 参数至少要 -arity 个;正数:cmd 参数必须是 arity 个. */
    int flags;
    /* 还有4个成员,和 vm 相关,具体看代码*/
};
static struct redisCommand cmdTable[] = {
    {"get",getCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
    {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0},
    {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0},
    {"append",appendCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1},
    {"substr",substrCommand,4,REDIS_CMD_INLINE,NULL,1,1,1},
    {"del",delCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}
    /* 还有很多,具体看代码 */
};

processCommandcmdTable查找对应的命令,并调用cmd->proc(c)

cmd->proc(c)的实现是读写 server 维护的数据结构,比如 string kv table, hash table, set, &c.

redisClient有一个成员redisDb *db,在redisClient初始化时,把db指向 server’s db

typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blockingkeys;         /* Keys with clients waiting for data (BLPOP) */
    dict *io_keys;              /* Keys with clients waiting for VM I/O */
    int id;
} redisDb;

cmd->proc(c)把读写的结果放到 c 的成员list *reply中。

如果放之前listLength(c->reply) == 0,还需要把 (c->fd, writable, sendReplyToClient) 注册到 event loop。

发送回复

sendReplyToClientlist *reply写到 socket’s buffer,但是我们不知道 socket’s buffer 现在可写的大小,只能调用 write syscall,根据write的返回值才知道。

所以会出现一个 reply 只写入一部分的场景,client 维护变量sentlen,表示 reply 队列的第一个 reply 目前写入的长度 (写完的 reply 会删除)。

当所有 reply 都写完,需要把 (fd, writable, sendReplyToClient) 从 event loop 中移除。

Server’s DB 设计

redisServer维护多种信息,其中成员redisDb *db维护存储的数据。

redisDb *db是一个redisDb类型的数组,因为redisServer支持多个数据库,数据库个数由配置值dbnum决定,默认是16。

我觉得redisDb的设计会是:

struct redisDb {
    Dict<string, string> default;     // string kv table.
    Dict<string, Dict*> hash_tables;
    Dict<string, LinkedList *> linked_lists;
    // others
};

redis 作者的设计是:

struct redisDb {
    Dict<robj*, robj*> dict;
};

把所有 Dict 合并成一个,通过robj->type判断 value 的数据类型。

redisDb具体定义是:

typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blockingkeys;         /* Keys with clients waiting for data (BLPOP) */
    dict *io_keys;              /* Keys with clients waiting for VM I/O */
    int id;
} redisDb;

数据结构

各种数据结构的代码相对独立,直接看代码就行了。