Redis很快,而且,Redis能很好的应对高并发的场景。但是,如果对Redis有过了解,就会知道Redis程序是单线程的。使用单线程却能保证如此高性能,有以下几个原因:

  • 大部分数据放在内存中,减少磁盘I/O

  • 大多数操作都是I/O操作,cpu密集型操作异步化,防止单个操作占用大量cpu

  • 使用I/O多路复用以及事件驱动的网络模型,提高吞吐量

其中,I/O多路复用又封装了epoll、kqueue等高性能的底层系统接口,在不同平台都能获得高性能的保证。

事件驱动库

Redis使用了单线程的Reactor模型来处理事件。简单来说,连接建立、请求、回复、断开等都是事件,当连接发生上述的事件时,就触发相对应的事件处理函数handler来对这个事件进行回应。

Redis自己简单实现了一个事件驱动库,而没有使用开源的libevent和libev,主要是为了保证代码的简洁性,同时不需要引入外部依赖。我们使用 redis-3.2.12 的源代码来分析Redis的事件驱动模型。

Redis的事件驱动库由ae.hae.c实现,通过以下的宏实现在不同的平台下选择性能最好的系统事件函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

ae_epoll.cae_evport.cae_kqueue.cae_select.c这几个文件中,都分别用统一签名的api方法封装了不同系统的底层事件函数。Redis的事件库调用封装过的api方法,这样就不用关心具体的底层系统实现:

1
2
3
4
5
6
7
/* 统一签名api */
int aeApiCreate(aeEventLoop *eventLoop);
int aeApiResize(aeEventLoop *eventLoop, int setsize);
int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask);
int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);
char *aeApiName(void);

Redis事件

在Redis事件库中,封装了两类事件:

  • 文件事件:aeFileEvent,对socket文件描述符动作的抽象,当socket文件描述符可读/可写时触发对应的文件事件

  • 时间事件:aeTimeEvent,对定时动作的抽象,指定时间到达后触发时间事件

文件事件

文件事件的结构体如下:

1
2
3
4
5
6
7
8
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

一般,当文件描述符可读(AE_READABLE),会触发调用rfileProc函数,当文件描述符可写(AE_WRITABLE),触发调用wfileProc函数。当两种事件同时产生,会优先处理读事件,然后再处理写事件(顺序可自定义控制)。

时间事件

定时事件分两种,一种是一次性定时事件,还有一种就是周期性定时事件。时间事件的结构体如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;

到达定时时间后,会触发时间事件,然后调用timeProc函数。如果该处理函数返回AE_NOMORE,那么就是一个一次性事件,否则就是周期性事件,会在下一个定时期到达后再次调用timeProc。

Redis的时间事件都存放在一个链表中,通过next指针相连。执行时间事件处理函数processTimeEvents时,会遍历整个链表,查找已经到达的时间事件,并调用该事件的timeProc函数。

事件循环

Redis的事件都存放在一个事件循环(aeEventLoop)中,启动事件循环来对触发的事件进行处理,处理完毕后继续进入循环,等待下一个事件的触发处理。事件循环的结构体如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

事件的主循环函数如下:

1
2
3
4
5
6
7
8
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

只要aeEventLoop没有停止,就会在循环中先调用钩子函数beforesleep,然后进入事件执行函数aeProcessEvents处理响应的事件,处理完成后重复此动作。

aeProcessEvents的主要动作如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    /* 省略 */
    struct timeval tv, *tvp;
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        /* 省略 */
        /* 计算距离当前时间最近的时间事件 */
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        /* 计算tvp,即poll操作的超时时间,代码省略 */
        /* 如果最近的时间事件已经到达,则设tvp为0,否则设置为剩余时间 */
        /* 如果没有定时事件,tvp设置为NULL,直接阻塞poll操作 */

        /* poll操作,阻塞,直到超时或者有文件事件触发 */
        numevents = aeApiPoll(eventLoop, tvp);

        /* 处理文件事件 */
        for (j = 0; j < numevents; j++) {
            /* 省略 */
            /* 处理读事件,可能放到写事件后处理,由invert控制 */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }
            /* 处理写事件 */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
             /* 省略 */
            processed++;
        }
    }
    /* 处理时间事件 */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed;
}

所以,事件循环在执行aeApiPoll后阻塞,当有对应的文件事件触发后,先处理对应的文件事件,然后再处理时间事件链表,触发已经到达的时间事件。

总结

Redis自己实现的事件驱动库非常精简,封装了文件事件和时间事件,非常值得学习。有了事件驱动库后,就可以开始构建高性能的服务器程序了。后续会对Redis的服务器程序进行分析,看看Redis如何利用事件驱动库实现高性能以及高并发。