fukua95

Redis 源码分析

Redis 是一个 in-memory data structures server, 在互联网应用非常广泛,常见的使用场景:

  • 作为其他系统的缓存,减少其他系统的负载 (特别是数据库)
  • 缓存临时数据,比如一个分布式部署的网站,流量巨大,同一用户的不同请求可能会发送到不同 Web 服务器,可以用 Redis 保存全局的 session/permission info/&c.

在高性能的前提下,我觉得 Redis 的接口设计是它能从 KV 系统中脱颖而出的关键原因,Redis 支持 hash table, set, list 等数据结构,定位是一个远程数据结构,使得它有更多的使用场景,对用户更加友好。

类似 S3, Redis API 被称为 Redis 协议,很多 KV 系统都支持 Redis 协议,比如阿里的 Tair。

这篇文章记录我阅读源码的顺序,梳理了源码,希望能帮助在看 Redis 源码的工程师。

Util

lzf[.h/_c.c]实现 lzf 压缩算法,RDB 持久化时默认会使用 lzf 压缩 key&value.

zmalloc.[h/c]是 malloc wrapper, 会记录使用的堆内存量。

sds.[h/c]实现变长字符串类型,可以认为是简化的std::string, 很多 buffer 的类型是sds,分配内存的策略:

  1. 创建对象时,根据指定的长度申请内存
  2. 变长,剩余长度不足时,会重新申请(len + add_len) * 2的内存

anet.[h/c]封装socket相关的系统调用,方便使用:

  • anetTcpServer:socket->bind->listen
  • anetAccept: accept
  • anetRead: read的最多读 n bytes -> 刚好读 n bytes
  • anetWrite: write的最多写 n bytes -> 刚好写 n bytes

Async Event Library

Redis 基于事件驱动,用单线程执行主流程(处理请求),还有少量线程执行异步任务,比如关闭文件,回收内存,在 Redis 6.0 使用多个线程(可配置)处理 I/O.

Redis 的事件驱动库本质是 IO 多路复用 + 执行回调函数,代码在ae.[h/c](事件处理层), ae_epoll.c/ae_kqueue.c/ae_select.c(解耦 IO 多路复用和具体系统调用)。

异步事件库实现是一个数据结构aeEventLoop,维护一个 epoll 实例 + (fd, 监听状态,处理函数) 的集合。

typedef struct aeApiState {
    int epfd;  /* epoll fd, `epoll_create`返回值 */
    struct epoll_event events[AE_SETSIZE];  /* 存储`epoll_wait`结果 */
} aeApiState;

typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData; /* event loop 内部没有用到,类似 golang's context 
} aeFileEvent;

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;  /* 删除时执行 */
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;

typedef struct aeEventLoop {
    int maxfd;
    long long timeEventNextId;
    aeFileEvent events[AE_SETSIZE]; /* Registered events */
    aeFiredEvent fired[AE_SETSIZE]; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    /* void *apidata 指向 aeApiState 对象 */
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

AE_SETSIZE值很重要,决定了 max clients count.

aeEventLoop需要维护的 fd 数 = max clients count + 少量内部 fd (listen fd, epoll fd, log file, rdb/aof file, &c),所以设置AE_SETSIZE = 1024 * 10(10k).

在这个 commit 改为配置值,redis.conf配置max_clients(默认值是 10000), 服务启动时设置AE_SETSIZE = max_clients + 32,支持运行时动态设置。

由于 linux 进程默认最大打开文件数是 1024,服务启动时需要修改这个数值 (使用 glibc’s setrlimit).

aeEventLoop维护2种类型的事件:

  1. 文件事件

    • (listen fd, readable, acceptHandler)
    • (client/slave fd, readable, readQueryFromClient)
    • (client/slave fd, writable, sendReplyToClient)
  2. 时间事件,其实是周期性执行的任务

    redis 把多个任务合并成一个,周期性执行:aof/rdb persistence, remove expired keys.

    这类型的事件不是由 fd 达到希望的状态后触发,本身没有对应的 fd,但是因为 Redis 希望主流程是单线程,所以把它和文件事件维护在一起,实现是aeEventLoop单独维护一个aeTimeEvent链表。

aeEventLoop的接口也很清晰, create/add event/delete event/delete,具体如下:

/* 初始化相关数据,调用 epoll_create */
aeEventLoop *aeCreateEventLoop(void);
void aeDeleteEventLoop(aeEventLoop *eventLoop);
void aeStop(aeEventLoop *eventLoop);
/* create/delete event 实现: 维护相关数据,调用 epoll_ctl */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
int aeWait(int fd, int mask, long long milliseconds);
void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);

redis server 在启动时创建一个aeEventLoop对象,添加 listen fd 和一个周期性任务,初始化完成后,调用aeMain,由aeEventLoop监听 listen fd, 接收 client fd,调用对应的处理函数,不断 add/delete client fd.

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

还有一个点,我们前面一直说 epoll 实现 IO 多路复用,准确来说,是会根据操作系统选择最合适的系统调用。

/* from ae.c */

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

代码直接用宏 include “*.c”,可能是 C 特色,C++ 就不建议这样了。

实现细节建议直接看代码,代码写得很清晰。

监听

Server 初始化时调用anetTcpServer创建一个 listen fd,并把 (fd, readable, acceptHandler) 加到 event loop,初始化完成后,调用aeMain监听 fd 状态。

读写流程

读写链路有3种 IO 事件,把链路分成3个部分,由 event loop 触发执行:

  1. accept client id:接受新连接
  2. read: 读取请求 -> 解析命令 -> 执行命令 -> 记录结果
  3. write: 发送回复

新请求连接

有新连接的请求时,listen fd 可读,listen fd 的回调函数acceptHandler会被调用:

  1. anetAccept获取 client fd
  2. createClient 创建 redis client 对象
  3. if (server.maxclients && listLength(server.clients) > server.maxclients) {
      char *err = "-ERR max number of clients reached\r\n";
      if (write(c->fd,err,strlen(err)) == -1) {
        /* Nothing to do, Just to avoid the warning... */
      }
      freeClient(c);
    }
    

这里2和3,为什么不先判断 client 数,再创建 redis client?

因为 server 不想让 client 一直等待,server 想要:当 client 数超过限制,server 返回 err message 并关闭 client。

所以不论判断的结果如何,这个 redis client 都需要创建,所以先执行2,再执行3,更加合理。

redisClient维护一条连接的上下文,包括读取请求的 buffer,发送回复的 buffer,执行的命令等。

redisClient *createClient(int fd)的实现:

  1. 设置 socket 为非阻塞(O_NONBLOCK),tcp 禁用 Nagle(TCP_NODELAY)
  2. 初始化 redisClient
  3. server’s event loop 加入 (fd, readable, readQueryFromClient)

现在 client fd 进入监听状态,等待下一轮epoll_wait轮询。

epoll_wait每一轮轮询,如果 listen fd 可读,会调用一次accept获取一个 client fd,如果同时有大量 client fd 在等待accept,需要多次轮询才能处理完,优化:如果 listen fd 可读,调用多次accept,直到没有 client fd。

读取请求

对于 (fd, readable, readQueryFromCLient),当收到请求后,readQueryFromClient会被调用,从 socket’s buffer 读取数据到 redis client’s querybuf,注意 syscall read 的返回值:

/* Return the number read, -1 for errors or 0 for EOF. */
/* 非阻塞读,所以当 nread = -1 时,需要判断 errno 是不是 EAGAIN */
nread = read(fd, buf, REDIS_IOBUF_LEN);
if (nread == -1) {
  if (errno == EAGAIN) {
    nread = 0;
  } else {
    redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
    freeClient(c);
    return;
  }
} else if (nread == 0) {
  redisLog(REDIS_VERBOSE, "Client closed connection");
  freeClient(c);
  return;
}
if (nread) {
  c->querybuf = sdscatlen(c->querybuf, buf, nread);
  c->lastinteraction = time(NULL);
} else {
  /* 非阻塞读,读不到数据,直接返回. */
  return;
}

if (!(c->flags & REDIS_BLOCKED))
  processInputBuffer(c);

解析命令

processInputBuffer从 client’s querybuf解析命令,把解析的结果存储到 client 的argcargv参数,回收querybuf对应的内存。

因为接收的数据不一定刚好是完整命令,client 需要维护命令相关的状态,主要有:

  • querybuf存储接收的数据
  • 命令是以 RESP array 的格式,multibulk表示 ARRAY 的长度
  • bulklen表示目前接收到的 RESP string 的长度

解析细节看processInputBuffer

执行命令

processInputBuffer每次解析命令后,会调用processCommand,如果解析的结果是完整的命令,会执行命令。

有一个全局变量cmdTable记录每个命令的名字,对应的处理逻辑,元数据:

typedef void redisCommandProc(redisClient *c);
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;  /* 负数:cmd 参数至少要 -arity 个;正数:cmd 参数必须是 arity 个. */
    int flags;
    /* 还有4个成员,和 vm 相关,具体看代码*/
};
static struct redisCommand cmdTable[] = {
    {"get",getCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
    {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0},
    {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0},
    {"append",appendCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1},
    {"substr",substrCommand,4,REDIS_CMD_INLINE,NULL,1,1,1},
    {"del",delCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}
    /* 还有很多,具体看代码 */
};

processCommandcmdTable查找对应的命令,并调用cmd->proc(c)

cmd->proc(c)的实现是读写 server 维护的数据结构,比如 string kv table, hash table, set, &c.

redisClient有一个成员redisDb *db,在redisClient初始化时,把db指向 server’s db

typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blockingkeys;         /* Keys with clients waiting for data (BLPOP) */
    dict *io_keys;              /* Keys with clients waiting for VM I/O */
    int id;
} redisDb;

cmd->proc(c)把读写的结果放到 c 的成员list *reply中。

如果放之前listLength(c->reply) == 0,还需要把 (c->fd, writable, sendReplyToClient) 注册到 event loop。

发送回复

sendReplyToClientlist *reply写到 socket’s buffer,但是我们不知道 socket’s buffer 现在可写的大小,只能调用 write syscall,根据write的返回值才知道。

所以会出现一个 reply 只写入一部分的场景,client 维护变量sentlen,表示 reply 队列的第一个 reply 目前写入的长度 (写完的 reply 会删除)。

当所有 reply 都写完,需要把 (fd, writable, sendReplyToClient) 从 event loop 中移除。

Server’s DB 设计

redisServer维护多种信息,其中成员redisDb *db维护存储的数据。

redisDb *db是一个redisDb类型的数组,因为redisServer支持多个数据库,数据库个数由配置值dbnum决定,默认是16。

我觉得redisDb的设计会是:

struct redisDb {
    Dict<string, string> default;     // string kv table.
    Dict<string, Dict*> hash_tables;
    Dict<string, LinkedList *> linked_lists;
    // others
};

redis 作者的设计是:

struct redisDb {
    Dict<robj*, robj*> dict;
};

把所有 Dict 合并成一个,通过robj->type判断 value 的数据类型。

redisDb具体定义是:

typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blockingkeys;         /* Keys with clients waiting for data (BLPOP) */
    dict *io_keys;              /* Keys with clients waiting for VM I/O */
    int id;
} redisDb;

数据结构

各种数据结构的代码相对独立,直接看代码就行了。