Netty

Netty 3源码分析(四)Boss & Worker

Boss和Worker是Netty 3中的工作线程,也是进行IO事件处理最核心的部分。

Boss & Worker

从类图中我们可以看到,Boss和Worker都继承自AbstractNioSelector,下面也将从这个公共的基类说起。

AbstractNioSelector

  1. 维护一个jdk中Selector的引用
  2. 自身实现了Runnable接口
  3. 包含Executor和当前Thread的引用
  4. 包含一个元素类型为Runnable的taskQueue

其运行过程就是向taskQueue中添加任务,再消费任务的过程

processTaskQueue()是在run()方法中被调用的,当AbstractNioSelector作为一个线程开始运行时,即开启了队列的消费过程。下面我们再看一下run()方法还做了哪些事情

  1. 当select事件为空的时候,会进行一系列IO状态的检查,包括对jdk epoll() bug的处理
  2. 之后执行一次processTaskQueue(),消费队列中的任务
  3. shutdown情况下额外执行一次processTaskQueue()
  4. 最后执行process(selector),从selector获取IO事件并添加到taskQueue。该方法为抽象方法,由不同的子类去实现。

这里我们也可以看出一个问题:对于每一个Boss / Worker,它们是单线程去执行 消费队列中的任务 -> 从selector获取任务添加到队列 的过程,如果某一次select添加的任务过多,且处理的比较缓慢(例如该线程没有获得足够的运行时间),后续的select事件就要被延迟处理。应用中的一些请求超时现象可能与此有关。

NioServerBoss

上文中提到ServerBoss在启动的RegisterTask中会创建一个ServerSocketChannel、执行bind()操作,同时也会向selector注册OP_ACCEPT事件。这样SeverBoss在select时就会捕获accpet类型的事件

通过非阻塞的accept()方法将所有ready的连接全部接受,然后将得到的channel进行注册

  1. 获取sink、构建新的pipeline
  2. 从workerPool中选取一个worker
  3. 创建NioAcceptedSocketChannel并向worker注册

创建NioAcceptedSocketChannel会发出一个Upstream的OPEN事件,没有太多复杂的逻辑,后面会重点介绍一下向worker进行注册的过程。Boss的主要职责就是接受新建的连接,并把它分配给某一worker线程处理。

NioClientBoss

ClientBoss的功能比较特殊,它是在NioClientSocketPipelineSink中channel.connect()返回false的时候,去启动它的RegisterTask。这种情况下channel是non-blocking模式(实践中发现这是一个普遍到的现象),且调用connect()的时候连接并未创建好,ClientBoss相当于异步去校验connect()的结果。

注册OP_CONNECT事件,之后在process()方法中处理连接建立事件

最终也是将建立好的连接注册给worker。

NioWorker

我们从上文提到的worker.register()方法说起,该方法会创建一个woker的RegisterTask

会对channel进行配置,向selector注册事件。server端会再次配置non-blocking。getInternalInterestOps()的默认值只有OP_READ,在channel有写操作的时候才会把OP_WRITE也注册进去。最终的结果是将一个新建的channel注册到了该worker的selector上,这样这个channel的IO事件都会由这个worker去处理。接下来再看worker的process方法

此处可以看到对连接读写事件的处理。

Boss & Worker线程的启动

在构建ChannelFactory的同时,会创建对应的ServerBossPool、ClientBossPool以及WorkerPool

Boss & Worker Pool

它们具有类似的启动流程:构造函数调用init()方法,之后通过newBoss() / newWorker()创建一定数量的实例,这些方法直接调用Boss / Worker的构造函数,随后一直super()到基类AbstractNioSelector的openSelector()方法。

这里会通过DeadLockProofWorker.start()在对应的executor中启动每一个线程。newThreadRenamingRunnable()将当前的Boss / Worker进行了一层包装,用于修改线程的名称。后续需要Boss或Worker的时候,通过nextBoss() / nextWorker()顺序循环从数组中取出,以保证连接均匀分配给所有的worker。

以上是Boss与Worker相关的主要内容,Worker中具体的读写事件流程将在后面的文章进行介绍。

 

© 2015, 高飞航.cn. 版权所有.

About gaofeihang

开发工程师,本站的作者。欢迎留下您宝贵的意见!

发表评论

电子邮件地址不会被公开。 必填项已用*标注