Redis很快,而且,Redis能很好的应对高并发的场景。但是,如果对Redis有过了解,就会知道Redis程序是单线程的。使用单线程却能保证如此高性能,有以下几个原因:
其中,I/O多路复用又封装了epoll、kqueue等高性能的底层系统接口,在不同平台都能获得高性能的保证。
事件驱动库
Redis使用了单线程的Reactor模型来处理事件。简单来说,连接建立、请求、回复、断开等都是事件,当连接发生上述的事件时,就触发相对应的事件处理函数handler来对这个事件进行回应。
Redis自己简单实现了一个事件驱动库,而没有使用开源的libevent和libev,主要是为了保证代码的简洁性,同时不需要引入外部依赖。我们使用
redis-3.2.12
的源代码来分析Redis的事件驱动模型。
Redis的事件驱动库由ae.h
、ae.c
实现,通过以下的宏实现在不同的平台下选择性能最好的系统事件函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
|
在ae_epoll.c
、ae_evport.c
、ae_kqueue.c
、ae_select.c
这几个文件中,都分别用统一签名的api方法封装了不同系统的底层事件函数。Redis的事件库调用封装过的api方法,这样就不用关心具体的底层系统实现:
1
2
3
4
5
6
7
|
/* 统一签名api */
int aeApiCreate(aeEventLoop *eventLoop);
int aeApiResize(aeEventLoop *eventLoop, int setsize);
int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask);
int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);
char *aeApiName(void);
|
Redis事件
在Redis事件库中,封装了两类事件:
文件事件
文件事件的结构体如下:
1
2
3
4
5
6
7
8
|
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
|
一般,当文件描述符可读(AE_READABLE),会触发调用rfileProc函数,当文件描述符可写(AE_WRITABLE),触发调用wfileProc函数。当两种事件同时产生,会优先处理读事件,然后再处理写事件(顺序可自定义控制)。
时间事件
定时事件分两种,一种是一次性定时事件,还有一种就是周期性定时事件。时间事件的结构体如下:
1
2
3
4
5
6
7
8
9
10
11
12
|
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *next;
} aeTimeEvent;
|
到达定时时间后,会触发时间事件,然后调用timeProc函数。如果该处理函数返回AE_NOMORE,那么就是一个一次性事件,否则就是周期性事件,会在下一个定时期到达后再次调用timeProc。
Redis的时间事件都存放在一个链表中,通过next指针相连。执行时间事件处理函数processTimeEvents时,会遍历整个链表,查找已经到达的时间事件,并调用该事件的timeProc函数。
事件循环
Redis的事件都存放在一个事件循环(aeEventLoop)中,启动事件循环来对触发的事件进行处理,处理完毕后继续进入循环,等待下一个事件的触发处理。事件循环的结构体如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
|
事件的主循环函数如下:
1
2
3
4
5
6
7
8
|
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
|
只要aeEventLoop没有停止,就会在循环中先调用钩子函数beforesleep,然后进入事件执行函数aeProcessEvents处理响应的事件,处理完成后重复此动作。
aeProcessEvents的主要动作如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
/* 省略 */
struct timeval tv, *tvp;
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
/* 省略 */
/* 计算距离当前时间最近的时间事件 */
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
/* 计算tvp,即poll操作的超时时间,代码省略 */
/* 如果最近的时间事件已经到达,则设tvp为0,否则设置为剩余时间 */
/* 如果没有定时事件,tvp设置为NULL,直接阻塞poll操作 */
/* poll操作,阻塞,直到超时或者有文件事件触发 */
numevents = aeApiPoll(eventLoop, tvp);
/* 处理文件事件 */
for (j = 0; j < numevents; j++) {
/* 省略 */
/* 处理读事件,可能放到写事件后处理,由invert控制 */
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
/* 处理写事件 */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* 省略 */
processed++;
}
}
/* 处理时间事件 */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed;
}
|
所以,事件循环在执行aeApiPoll后阻塞,当有对应的文件事件触发后,先处理对应的文件事件,然后再处理时间事件链表,触发已经到达的时间事件。
总结
Redis自己实现的事件驱动库非常精简,封装了文件事件和时间事件,非常值得学习。有了事件驱动库后,就可以开始构建高性能的服务器程序了。后续会对Redis的服务器程序进行分析,看看Redis如何利用事件驱动库实现高性能以及高并发。