redis启动后,设置完必要的事件后,调用aeMain进入事件循环,之后便阻塞在aeApiPoll上,等待客户端的连接。
客户端交互
客户端与服务器的交互状态主要有四种:建立连接、发送请求、返回数据、关闭连接。
建立连接
如果有客户端进行连接,则会触发服务器监听socket的可读事件,并调用networking.c/acceptTcpHandler
来处理连接:
1
2
3
4
5
6
7
8
9
10
11
|
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
/* 省略 */
while(max--) {
/* 对accpet()封装,获取client socket的文件描述符cfd */
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
/* 省略 */
/* 封装处理cfd */
acceptCommonHandler(cfd,0,cip);
}
}
|
服务器accept连接后,就交给acceptCommonHandler来进行后续的处理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c;
/* 创建客户端 */
if ((c = createClient(fd)) == NULL) {
serverLog(LL_WARNING,
"Error registering fd event for the new client: %s (fd=%d)",
strerror(errno),fd);
close(fd); /* May be already closed, just ignore errors */
return;
}
/* 后续错误处理,省略 */
server.stat_numconnections++;
c->flags |= flags;
}
|
在acceptCommonHandler中,使用createClient创建了对客户端的抽象:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
if (fd != -1) {
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
/* 将客户端可读事件添加到server的eventLoop中 */
if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) {
close(fd);
zfree(c);
return NULL;
}
}
/* 对客户端进行初始化操作,省略 */
return c;
}
|
在创建客户端的过程中,redis将客户端可读事件添加到了server的eventLoop中。这样,不仅客户端创建连接会触发事件调用acceptTcpHandler函数,同时如果客户端可读(有请求),也会触发事件调用readQueryFromClient函数。
发送请求
当客户端发送请求时,客户端连接socket会产生可读事件,此时会触发readQueryFromClient函数进行处理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
/* 必要初始化,省略 */
/* 使用read()读取客户端数据 */
nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1) {
if (errno == EAGAIN) {
/* 没有数据,重试 */
return;
} else {
/* 读取错误 */
serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
freeClient(c);
return;
}
} else if (nread == 0) {
/* 客户端关闭 */
serverLog(LL_VERBOSE, "Client closed connection");
freeClient(c);
return;
}
/* 容错处理,省略 */
processInputBuffer(c);
}
|
readQueryFromClient事实上就是对read()的封装,将请求数据读取到client->querybuf中。读取数据后,networking.c/processInputBuffer
会对读取的数据进行处理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
void processInputBuffer(client *c) {
server.current_client = c;
/* 一直处理querybuf */
while(sdslen(c->querybuf)) {
/* 校验等操作,省略 */
/* 处理分隔 */
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}
if (c->argc == 0) {
resetClient(c);
} else {
/* 真正处理请求 */
if (processCommand(c) == C_OK)
resetClient(c);
if (server.current_client == NULL) break;
}
}
server.current_client = NULL;
}
|
处理请求调用server.c/processCommand
函数,在这个函数中,对各种Redis Command进行了处理,并将处理结果写入了client->buf中。在这一步,redis并没有直接将结果写入客户端,而是在后续统一做了一次处理。
此时,readQueryFromClient事件处理函数完毕,主要做了三件事:从客户端读取请求、处理、将结果写入client->buf中。
返回数据
redis将结果从client->buf写入客户端socket,分为了两步:
- 未进入事件循环阻塞之前,尝试将结果数据写入客户端,如果数据未全部写入,则注册客户端可写事件到事件循环中;
- 如果客户端可写事件触发,调用处理函数,将剩余数据写入客户端。
第一步是由进入事件循环阻塞前的钩子函数beforeSleep实现:
1
2
3
4
5
6
|
void beforeSleep(struct aeEventLoop *eventLoop) {
/* 处理redis服务器状态,省略 */
/* 将需要发送的数据写入客户端 */
handleClientsWithPendingWrites();
}
|
其中,写入的操作在函数networking.c/handleClientsWithPendingWrites
中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
int handleClientsWithPendingWrites(void) {
/* 初始化客户端链表,省略 */
/* 对于每个客户端,写入数据 */
while((ln = listNext(&li))) {
/* 校验,省略 */
/* 尝试将数据写入客户端,对write()的封装 */
if (writeToClient(c->fd,c,0) == C_ERR) continue;
/* 数据未完全写入 */
if (clientHasPendingReplies(c)) {
/* 设置flags,省略 */
/* 注册客户端socket可写事件处理函数sendReplyToClient */
if (aeCreateFileEvent(server.el, c->fd, ae_flags, sendReplyToClient, c) == AE_ERR) {
freeClientAsync(c);
}
}
}
return processed;
}
|
可以看到,如果数据没能完全写入,则注册客户端socket可写事件,对应的处理函数是networking.c/sendReplyToClient
:
1
2
3
4
5
|
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(el);
UNUSED(mask);
writeToClient(fd,privdata,1);
}
|
这个处理函数很简单,就是简单的写入数据。如果这一次数据还是没有完全写入,那么在新的一次事件循环中,会重复上述过程,直到数据全部写入为止。
这种在进入事件循环阻塞前先尝试将数据写入客户端的方式,可以尽可能的减少需要检测客户端socket写事件的操作,有助于redis的性能提升。
关闭连接
当客户端socket可写,并且读入0个字节时,表示客户端已经关闭了连接,这个可以在readQueryFromClient函数中看到。当客户端关闭后,调用了freeClient函数,执行了清理动作:
1
2
3
4
5
6
7
8
9
|
void freeClient(client *c) {
/* 清理动作,包括释放buf,free数据结构等,省略 */
/* 关闭socket、删除已注册的事件等 */
unlinkClient(c);
/* 省略,释放客户端 */
zfree(c);
}
|
最终,unlinkClient对事件循环中已经注册的可读可写事件进行了删除:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
void unlinkClient(client *c) {
/* 省略 */
if (c->fd != -1) {
/* 省略 */
/* 删除事件,关闭连接 */
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
close(c->fd);
c->fd = -1;
}
/* 清理操作,省略 */
}
|
最终,随着客户端的关闭,该客户端的事件也被删除,在事件循环中完全被清理。
总结
所以,完整的客户端事件生命周期可以概括如下:
客户端建立连接,触发redis监听socket的可读事件,在处理函数中调用createClient创建客户端抽象,并注册客户端socket的可读事件到事件循环
客户端发送请求,触发客户端socket的可读事件,调用处理函数readQueryFromClient,读取请求并处理,处理结果放入client->buf缓冲区中;
在每次的事件循环进入阻塞前,尝试将客户端缓冲区的数据发送给客户端,如果没有全部发送,则注册客户端socket可写事件到事件循环。当客户端socket可写事件触发,调用处理函数sendReplyToClient,将剩余数据写入客户端。如果仍未全部写入,重复该过程。
当客户端socket可写,并且读入0个字节时,表示客户端已经关闭了连接。此时执行清理操作freeClient,将注册的客户端socket可读/可写事件从事件循环中删除,并释放客户端。