redis启动后,设置完必要的事件后,调用aeMain进入事件循环,之后便阻塞在aeApiPoll上,等待客户端的连接。

客户端交互

客户端与服务器的交互状态主要有四种:建立连接、发送请求、返回数据、关闭连接。

建立连接

如果有客户端进行连接,则会触发服务器监听socket的可读事件,并调用networking.c/acceptTcpHandler来处理连接:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    /* 省略 */
    while(max--) {
        /* 对accpet()封装,获取client socket的文件描述符cfd */
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        /* 省略 */

        /* 封装处理cfd */
        acceptCommonHandler(cfd,0,cip);
    }
}

服务器accept连接后,就交给acceptCommonHandler来进行后续的处理:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;
    /* 创建客户端 */
    if ((c = createClient(fd)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    /* 后续错误处理,省略 */
    server.stat_numconnections++;
    c->flags |= flags;
}

在acceptCommonHandler中,使用createClient创建了对客户端的抽象:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    if (fd != -1) {
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        /* 将客户端可读事件添加到server的eventLoop中 */
        if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) {
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    /* 对客户端进行初始化操作,省略 */
    return c;
}

在创建客户端的过程中,redis将客户端可读事件添加到了server的eventLoop中。这样,不仅客户端创建连接会触发事件调用acceptTcpHandler函数,同时如果客户端可读(有请求),也会触发事件调用readQueryFromClient函数。

发送请求

当客户端发送请求时,客户端连接socket会产生可读事件,此时会触发readQueryFromClient函数进行处理:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;

    /* 必要初始化,省略 */

    /* 使用read()读取客户端数据 */
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            /* 没有数据,重试 */
            return;
        } else {
            /* 读取错误 */
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        /* 客户端关闭 */
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }

    /* 容错处理,省略 */

    processInputBuffer(c);
}

readQueryFromClient事实上就是对read()的封装,将请求数据读取到client->querybuf中。读取数据后,networking.c/processInputBuffer会对读取的数据进行处理:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void processInputBuffer(client *c) {
    server.current_client = c;
    /* 一直处理querybuf */
    while(sdslen(c->querybuf)) {
        /* 校验等操作,省略 */

        /* 处理分隔 */
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* 真正处理请求 */
            if (processCommand(c) == C_OK)
                resetClient(c);
            if (server.current_client == NULL) break;
        }
    }
    server.current_client = NULL;
}

处理请求调用server.c/processCommand函数,在这个函数中,对各种Redis Command进行了处理,并将处理结果写入了client->buf中。在这一步,redis并没有直接将结果写入客户端,而是在后续统一做了一次处理。

此时,readQueryFromClient事件处理函数完毕,主要做了三件事:从客户端读取请求、处理、将结果写入client->buf中。

返回数据

redis将结果从client->buf写入客户端socket,分为了两步:

  1. 未进入事件循环阻塞之前,尝试将结果数据写入客户端,如果数据未全部写入,则注册客户端可写事件到事件循环中;
  2. 如果客户端可写事件触发,调用处理函数,将剩余数据写入客户端。

第一步是由进入事件循环阻塞前的钩子函数beforeSleep实现:

1
2
3
4
5
6
void beforeSleep(struct aeEventLoop *eventLoop) {
    /* 处理redis服务器状态,省略 */

    /* 将需要发送的数据写入客户端 */
    handleClientsWithPendingWrites();
}

其中,写入的操作在函数networking.c/handleClientsWithPendingWrites中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
int handleClientsWithPendingWrites(void) {
    /* 初始化客户端链表,省略 */
    /* 对于每个客户端,写入数据 */
    while((ln = listNext(&li))) {
        /* 校验,省略 */

        /* 尝试将数据写入客户端,对write()的封装 */
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

        /* 数据未完全写入 */
        if (clientHasPendingReplies(c)) {
            /* 设置flags,省略 */
            /* 注册客户端socket可写事件处理函数sendReplyToClient */
            if (aeCreateFileEvent(server.el, c->fd, ae_flags, sendReplyToClient, c) == AE_ERR) {
                freeClientAsync(c);
            }
        }
    }
    return processed;
}

可以看到,如果数据没能完全写入,则注册客户端socket可写事件,对应的处理函数是networking.c/sendReplyToClient

1
2
3
4
5
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(mask);
    writeToClient(fd,privdata,1);
}

这个处理函数很简单,就是简单的写入数据。如果这一次数据还是没有完全写入,那么在新的一次事件循环中,会重复上述过程,直到数据全部写入为止。

这种在进入事件循环阻塞前先尝试将数据写入客户端的方式,可以尽可能的减少需要检测客户端socket写事件的操作,有助于redis的性能提升。

关闭连接

当客户端socket可写,并且读入0个字节时,表示客户端已经关闭了连接,这个可以在readQueryFromClient函数中看到。当客户端关闭后,调用了freeClient函数,执行了清理动作:

1
2
3
4
5
6
7
8
9
void freeClient(client *c) {
    /* 清理动作,包括释放buf,free数据结构等,省略 */

    /* 关闭socket、删除已注册的事件等 */
    unlinkClient(c);

    /* 省略,释放客户端 */
    zfree(c);
}

最终,unlinkClient对事件循环中已经注册的可读可写事件进行了删除:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
void unlinkClient(client *c) {
    /* 省略 */
    if (c->fd != -1) {
        /* 省略 */

        /* 删除事件,关闭连接 */
        aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
        close(c->fd);
        c->fd = -1;
    }

    /* 清理操作,省略 */
}

最终,随着客户端的关闭,该客户端的事件也被删除,在事件循环中完全被清理。

总结

所以,完整的客户端事件生命周期可以概括如下:

  1. 客户端建立连接,触发redis监听socket的可读事件,在处理函数中调用createClient创建客户端抽象,并注册客户端socket的可读事件到事件循环

  2. 客户端发送请求,触发客户端socket的可读事件,调用处理函数readQueryFromClient,读取请求并处理,处理结果放入client->buf缓冲区中;

  3. 在每次的事件循环进入阻塞前,尝试将客户端缓冲区的数据发送给客户端,如果没有全部发送,则注册客户端socket可写事件到事件循环。当客户端socket可写事件触发,调用处理函数sendReplyToClient,将剩余数据写入客户端。如果仍未全部写入,重复该过程。

  4. 当客户端socket可写,并且读入0个字节时,表示客户端已经关闭了连接。此时执行清理操作freeClient,将注册的客户端socket可读/可写事件从事件循环中删除,并释放客户端。