有了高性能的事件驱动库之后,就可以开始构造Redis服务器程序了。一般,Server启动都会通过以下流程,然后阻塞在accept()上,等待客户端的连接:

bind() ———> listen() ———> accept()

Redis也不例外,但是因为Redis是单线程,如果阻塞在accept()上,就不能对客户端进行快速的回复,所以整个监听流程还是有所不同。

启动函数

Redis的启动函数在server.c/main中。在main函数里,除了一些必要的命令行解析、进程daemon化、资源初始化等操作,主要做了两件事:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
/* 全局server变量 */
struct redisServer server;

int main(int argc, char **argv) {
    /* 命令行解析、初始化资源等操作,省略 */

    /* 1. 初始化Server */
    initServer();

    /* 省略 */

    /* 2. 设置事件循环钩子函数beforeSleep,并启动事件循环 */
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeMain(server.el);

    /* 事件循环退出,退出服务器 */
    aeDeleteEventLoop(server.el);
    return 0;
}

初始化Server

初始化Server的逻辑在server.c/initServer中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void initServer(void) {
    /* 设置server相关属性,代码省略 */

    /* 创建eventLoop */
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

    /* listen(),监听端口 */
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);

    /* 创建Redis数据库,初始化资源,代码省略 */

    /* 添加时间事件serverCron,主要用于执行后台命令 */
    if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create the serverCron time event.");
        exit(1);
    }

    /* 添加文件事件acceptTcpHandler,当listen socket accept连接时触发 */
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }

    /* 后续处理,代码省略 */
}

initServer中,创建了全局server的eventLoop,并添加了两个事件:一个时间事件serverCron,一个文件事件acceptTcpHandler。

serverCron

一般模式下,redis只有server.c/serverCron一个时间事件,这个定期时间事件主要工作是:

  • 清理已经过期的键以及数据
  • 更新服务器的统计状态
  • 增量式rehash数据库哈希表
  • BGSAVE、AOF操作,处理终止的子进程
  • 关闭超时客户端、集群模式重连
  • ……

可以通过调整server.hz选项来修改serverCron执行频率,redis会定期执行这个函数,每秒server.hz次。

acceptTcpHandler

redis通过下面代码增加了一个文件事件:

1
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler, NULL)

如果有客户端发起连接,服务器监听socket的文件描述符就会产生AE_READABLE事件,相对应的处理函数是networking.c/acceptTcpHandler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    /* 省略 */
    while(max--) {
        /* 对accpet()封装,获取client socket的文件描述符cfd */
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        /* 省略 */

        /* 封装处理cfd */
        acceptCommonHandler(cfd,0,cip);
    }
}

通过这种监听socket可读的事件触发机制,单线程的redis就不用阻塞在accept()上,而是每次有客户端发起连接再进行相应的处理。

启动eventLoop

redis在进入事件循环阻塞前设置了一个钩子函数server.c/beforeSleep,这个函数在每次事件循环进入事件阻塞前调用:

1
2
3
4
5
6
void beforeSleep(struct aeEventLoop *eventLoop) {
    /* 处理redis服务器状态,省略 */

    /* 将需要发送的数据写入客户端 */
    handleClientsWithPendingWrites();
}

这个函数主要处理了redis服务器的一些状态,包括处理键过期、主从状态、将AOF缓冲写入磁盘等,最后将需要发送的数据写入客户端。最后这个写入操作是必要的,可以尽可能的减少需要进行写事件触发的操作。

然后调用aeMain进入事件循环,整个redis服务器就开始不断地进行事件的处理,直到退出。

总结

可以用一张图来概括redis server启动过程中的事件相关流程:

oauth2

redis在initServer中已经设置了accept事件和定时器事件,然后调用aeMain进入事件循环。每次有accept事件触发,都会调用对应的acceptTcpHandler来进行处理。如果没有事件触发(accept事件和客户端可读/可写事件),则阻塞在aeApiPoll,直到因为定时器的到达而退出阻塞,并执行定时器事件函数,然后重新进入事件循环阻塞。