有了高性能的事件驱动库之后,就可以开始构造Redis服务器程序了。一般,Server启动都会通过以下流程,然后阻塞在accept()上,等待客户端的连接:
bind() ———> listen() ———> accept()
Redis也不例外,但是因为Redis是单线程,如果阻塞在accept()上,就不能对客户端进行快速的回复,所以整个监听流程还是有所不同。
启动函数
Redis的启动函数在server.c/main
中。在main函数里,除了一些必要的命令行解析、进程daemon化、资源初始化等操作,主要做了两件事:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
/* 全局server变量 */
struct redisServer server;
int main(int argc, char **argv) {
/* 命令行解析、初始化资源等操作,省略 */
/* 1. 初始化Server */
initServer();
/* 省略 */
/* 2. 设置事件循环钩子函数beforeSleep,并启动事件循环 */
aeSetBeforeSleepProc(server.el,beforeSleep);
aeMain(server.el);
/* 事件循环退出,退出服务器 */
aeDeleteEventLoop(server.el);
return 0;
}
|
初始化Server
初始化Server的逻辑在server.c/initServer
中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
void initServer(void) {
/* 设置server相关属性,代码省略 */
/* 创建eventLoop */
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
/* listen(),监听端口 */
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
/* 创建Redis数据库,初始化资源,代码省略 */
/* 添加时间事件serverCron,主要用于执行后台命令 */
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create the serverCron time event.");
exit(1);
}
/* 添加文件事件acceptTcpHandler,当listen socket accept连接时触发 */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
/* 后续处理,代码省略 */
}
|
在initServer
中,创建了全局server的eventLoop,并添加了两个事件:一个时间事件serverCron,一个文件事件acceptTcpHandler。
serverCron
一般模式下,redis只有server.c/serverCron
一个时间事件,这个定期时间事件主要工作是:
- 清理已经过期的键以及数据
- 更新服务器的统计状态
- 增量式rehash数据库哈希表
- BGSAVE、AOF操作,处理终止的子进程
- 关闭超时客户端、集群模式重连
- ……
可以通过调整server.hz选项来修改serverCron执行频率,redis会定期执行这个函数,每秒server.hz次。
acceptTcpHandler
redis通过下面代码增加了一个文件事件:
1
|
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler, NULL)
|
如果有客户端发起连接,服务器监听socket的文件描述符就会产生AE_READABLE事件,相对应的处理函数是networking.c/acceptTcpHandler
:
1
2
3
4
5
6
7
8
9
10
11
|
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
/* 省略 */
while(max--) {
/* 对accpet()封装,获取client socket的文件描述符cfd */
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
/* 省略 */
/* 封装处理cfd */
acceptCommonHandler(cfd,0,cip);
}
}
|
通过这种监听socket可读的事件触发机制,单线程的redis就不用阻塞在accept()上,而是每次有客户端发起连接再进行相应的处理。
启动eventLoop
redis在进入事件循环阻塞前设置了一个钩子函数server.c/beforeSleep
,这个函数在每次事件循环进入事件阻塞前调用:
1
2
3
4
5
6
|
void beforeSleep(struct aeEventLoop *eventLoop) {
/* 处理redis服务器状态,省略 */
/* 将需要发送的数据写入客户端 */
handleClientsWithPendingWrites();
}
|
这个函数主要处理了redis服务器的一些状态,包括处理键过期、主从状态、将AOF缓冲写入磁盘等,最后将需要发送的数据写入客户端。最后这个写入操作是必要的,可以尽可能的减少需要进行写事件触发的操作。
然后调用aeMain进入事件循环,整个redis服务器就开始不断地进行事件的处理,直到退出。
总结
可以用一张图来概括redis server启动过程中的事件相关流程:

redis在initServer中已经设置了accept事件和定时器事件,然后调用aeMain进入事件循环。每次有accept事件触发,都会调用对应的acceptTcpHandler来进行处理。如果没有事件触发(accept事件和客户端可读/可写事件),则阻塞在aeApiPoll,直到因为定时器的到达而退出阻塞,并执行定时器事件函数,然后重新进入事件循环阻塞。