视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
如何接受客户端请求并调用处理函数
2020-11-09 09:45:15 责编:小采
文档

上篇概括了redis的启动流程,这篇重点介绍redis如何接受客户端请求并调用处理函数来执行命令。 在上一篇里,说到了在initServer()这个函数里边,会调用anetTcpServer和anetUnixServer 这两个函数创建对tcp端口和unix域套接字的监听,那么这里首先重点分析下

上篇概括了redis的启动流程,这篇重点介绍redis如何接受客户端请求并调用处理函数来执行命令。

在上一篇里,美国服务器,说到了在initServer()这个函数里边,会调用anetTcpServer和anetUnixServer 这两个函数创建对tcp端口和unix域套接字的监听,那么这里首先重点分析下这两个函数的具体实现。

int anetTcpServer(char *err, int port, char *bindaddr) { int s; struct sockaddr_in sa; if ((s = anetCreateSocket(err,AF_INET)) == ANET_ERR) return ANET_ERR; memset(&sa,0,sizeof(sa)); sa.sin_family = AF_INET; sa.sin_port = htons(port); sa.sin_addr.s_addr = htonl(INADDR_ANY); if (bindaddr && inet_aton(bindaddr, &sa.sin_addr) == 0) { anetSetError(err, ); close(s); return ANET_ERR; } if (anetListen(err,s,(struct sockaddr*)&sa,sizeof(sa)) == ANET_ERR) return ANET_ERR; return s; }

从代码中我们可以看出,首先调用anetCreateSocket()创建一个套接字并复值给s, 然后对sa 这个sockaddr_in类型的结构体进行初始化, 设置要监听的端口,地址,和地址族, 再调用anetListen() 函数绑定地址并监听端口,这些工作完成后 anetCreateSocket函数返回,网站空间,并将创建的套接字复制给server.ipfd。注意在anetUnixServer()这个函数中完成的工 作类似于anetCreateSocket,只不过是绑定的unix socket。

接下来, 在initServer函数里, 调用了这个函数:aeCreateFileEvent, 这里重点分析第一个,第二个类似。

if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) oom(); if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) oom();

首先,从eventLoop的event这个aeFileEvent数组里,取出当前fd对应的acFileEvent,主要是为了在下边给它设置对应事件的处理函数;即根据传入的mask来判断是哪一类事件。

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= AE_SETSIZE) return AE_ERR; aeFileEvent *fe = &eventLoop->events[fd]; if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }

然后调用 acApiEvent这个事件注册函数:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee; /* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. */ int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ee.events = 0; mask |= eventLoop->events[fd].mask; (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.u = ee.data.fd = fd; if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0; }

可以看到, 这个函数中,主要是调用epoll_ctl(state->epfd,op,fd,&ee)将当前fd设置到及其所关心的事件注册到epoll_create 返回的epoll的句柄里。由于我们这里注册的是AE_READABLE事件,所以当这个fd(即redis监听端口的套接字)有数据可读时(这里我理解的是客户端连接到达),网站空间,就会触发相应的事件处理函数,这里的事件处理函数便是acceptTcpHandler。


下面我们看看acceptTcpHandler这个函数:

这个函数里边首先调用 anetTcpAccept获取客户端与redis连接的socket fd(实际调用 ::accept(s,sa,len)函数返回的fd), 然后调用 acceptCommonHandler()函数,这个函数中调用 createClient()创建 redisClient *c实例, 如果当前redis服务器的连接总数没有超过最大值,则将全局变量server中记录连接数的stat_numconnections加1;如果超过了, 则想客户端输出错误信息,并释放redisClient实例,函数返回。

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd; char cip[128]; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); cfd = anetTcpAccept(server.neterr, fd, cip, &cport); if (cfd == AE_ERR) { redisLog(REDIS_WARNING,, server.neterr); return; } redisLog(REDIS_VERBOSE,, cip, cport); acceptCommonHandler(cfd); }

下面分析createClient这个函数:

redisClient *createClient(int fd) { redisClient *c = zmalloc(sizeof(redisClient)); c->bufpos = 0; anetNonBlock(NULL,fd); anetTcpNoDelay(NULL,fd); if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } selectDb(c,0); c->fd = fd; c->querybuf = sdsempty(); c->reqtype = 0; c->argc = 0; c->argv = NULL; c->cmd = c->lastcmd = NULL; c->multibulklen = 0; c->bulklen = -1; c->sentlen = 0; c->flags = 0; c->lastinteraction = time(NULL); c->authenticated = 0; c->replstate = REDIS_REPL_NONE; c->slave_listening_port = 0; c->reply = listCreate(); c->reply_bytes = 0; listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); c->bpop.keys = NULL; c->bpop.count = 0; c->bpop.timeout = 0; c->bpop.target = NULL; c->io_keys = listCreate(); c->watched_keys = listCreate(); listSetFreeMethod(c->io_keys,decrRefCount); c->pubsub_channels = dictCreate(&setDictType,NULL); c->pubsub_patterns = listCreate(); listSetFreeMethod(c->pubsub_patterns,decrRefCount); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); listAddNodeTail(server.clients,c); initClientMultiState(c); return c; }

下载本文
显示全文
专题