博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Nodejs事件引擎libuv源码剖析之:高效线程池(threadpool)的实现
阅读量:6037 次
发布时间:2019-06-20

本文共 5245 字,大约阅读时间需要 17 分钟。

hot3.png

前言

      Nodejs编程是全异步的,这就意味着我们不必每次都阻塞等待该次操作的结果,而事件完成(就绪)时会主动回调通知我们。在网络编程中,一般都是基于Reactor线程模型的变种,无论其怎么演化,其核心组件都包含了Reactor实例(提供事件注册、注销、通知功能)、多路复用器(由操作系统提供,比如kqueue、select、epoll等)、事件处理器(负责事件的处理)以及事件源(linux中这就是描述符)这四个组件。一般,会单独启动一个线程运行Reactor实例来实现真正的异步操作。但是,依赖操作系统提供的系统调用来实现异步是有局限的,比如在Reactor模型中我们只能监听到:网络IO事件、signel(信号)、超时事件以及一些管道事件等,但这些事件也只是通知我们资源可读或者可写,真正的读写操作(read和write)还是同步的(也就是你必须等到read或者write返回,虽然linux提供了aio,但是其有诸多槽点),那么Nodejs的全异步是如何做到的呢?你可能会很快想到,就是启用单独的线程来做同步的事情,这也是libuv的设计思路,借用官网的一张图,说明一切:

                142652_bpr5_2896894.png

      由上图可以看到,libuv实现了一套自己的线程池来处理所有同步操作(从而模拟出异步的效果),下面就来看一下该线程池的具体实现吧!

线程池模型

      说到线程池,在java领域中,jdk本身就提供了多种线程池实现,几乎所有的线程池都遵循以下模型(任务队列+线程池):

                142731_H9kA_2896894.png

      libuv自身定义了一个非常精炼、高效的队列(双向循环链表),只用了几个简单的宏定义将其实现,具体实现方式可以参见我的另一篇博文:。现在队列有了,来看一下task的定义:

struct uv__work {   void (*work)(struct uv__work *w);   void (*done)(struct uv__work *w, int status);   struct uv_loop_s* loop;   void* wq[2]; };

      uv__work就代表一个task,可以看到里面有两个函数指针(work代表任务实际操作,done用于对任务进行状态确认)。wq成员就是一个QUEUE的节点,  uv__work就是通过wq与其他  uv__work连接成一个队列。

     下面来看一下threadpool的初始化,代码如下:

#define MAX_THREADPOOL_SIZE 128static uv_once_t once = UV_ONCE_INIT;static uv_cond_t cond;static uv_mutex_t mutex;static unsigned int idle_threads;//当前空闲的线程数static unsigned int nthreads;static uv_thread_t* threads;static uv_thread_t default_threads[4];static QUEUE exit_message;static QUEUE wq;//线程池全部会检查这个queue,一旦发现有任务就执行,但是只能有一个线程抢占到static volatile int initialized;static void init_once(void) {  unsigned int i;  const char* val;  // 线程池中的线程数,默认值为4  nthreads = ARRAY_SIZE(default_threads);  val = getenv("UV_THREADPOOL_SIZE");  if (val != NULL)    nthreads = atoi(val);  if (nthreads == 0)    nthreads = 1;  if (nthreads > MAX_THREADPOOL_SIZE)    nthreads = MAX_THREADPOOL_SIZE;  threads = default_threads;  if (nthreads > ARRAY_SIZE(default_threads)) {    // 分配线程句柄    threads = uv__malloc(nthreads * sizeof(threads[0]));    if (threads == NULL) {      nthreads = ARRAY_SIZE(default_threads);      threads = default_threads;    }  }  // 初始化条件变量  if (uv_cond_init(&cond))    abort();  // 初始化互斥锁  if (uv_mutex_init(&mutex))    abort();  // 初始化任务队列  QUEUE_INIT(&wq);  // 创建nthreads个线程  for (i = 0; i < nthreads; i++)    if (uv_thread_create(threads + i, worker, NULL))      abort();  initialized = 1;}

     上面的代码中,一共创建了nthreads个线程,那么每个线程的执行代码是什么呢?由线程创建代码:uv_thread_create(threads + i, worker, NULL),可以看到,每一个线程都是执行worker函数,下面看看worker函数都在做什么:

/* To avoid deadlock with uv_cancel() it's crucial that the worker * never holds the global mutex and the loop-local mutex at the same time. */static void worker(void* arg) {  struct uv__work* w;  QUEUE* q;  (void) arg;  for (;;) {    // 因为是多线程访问,因此需要加锁同步    uv_mutex_lock(&mutex);    // 如果任务队列是空的    while (QUEUE_EMPTY(&wq)) {      // 空闲线程数加1      idle_threads += 1;      // 等待条件变量      uv_cond_wait(&cond, &mutex);      // 被唤醒之后,说明有任务被post到队列,因此空闲线程数需要减1      idle_threads -= 1;    }        // 取出队列的头部节点(第一个task)    q = QUEUE_HEAD(&wq);    if (q == &exit_message)      uv_cond_signal(&cond);    else {      // 从队列中移除这个task      QUEUE_REMOVE(q);      QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is                             executing. */    }    uv_mutex_unlock(&mutex);    if (q == &exit_message)      break;    // 取出uv__work首地址    w = QUEUE_DATA(q, struct uv__work, wq);    // 调用task的work,执行任务    w->work(w);    uv_mutex_lock(&w->loop->wq_mutex);    w->work = NULL;  /* Signal uv_cancel() that the work req is done                        executing. */    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);    uv_async_send(&w->loop->wq_async);    uv_mutex_unlock(&w->loop->wq_mutex);  }}

      可以看到,多个线程都会在worker方法中等待在conn条件变量上,一旦有任务加入队列,线程就会被唤醒,然后只有一个线程会得到任务的执行权,其他的线程只能继续等待。

      那么如何向队列提交一个task呢?看以下代码:

void uv__work_submit(uv_loop_t* loop,                     struct uv__work* w,                     void (*work)(struct uv__work* w),                     void (*done)(struct uv__work* w, int status)) {  uv_once(&once, init_once);  // 构造一个task  w->loop = loop;  w->work = work;  w->done = done;  // 将其插入任务队列  post(&w->wq);}

      接着看post做了什么:

static void post(QUEUE* q) {  // 同步队列操作  uv_mutex_lock(&mutex);  // 将task插入队列尾部  QUEUE_INSERT_TAIL(&wq, q);  // 如果当前有空闲线程,就向条件变量发送信号  if (idle_threads > 0)    uv_cond_signal(&cond);  uv_mutex_unlock(&mutex);}

     有提交任务,就肯定会有取消一个任务的操作,是的,他就是uv__work_cancel,代码如下:

static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {  int cancelled;  uv_mutex_lock(&mutex);  uv_mutex_lock(&w->loop->wq_mutex);    // 只有当前队列不为空并且要取消的uv__work有效时才会继续执行  cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;  if (cancelled)    QUEUE_REMOVE(&w->wq);// 从队列中移除task  uv_mutex_unlock(&w->loop->wq_mutex);  uv_mutex_unlock(&mutex);  if (!cancelled)    return UV_EBUSY;  // 更新这个task的状态  w->work = uv__cancelled;  uv_mutex_lock(&loop->wq_mutex);  QUEUE_INSERT_TAIL(&loop->wq, &w->wq);  uv_async_send(&loop->wq_async);  uv_mutex_unlock(&loop->wq_mutex);  return 0;}

      至此,一个线程池的组成以及实现原理都说完了,可以看到,libuv几乎是用了最少的代码完成了高效的线程池,这对于我们平时写代码时具有很好的借鉴意义,文中涉及到uv_req_t以及uv_loop_t等结构我都直接跳过,因为这牵扯到libuv的其他组件,我将在以后的源码剖析中逐步阐述,谢谢你能看到这里。

系列文章

 

Nodejs事件引擎libuv源码剖析之:事件循环(loop)结构的设计剖析

Nodejs事件引擎libuv源码剖析之:跨平台系统调用(syscall)实现原理

Nodejs事件引擎libuv源码剖析之:文件(fs)实现原理

Nodejs事件引擎libuv源码剖析之:管道(pipe)实现原理

Nodejs事件引擎libuv源码剖析之:网络(net)实现原理

Nodejs事件引擎libuv源码剖析之:定时器(timer)实现原理

转载于:https://my.oschina.net/fileoptions/blog/1036609

你可能感兴趣的文章
mysql通过配置文件进行优化
查看>>
省级网站群建设关注点
查看>>
工作第四天之采集资源
查看>>
留与后人一段面试的总结
查看>>
Spring基于XML方式配置事务
查看>>
T-MBA学习营 | 寒窗十数载,我们原来并不会学习?
查看>>
log4j.properties模板
查看>>
Linux:信号(上)
查看>>
vmware虚拟化无法迁移虚拟机
查看>>
SQL UPDATE实现多表更新
查看>>
最近有个需求,就是把某个网址跳转到另外一个网址
查看>>
innobackupex 在增量的基础上增量备份
查看>>
Windows Server 2012 R2 DirectAccess功能测试(2)App1服务器安装及配置
查看>>
基于清单的启动器的实现
查看>>
外网用户通过citrix打印慢的解决方法
查看>>
STL容器的使用
查看>>
关于std::map
查看>>
JXL导出Excel文件兼容性问题
查看>>
VBoot1.0发布,Vue & SpringBoot 综合开发入门
查看>>
centos7 安装wps 后 演示无法启动
查看>>