视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
node.js的http.createServer过程深入解析
2020-11-27 21:55:25 责编:小采
文档


下面是nodejs创建一个服务器的代码。接下来我们一起分析这个过程。

var http = require('http');
http.createServer(function (request, response) {
 response.end('Hello World
');
}).listen(9297);

首先我们去到lib/http.js模块看一下这个函数的代码。

function createServer(requestListener) {
 return new Server(requestListener);
}

只是对_http_server.js做了些封装。我们继续往下看。

function Server(requestListener) {
 if (!(this instanceof Server)) return new Server(requestListener);
 net.Server.call(this, { allowHalfOpen: true });
 // 收到http请求时执行的回调
 if (requestListener) {
 this.on('request', requestListener);
 }

 this.httpAllowHalfOpen = false;
 // 建立tcp连接的回调
 this.on('connection', connectionListener);

 this.timeout = 2 * 60 * 1000;
 this.keepAliveTimeout = 5000;
 this._pendingResponseData = 0;
 this.maxHeadersCount = null;
}
util.inherits(Server, net.Server);

发现_http_server.js也没有太多逻辑,继续看lib/net.js下的代码。

function Server(options, connectionListener) {
 if (!(this instanceof Server))
 return new Server(options, connectionListener);

 EventEmitter.call(this);
 // connectionListener在http.js处理过了
 if (typeof options === 'function') {
 connectionListener = options;
 options = {};
 this.on('connection', connectionListener);
 } else if (options == null || typeof options === 'object') {
 options = options || {};

 if (typeof connectionListener === 'function') {
 this.on('connection', connectionListener);
 }
 } else {
 throw new errors.TypeError('ERR_INVALID_ARG_TYPE',
 'options',
 'Object',
 options);
 }

 this._connections = 0;
 ......
 this[async_id_symbol] = -1;
 this._handle = null;
 this._usingWorkers = false;
 this._workers = [];
 this._unref = false;

 this.allowHalfOpen = options.allowHalfOpen || false;
 this.pauseOnConnect = !!options.pauseOnConnect;
}

至此http.createServer就执行结束了,我们发现这个过程还没有涉及到很多逻辑,并且还是停留到js层面。接下来我们继续分析listen函数的过程。该函数是net模块提供的。我们只看关键的代码。

Server.prototype.listen = function(...args) {
 // 处理入参,根据文档我们知道listen可以接收好几个参数,我们这里是只传了端口号9297
 var normalized = normalizeArgs(args);
 // normalized = [{port: 9297}, null];
 var options = normalized[0];
 var cb = normalized[1];
 // 第一次listen的时候会创建,如果非空说明已经listen过
 if (this._handle) {
 throw new errors.Error('ERR_SERVER_ALREADY_LISTEN');
 }
 ......
 listenInCluster(this, null, options.port | 0, 4,
 backlog, undefined, options.exclusive);
}
function listenInCluster() {
 ...
 server._listen2(address, port, addressType, backlog, fd);
}

_listen2 = setupListenHandle = function() {
 ......
 this._handle = createServerHandle(...);
 this._handle.listen(backlog || 511);
}
function createServerHandle() {
 handle = new TCP(TCPConstants.SERVER);
 handle.bind(address, port);
}

到这我们终于看到了tcp连接的内容,每一个服务器新建一个handle并且保存他,该handle是一个TCP对象。然后执行bind和listen函数。接下来我们就看一下TCP类的代码。TCP是C++提供的类。对应的文件是tcp_wrap.cc。我们看看new TCP的时候发生了什么。

void TCPWrap::New(const FunctionCallbackInfo<Value>& args) {
 // This constructor should not be exposed to public javascript.
 // Therefore we assert that we are not trying to call this as a
 // normal function.
 CHECK(args.IsConstructCall());
 CHECK(args[0]->IsInt32());
 Environment* env = Environment::GetCurrent(args);

 int type_value = args[0].As<Int32>()->Value();
 TCPWrap::SocketType type = static_cast<TCPWrap::SocketType>(type_value);

 ProviderType provider;
 switch (type) {
 case SOCKET:
 provider = PROVIDER_TCPWRAP;
 break;
 case SERVER:
 provider = PROVIDER_TCPSERVERWRAP;
 break;
 default:
 UNREACHABLE();
 }

 new TCPWrap(env, args.This(), provider);
}


TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider)
 : ConnectionWrap(env, object, provider) {
 int r = uv_tcp_init(env->event_loop(), &handle_);
 CHECK_EQ(r, 0); 
}

我们看到,new TCP的时候其实是执行libuv的uv_tcp_init函数,初始化一个uv_tcp_t的结构体。首先我们先看一下uv_tcp_t结构体的结构。

uv_tcp_t
uv_tcp_t
// 初始化一个tcp流的结构体
int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) {
 // 未指定未指定协议
 return uv_tcp_init_ex(loop, tcp, AF_UNSPEC);
}

int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* tcp, unsigned int flags) {
 int domain;

 /* Use the lower 8 bits for the domain */
 // 低八位是domain
 domain = flags & 0xFF;
 if (domain != AF_INET && domain != AF_INET6 && domain != AF_UNSPEC)
 return UV_EINVAL;
 // 除了第八位的其他位是flags
 if (flags & ~0xFF)
 return UV_EINVAL;

 uv__stream_init(loop, (uv_stream_t*)tcp, UV_TCP);

 /* If anything fails beyond this point we need to remove the handle from
 * the handle queue, since it was added by uv__handle_init in uv_stream_init.
 */

 if (domain != AF_UNSPEC) {
 int err = maybe_new_socket(tcp, domain, 0);
 if (err) {
 // 出错则把该handle移除loop队列
 QUEUE_REMOVE(&tcp->handle_queue);
 return err;
 }
 }

 return 0;
}

我们接着看uv__stream_init做了什么事情。

void uv__stream_init(uv_loop_t* loop,
 uv_stream_t* stream,
 uv_handle_type type) {
 int err;

 uv__handle_init(loop, (uv_handle_t*)stream, type);
 stream->read_cb = NULL;
 stream->alloc_cb = NULL;
 stream->close_cb = NULL;
 stream->connection_cb = NULL;
 stream->connect_req = NULL;
 stream->shutdown_req = NULL;
 stream->accepted_fd = -1;
 stream->queued_fds = NULL;
 stream->delayed_error = 0;
 QUEUE_INIT(&stream->write_queue);
 QUEUE_INIT(&stream->write_completed_queue);
 stream->write_queue_size = 0;

 if (loop->emfile_fd == -1) {
 err = uv__open_cloexec("/dev/null", O_RDONLY);
 if (err < 0)
 /* In the rare case that "/dev/null" isn't mounted open "/"
 * instead.
 */
 err = uv__open_cloexec("/", O_RDONLY);
 if (err >= 0)
 loop->emfile_fd = err;
 }

#if defined(__APPLE__)
 stream->select = NULL;
#endif /* defined(__APPLE_) */
 // 初始化io观察者
 uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}

void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd) {
 assert(cb != NULL);
 assert(fd >= -1);
 // 初始化队列,回调,需要监听的fd
 QUEUE_INIT(&w->pending_queue);
 QUEUE_INIT(&w->watcher_queue);
 w->cb = cb;
 w->fd = fd;
 w->events = 0;
 w->pevents = 0;

#if defined(UV_HAVE_KQUEUE)
 w->rcount = 0;
 w->wcount = 0;
#endif /* defined(UV_HAVE_KQUEUE) */
}

从代码可以知道,只是对uv_tcp_t结构体做了一些初始化操作。到这,new TCP的逻辑就执行完毕了。接下来就是继续分类nodejs里调用bind和listen的逻辑。nodejs的bind对应libuv的函数是uv__tcp_bind,listen对应的是uv_tcp_listen。
先看一个bind的核心代码。

/* Cannot set IPv6-only mode on non-IPv6 socket. */
 if ((flags & UV_TCP_IPV6ONLY) && addr->sa_family != AF_INET6)
 return UV_EINVAL;
 // 获取一个socket并且设置某些标记
 err = maybe_new_socket(tcp, addr->sa_family, 0);
 if (err)
 return err;

 on = 1;
 // 设置在端口可重用
 if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
 return UV__ERR(errno);
 bind(tcp->io_watcher.fd, addr, addrlen) && errno != EADDRINUSE
static int maybe_new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
 struct sockaddr_storage saddr;
 socklen_t slen;

 if (domain == AF_UNSPEC) {
 handle->flags |= flags;
 return 0;
 }
 return new_socket(handle, domain, flags);
}
static int new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
 struct sockaddr_storage saddr;
 socklen_t slen;
 int sockfd;
 int err;
 // 获取一个socket
 err = uv__socket(domain, SOCK_STREAM, 0);
 if (err < 0)
 return err;
 sockfd = err;
 // 设置选项和保存socket的文件描述符到io观察者中
 err = uv__stream_open((uv_stream_t*) handle, sockfd, flags);
 if (err) {
 uv__close(sockfd);
 return err;
 }
 ...
 return 0;
}

int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
 if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
 return UV_EBUSY;

 assert(fd >= 0);
 stream->flags |= flags;

 if (stream->type == UV_TCP) {
 if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
 return UV__ERR(errno);

 /* TODO Use delay the user passed in. */
 if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
 uv__tcp_keepalive(fd, 1, 60)) {
 return UV__ERR(errno);
 }
 }
 ...
 // 保存socket对应的文件描述符到io观察者中,libuv会在io poll阶段监听该文件描述符
 stream->io_watcher.fd = fd;

 return 0;
}

上面的一系列操作主要是新建一个socket文件描述符,设置一些flag,然后把文件描述符保存到IO观察者中,libuv在poll IO阶段会监听该文件描述符,如果有事件到来,会执行设置的回调函数,该函数是在uv__stream_init里设置的uv__stream_io。最后执行bind函数进行绑定操作。最后我们来分析一下listen函数。首先看下tcp_wrapper.cc的代码。

void TCPWrap::Listen(const FunctionCallbackInfo<Value>& args) {
 TCPWrap* wrap;
 ASSIGN_OR_RETURN_UNWRAP(&wrap,
 args.Holder(),
 args.GetReturnValue().Set(UV_EBADF));
 int backlog = args[0]->Int32Value();
 int err = uv_listen(reinterpret_cast<uv_stream_t*>(&wrap->handle_),
 backlog,
 OnConnection);
 args.GetReturnValue().Set(err);
}

代码中有个很重要的地方就是OnConnection函数,nodejs给listen函数设置了一个回调函数OnConnection,该函数在IO观察者里保存的文件描述符有连接到来时会被调用。OnConnection函数是在connection_wrap.cc定义的,tcp_wrapper继承了connection_wrap。下面我们先看一下uv_listen。该函数调用了uv_tcp_listen。该函数的核心代码如下。

 if (listen(tcp->io_watcher.fd, backlog))
 return UV__ERR(errno);
 // cb即OnConnection
 tcp->connection_cb = cb;
 tcp->flags |= UV_HANDLE_BOUND;

 // 有连接到来时的libuv层回调,覆盖了uv_stream_init时设置的值
 tcp->io_watcher.cb = uv__server_io;
 // 注册事件
 uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN);

在libuv的poll IO阶段,epoll_wait会监听到到来的连接,然后调用uv__server_io。下面是该函数的核心代码。

// 继续注册事件,等待连接
 uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
 err = uv__accept(uv__stream_fd(stream));
 // 保存连接对应的socket
 stream->accepted_fd = err;
 // 执行nodejs层回调
 stream->connection_cb(stream, 0);

libuv会摘下一个连接,得到对应的socket。然后执行nodejs层的回调,这时候我们来看一下OnConnection的代码。

OnConnection(uv_stream_t* handle,int status)
 if (status == 0) {
 // 新建一个uv_tcp_t结构体
 Local<Object> client_obj = WrapType::Instantiate(env, wrap_data, WrapType::SOCKET);
 WrapType* wrap;
 ASSIGN_OR_RETURN_UNWRAP(&wrap, client_obj);
 uv_stream_t* client_handle = reinterpret_cast<uv_stream_t*>(&wrap->handle_);
 // uv_accept返回0表示成功
 if (uv_accept(handle, client_handle))
 return;
 argv[1] = client_obj;
 }
 // 执行上层的回调,该回调是net.js设置的onconnection
 wrap_data->MakeCallback(env->onconnection_string(), arraysize(argv), argv);

OnConnection新建了一个uv_tcp_t结构体。代表这个连接。然后调用uv_accept。

int uv_accept(uv_stream_t* server, uv_stream_t* client) {
 ...
 // 新建的uv_tcp_t结构体关联accept_fd,注册读写事件
 uv__stream_open(client, server->accepted_fd, UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
 ...
}

最后执行nodejs的回调。

function onconnection(err, clientHandle) {
 var handle = this;
 var self = handle.owner;
 if (err) {
 self.emit('error', errnoException(err, 'accept'));
 return;
 }
 if (self.maxConnections && self._connections >= self.maxConnections) {
 clientHandle.close();
 return;
 }
 var socket = new Socket({
 handle: clientHandle,
 allowHalfOpen: self.allowHalfOpen,
 pauseOnCreate: self.pauseOnConnect
 });
 socket.readable = socket.writable = true;
 self._connections++;
 socket.server = self;
 socket._server = self;
 DTRACE_NET_SERVER_CONNECTION(socket);
 LTTNG_NET_SERVER_CONNECTION(socket);
 COUNTER_NET_SERVER_CONNECTION(socket);
 // 触发_http_server.js里设置的connectionListener回调
 self.emit('connection', socket);
}

listen函数总体的逻辑就是把socket设置为可监听,然后注册事件,等待连接的到来,连接到来的时候,调用accept获取新建立的连接,tcp_wrapper.cc的回调新建一个uv_tcp_t结构体,代表新的连接,然后设置可读写事件,并且设置回调为uv__stream_io,等待数据的到来。最后执行_http_server.js设置的回调connectionListener。至此,服务器启动并且接收连接的过程就完成了。接下来就是对用户数据的读写。当用户传来数据时,处理数据的函数是uv__stream_io。后面继续解析数据的读写。

总结

下载本文
显示全文
专题