红联Linux门户
Linux协助

并发服务器(四):libuv

发布时刻:2018-03-02 09:30:34来历:linux.cn作者:qhwdw
这是并发网络服务器系列文章的第四部分。在这一部分中,咱们将运用 libuv 再次重写咱们的服务器,而且也会评论关于运用一个线程池在回调中去处理耗时使命。终究,咱们去看一下底层的 libuv,花一点时刻去学习怎样用异步 API 对文件体系堵塞操作进行封装。
 
本系列的一切文章:
第一节 - 简介(http://www.138comgov138.com/linux/32842.html)
第二节 - 线程(http://www.138comgov138.com/linux/32844.html)
第三节 - 事情驱动(http://www.138comgov138.com/linux/32997.html)
 
运用 libuv 笼统出事情驱动循环
在 第三节 中,咱们看到了根据 select 和 epoll 的服务器的相似之处,而且,我说过,在它们之间笼统出纤细的差别是件很有吸引力的事。许多库现已做到了这些,所以在这一部分中我将去选一个并运用它。我选的这个库是 libuv,它开端规划用于 Node.js 底层的可移植渠道层,而且,后来发现在其它的项目中也有运用。libuv 是用 C 写的,因而,它具有很高的可移植性,十分适用嵌入到像 JavaScript 和 Python 这样的高档言语中。
尽管 libuv 为了笼统出底层渠道细节现已变成了一个适当大的结构,但它依然是以 事情循环 思想为中心的。在咱们第三部分的事情驱动服务器中,事情循环是显式界说在 main 函数中的;当运用 libuv 时,该循环一般躲藏在库自身中,而用户代码仅需求注册事情句柄(作为一个回调函数)和运转这个循环。此外,libuv 会在给定的渠道上运用更快的事情循环完结,关于 Linux 它是 epoll,等等。
并发服务器(四):libuv
libuv 支撑多路事情循环,因而事情循环在库中是十分重要的;它有一个句柄 —— uv_loop_t,以及创立/杀死/发动/中止循环的函数。也便是说,在这篇文章中,我将仅需求运用 “默许的” 循环,libuv 可经过 uv_default_loop() 供给它;多路循环大多用于多线程事情驱动的服务器,这是一个更高档别的论题,我将留在这一系列文章的今后部分。
 
运用 libuv 的并发服务器
为了对 libuv 有一个更深的形象,让咱们跳转到咱们的牢靠协议的服务器,它经过咱们的这个系列现已有了一个强壮的从头完结。这个服务器的结构与第三部分中的根据 select 和 epoll 的服务器有一些相似之处,由于,它也依靠回调。完好的示例代码在这儿(https://github.com/eliben/code-for-blog/blob/master/2017/async-socket-server/uv-server.c);咱们开端设置这个服务器的套接字绑定到一个本地端口:
int portnum = 9090;
if (argc >= 2) {
portnum = atoi(argv[1]);
}
printf("Serving on port %d\n", portnum);
int rc;
uv_tcp_t server_stream;
if ((rc = uv_tcp_init(uv_default_loop(), &server_stream)) < 0) {
die("uv_tcp_init failed: %s", uv_strerror(rc));
}
struct sockaddr_in server_address;
if ((rc = uv_ip4_addr("0.0.0.0", portnum, &server_address)) < 0) {
die("uv_ip4_addr failed: %s", uv_strerror(rc));
}
if ((rc = uv_tcp_bind(&server_stream, (const struct sockaddr*)&server_address, 0)) < 0) {
die("uv_tcp_bind failed: %s", uv_strerror(rc));
}
除了它被封装进 libuv API 中之外,你看到的是一个适当规范的套接字。在它的回来中,咱们取得了一个可作业于任何 libuv 支撑的渠道上的可移植接口。
这些代码也展现了很认真负责的过错处理;大都的 libuv 函数回来一个整数状况,回来一个负数意味着呈现了一个过错。在咱们的服务器中,咱们把这些过错看做丧命问题进行处理,但也能够想象一个更高雅的过错康复。
现在,那个套接字现已绑定,是时分去监听它了。这儿咱们运转首个回调注册:
// Listen on the socket for new peers to connect. When a new peer connects,
// the on_peer_connected callback will be invoked.
if ((rc = uv_listen((uv_stream_t*)&server_stream, N_BACKLOG, on_peer_connected)) < 0) {
die("uv_listen failed: %s", uv_strerror(rc));
}
uv_listen 注册一个事情回调,当新的对端衔接到这个套接字时将会调用事情循环。咱们的回调在这儿被称为 on_peer_connected,咱们一瞬间将去查看它。
终究,main 运转这个 libuv 循环,直到它被中止(uv_run 仅在循环被中止或许发作过错时回来)。
// Run the libuv event loop.
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
// If uv_run returned, close the default loop before exiting.
return uv_loop_close(uv_default_loop());
留意,在运转事情循环之前,只要一个回调是经过 main 注册的;咱们稍后将看到怎样去增加更多的回调。在事情循环的整个运转过程中,增加和删去回调并不是一个问题 —— 事实上,大大都服务器便是这么写的。
这是一个 on_peer_connected,它处理到服务器的新的客户端衔接:
void on_peer_connected(uv_stream_t* server_stream, int status) {
if (status < 0) {
fprintf(stderr, "Peer connection error: %s\n", uv_strerror(status));
return;
}
// client will represent this peer; it's allocated on the heap and only
// released when the client disconnects. The client holds a pointer to
// peer_state_t in its data field; this peer state tracks the protocol state
// with this client throughout interaction.
uv_tcp_t* client = (uv_tcp_t*)xmalloc(sizeof(*client));
int rc;
if ((rc = uv_tcp_init(uv_default_loop(), client)) < 0) {
die("uv_tcp_init failed: %s", uv_strerror(rc));
}
client->data = NULL;
if (uv_accept(server_stream, (uv_stream_t*)client) == 0) {
struct sockaddr_storage peername;
int namelen = sizeof(peername);
if ((rc = uv_tcp_getpeername(client, (struct sockaddr*)&peername, &namelen)) < 0) {
die("uv_tcp_getpeername failed: %s", uv_strerror(rc));
}
report_peer_connected((const struct sockaddr_in*)&peername, namelen);
// Initialize the peer state for a new client: we start by sending the peer
// the initial '*' ack.
peer_state_t* peerstate = (peer_state_t*)xmalloc(sizeof(*peerstate));
peerstate->state = INITIAL_ACK;
peerstate->sendbuf[0] = '*';
peerstate->sendbuf_end = 1;
peerstate->client = client;
client->data = peerstate;
// Enqueue the write request to send the ack; when it's done,
// on_wrote_init_ack will be called. The peer state is passed to the write
// request via the data pointer; the write request does not own this peer
// state - it's owned by the client handle.
uv_buf_t writebuf = uv_buf_init(peerstate->sendbuf, peerstate->sendbuf_end);
uv_write_t* req = (uv_write_t*)xmalloc(sizeof(*req));
req->data = peerstate;
if ((rc = uv_write(req, (uv_stream_t*)client, &writebuf, 1,
on_wrote_init_ack)) < 0) {
die("uv_write failed: %s", uv_strerror(rc));
}
} else {
uv_close((uv_handle_t*)client, on_client_closed);
}
}
这些代码都有很好的注释,可是,这儿有一些重要的 libuv 语法我想去着重一下:
传入自界说数据到回调中:由于 C 言语还没有闭包,这或许是个应战,libuv 在它的一切的处理类型中有一个 void* data 字段;这些字段能够被用于传递用户数据。例如,留意 client->data 是怎样指向到一个 peer_state_t 结构上,以便于 uv_write 和 uv_read_start 注册的回调能够知道它们正在处理的是哪个客户端的数据。
内存办理:在带有废物收回的言语中进行事情驱动编程是十分简略的,由于,回调一般运转在一个与它们注册的当地彻底不同的栈帧中,使得根据栈的内存办理很困难。它总是需求传递堆分配的数据到 libuv 回调中(当一切回调运转时,除了 main,其它的都运转在栈上),而且,为了防止走漏,许多状况下都要求这些数据去安全开释(free())。这些都是些需求实践的内容 注1 。
这个服务器上对端的状况如下:
typedef struct {
ProcessingState state;
char sendbuf[SENDBUF_SIZE];
int sendbuf_end;
uv_tcp_t* client;
} peer_state_t;
它与第三部分中的状况十分相似;咱们不再需求 sendptr,由于,在调用 “done writing” 回调之前,uv_write 将保证发送它供给的整个缓冲。咱们也为其它的回调运用坚持了一个到客户端的指针。这儿是 on_wrote_init_ack:
void on_wrote_init_ack(uv_write_t* req, int status) {
if (status) {
die("Write error: %s\n", uv_strerror(status));
}
peer_state_t* peerstate = (peer_state_t*)req->data;
// Flip the peer state to WAIT_FOR_MSG, and start listening for incoming data
// from this peer.
peerstate->state = WAIT_FOR_MSG;
peerstate->sendbuf_end = 0;
int rc;
if ((rc = uv_read_start((uv_stream_t*)peerstate->client, on_alloc_buffer,
on_peer_read)) < 0) {
die("uv_read_start failed: %s", uv_strerror(rc));
}
// Note: the write request doesn't own the peer state, hence we only free the
// request itself, not the state.
free(req);
}
然后,咱们坚信知道了这个初始的 '*' 现已被发送到对端,咱们经过调用 uv_read_start 去监听从这个对端来的入站数据,它注册一个将被事情循环调用的回调(on_peer_read),不管什么时分,事情循环都在套接字上接纳来自客户端的调用:
void on_peer_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf) {
if (nread < 0) {
if (nread != uv_eof) {
fprintf(stderr, "read error: %s\n", uv_strerror(nread));
}
uv_close((uv_handle_t*)client, on_client_closed);
} else if (nread == 0) {
// from the documentation of uv_read_cb: nread might be 0, which does not
// indicate an error or eof. this is equivalent to eagain or ewouldblock
// under read(2).
} else {
// nread > 0
assert(buf->len >= nread);
peer_state_t* peerstate = (peer_state_t*)client->data;
if (peerstate->state == initial_ack) {
// if the initial ack hasn't been sent for some reason, ignore whatever
// the client sends in.
free(buf->base);
return;
}
// run the protocol state machine.
for (int i = 0; i < nread; ++i) {
switch (peerstate->state) {
case initial_ack:
assert(0 && "can't reach here");
break;
case wait_for_msg:
if (buf->base[i] == '^') {
peerstate->state = in_msg;
}
break;
case in_msg:
if (buf->base[i] == '$') {
peerstate->state = wait_for_msg;
} else {
assert(peerstate->sendbuf_end < sendbuf_size);
peerstate->sendbuf[peerstate->sendbuf_end++] = buf->base[i] + 1;
}
break;
}
}
if (peerstate->sendbuf_end > 0) {
// we have data to send. the write buffer will point to the buffer stored
// in the peer state for this client.
uv_buf_t writebuf =
uv_buf_init(peerstate->sendbuf, peerstate->sendbuf_end);
uv_write_t* writereq = (uv_write_t*)xmalloc(sizeof(*writereq));
writereq->data = peerstate;
int rc;
if ((rc = uv_write(writereq, (uv_stream_t*)client, &writebuf, 1,
on_wrote_buf)) < 0) {
die("uv_write failed: %s", uv_strerror(rc));
}
}
}
free(buf->base);
}
这个服务器的运转时行为十分相似于第三部分的事情驱动服务器:一切的客户端都在一个单个的线程中并发处理。而且相似的,一些特定的行为必须在服务器代码中保护:服务器的逻辑完结为一个集成的回调,而且长周期运转是制止的,由于它会堵塞事情循环。这一点也很相似。让咱们进一步探究这个问题。
 
在事情驱动循环中的长周期运转的操作
单线程的事情驱动代码使它先天就简略遭到一些常见问题的影响:长周期运转的代码会堵塞整个循环。拜见如下的程序:
void on_timer(uv_timer_t* timer) {
uint64_t timestamp = uv_hrtime();
printf("on_timer [%" PRIu64 " ms]\n", (timestamp / 1000000) % 100000);
// "Work"
if (random() % 5 == 0) {
printf("Sleeping...\n");
sleep(3);
}
}
int main(int argc, const char** argv) {
uv_timer_t timer;
uv_timer_init(uv_default_loop(), &timer);
uv_timer_start(&timer, on_timer, 0, 1000);
return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}
它用一个单个注册的回调运转一个 libuv 事情循环:on_timer,它被每秒钟循环调用一次。回调陈述一个时刻戳,而且,偶然经过睡觉 3 秒去模仿一个长周期运转。这是运转示例:
$ ./uv-timer-sleep-demo
on_timer [4840 ms]
on_timer [5842 ms]
on_timer [6843 ms]
on_timer [7844 ms]
Sleeping...
on_timer [11845 ms]
on_timer [12846 ms]
Sleeping...
on_timer [16847 ms]
on_timer [17849 ms]
on_timer [18850 ms]
...
on_timer 忠实地每秒履行一次,直到随机呈现的睡觉中止。在那个时刻点,on_timer 不再被调用,直到睡觉时刻完毕;事实上,没有其它的回调  会在这个时刻帧中被调用。这个睡觉调用堵塞了当时线程,它正是被调用的线程,而且也是事情循环运用的线程。当这个线程被堵塞后,事情循环也被堵塞。
这个示例演示了在事情驱动的调用中为什么回调不能被堵塞是多少的重要。而且,相同适用于 Node.js 服务器、客户端侧的 Javascript、大大都的 GUI 编程结构、以及许多其它的异步编程模型。
可是,有时分运转耗时的使命是不可防止的。并不是一切使命都有一个异步 API;例如,咱们或许运用一些仅有同步 API 的库去处理,或许,正在履行一个或许的长周期核算。咱们怎样用事情驱动编程去结合这些代码?线程能够帮到你!
 
“转化” 堵塞调用为异步调用的线程
一个线程池能够用于转化堵塞调用为异步调用,经过与事情循环并行运转,而且当使命完结时去由它去发布事情。以堵塞函数 do_work() 为例,这儿介绍了它是怎样运转的:
1.不在一个回调中直接调用 do_work() ,而是将它打包进一个 “使命”,让线程池去运转这个使命。当使命完结时,咱们也为循环去调用它注册一个回调;咱们称它为 on_work_done()。
2.在这个时刻点,咱们的回调就能够回来了,而事情循环坚持运转;在同一时刻点,线程池中的有一个线程运转这个使命。
3.一旦使命运转完结,告诉主线程(纠正在运转事情循环的线程),而且事情循环调用 on_work_done()。
让咱们看一下,运用 libuv 的作业调度 API,是怎样去处理咱们前面的计时器/睡觉示例中展现的问题的:
void on_after_work(uv_work_t* req, int status) {
free(req);
}
void on_work(uv_work_t* req) {
// "Work"
if (random() % 5 == 0) {
printf("Sleeping...\n");
sleep(3);
}
}
void on_timer(uv_timer_t* timer) {
uint64_t timestamp = uv_hrtime();
printf("on_timer [%" PRIu64 " ms]\n", (timestamp / 1000000) % 100000);
uv_work_t* work_req = (uv_work_t*)malloc(sizeof(*work_req));
uv_queue_work(uv_default_loop(), work_req, on_work, on_after_work);
}
int main(int argc, const char** argv) {
uv_timer_t timer;
uv_timer_init(uv_default_loop(), &timer);
uv_timer_start(&timer, on_timer, 0, 1000);
return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}
经过一个 work_req 注2 类型的句柄,咱们进入一个使命行列,替代在 on_timer 上直接调用 sleep,这个函数在使命中(on_work)运转,而且,一旦使命完结(on_after_work),这个函数被调用一次。on_work 是指 “work”(堵塞中的/耗时的操作)进行的当地。留意在这两个回调传递到 uv_queue_work 时的一个要害差异:on_work 运转在线程池中,而 on_after_work 运转在事情循环中的主线程上 —— 就好像是其它的回调相同。
让咱们看一下这种办法的运转:
$ ./uv-timer-work-demo
on_timer [89571 ms]
on_timer [90572 ms]
on_timer [91573 ms]
on_timer [92575 ms]
Sleeping...
on_timer [93576 ms]
on_timer [94577 ms]
Sleeping...
on_timer [95577 ms]
on_timer [96578 ms]
on_timer [97578 ms]
...
即便在 sleep 函数被调用时,定时器也每秒钟滴答一下,睡觉现在运转在一个独自的线程中,而且不会堵塞事情循环。
 
一个用于操练的素数测验服务器
由于经过睡觉去模仿作业并不是件让人振奋的事,我有一个事前准备好的更归纳的一个示例 —— 一个根据套接字承受来自客户端的数字的服务器,查看这个数字是否是素数,然后去回来一个 “prime" 或许 “composite”。完好的服务器代码在这儿(https://github.com/eliben/code-for-blog/blob/master/2017/async-socket-server/uv-isprime-server.c) —— 我不在这儿粘贴了,由于它太长了,更期望读者在一些自己的操练中去领会它。
这个服务器运用了一个原生的素数测验算法,因而,关于大的素数或许花很长时刻才回来一个答复。在我的机器中,关于 2305843009213693951,它花了 ~5 秒钟去核算,可是,你的办法或许不同。
操练 1:服务器有一个设置(经过一个名为 MODE 的环境变量)要么在套接字回调(意味着在主线程上)中运转素数测验,要么在 libuv 作业行列中。当多个客户端一起衔接时,运用这个设置来调查服务器的行为。当它核算一个大的使命时,在堵塞形式中,服务器将不回复其它客户端,而在非堵塞形式中,它会回复。
操练 2:libuv 有一个缺省巨细的线程池,而且线程池的巨细能够经过环境变量装备。你能够经过运用多个客户端去试验找出它的缺省值是多少?找到线程池缺省值后,运用不同的设置去看一下,在重负载下怎样去影响服务器的呼应才能。
 
在非堵塞文件体系中运用作业行列
关于仅仅板滞的演示和 CPU 密集型的核算来说,将或许的堵塞操作托付给一个线程池并不是正确的;libuv 在它的文件体系 API 中自身就许多运用了这种才能。经过这种办法,libuv 运用一个异步 API,以一个简便的办法显现出它强壮的文件体系的处理才能。
让咱们运用 uv_fs_read(),例如,这个函数从一个文件中(表明为一个 uv_fs_t 句柄)读取一个文件到一个缓冲中 注3,而且当读取完结后调用一个回调。换句话说,uv_fs_read() 总是当即回来,即使是文件在一个相似 NFS 的体系上,而数据抵达缓冲区或许需求一些时刻。换句话说,这个 API 与这种办法中其它的 libuv API 是异步的。这是怎样作业的呢?
在这一点上,咱们看一下 libuv 的底层;内部实践上十分简略,而且它是一个很好的操练。作为一个可移植的库,libuv 关于 Windows 和 Unix 体系在它的许多函数上有不同的完结。咱们去看一下在 libuv 源树中的 src/unix/fs.c。
这是 uv_fs_read 的代码:
int uv_fs_read(uv_loop_t* loop, uv_fs_t* req,
uv_file file,
const uv_buf_t bufs[],
unsigned int nbufs,
int64_t off,
uv_fs_cb cb) {
if (bufs == NULL || nbufs == 0)
return -EINVAL;
INIT(READ);
req->file = file;
req->nbufs = nbufs;
req->bufs = req->bufsml;
if (nbufs > ARRAY_SIZE(req->bufsml))
req->bufs = uv__malloc(nbufs * sizeof(*bufs));
if (req->bufs == NULL) {
if (cb != NULL)
uv__req_unregister(loop, req);
return -ENOMEM;
}
memcpy(req->bufs, bufs, nbufs * sizeof(*bufs));
req->off = off;
POST;
}
第一次看或许觉得很困难,由于它推迟实在的作业到 INIT 和 POST 宏中,以及为 POST 设置了一些本地变量。这样做能够防止了文件中的许多重复代码。
这是 INIT 宏:
并发服务器(四):libuv
它设置了恳求,而且更重要的是,设置 req->fs_type 域为实在的 FS 恳求类型。由于 uv_fs_read 调用 INIT(READ),它意味着 req->fs_type 被分配一个常数 UV_FS_READ。
这是 POST 宏:
并发服务器(四):libuv
它做什么取决于回调是否为 NULL。在 libuv 文件体系 API 中,一个 NULL 回调意味着咱们实在地期望去履行一个 同步 操作。在这种状况下,POST 直接调用 uv__fs_work(咱们需求了解一下这个函数的功用),而关于一个非 NULL 回调,它把 uv__fs_work 作为一个作业项提交到作业行列(指的是线程池),然后,注册 uv__fs_done 作为回调;该函数履行一些挂号并调用用户供给的回调。
假如咱们去看 uv__fs_work 的代码,咱们将看到它运用许多宏依照需求将作业分发到实践的文件体系调用。在咱们的事例中,关于 UV_FS_READ 这个调用将被 uv__fs_read 生成,它(终究)运用一般的 POSIX API 去读取。这个函数能够在一个 堵塞 办法中很安全地完结。由于,它经过异步 API 调用时被置于一个线程池中。
在 Node.js 中,fs.readFile 函数是映射到 uv_fs_read 上。因而,能够在一个非堵塞形式中读取文件,乃至是当底层文件体系 API 是堵塞办法时。
 
注1:为保证服务器不走漏内存,我在一个启用走漏查看的 Valgrind 中运转它。由于服务器经常是被规划为永久运转,这是一个应战;为战胜这个问题,我在服务器上增加了一个 “kill 开关” —— 一个从客户端接纳的特定序列,以使它能够中止事情循环并退出。这个代码在 theon_wrote_buf 句柄中。
注2:在这儿咱们不过多地运用 work_req;评论的素数测验服务器接下来将展现怎样被用于去传递上下文信息到回调中。
注3:uv_fs_read() 供给了一个相似于 preadv Linux 体系调用的通用 API:它运用多缓冲区用于排序,而且支撑一个到文件中的偏移。根据咱们评论的意图能够疏忽这些特性。
 
Linux下高并发socket,单机供给五十万衔接:http://www.138comgov138.com/linux/28982.html
修正mysql的最大并发数:http://www.138comgov138.com/linux/21697.html
Linux下高并发socket最大衔接数所受的各种约束:http://www.138comgov138.com/linux/19198.html
Nginx高并发装备思路(轻松应对1万并发量):http://www.138comgov138.com/linux/17935.html
GNU Linux高并发功能优化计划:http://www.138comgov138.com/linux/12911.html