Netty

Netty源码分析 EventLoop(二)NioEventLoop

概述

NioEventLoop是最常用的EventLoop实现类,从继承关系来看NioEventLoop依次继承了一下几个类

  1. AbstractExecutorService
  2. AbstractEventExecutor
  3. AbstractScheduledEventExecutor
  4. SingleThreadEventExecutor
  5. SingleThreadEventLoop
  6. NioEventLoop

其中AbstractExecutorService是JDK中的线程池实现类,我们就从AbstractEventExecutor开始讲解

AbstractEventExecutor

该实现类不包含关键的处理逻辑,仅提供一些便捷方法和默认参数。

  • 指定shutdownQuietPeriod为2秒,shutDownTimeout为15秒
  • newPromise()返回new DefaultPromise(this)
  • 定义一个只包含自己的selfCollection = Collections.singleton(this)

AbstractScheduledEventExecutor

提供一个优先级队列存储被提交的Task,使这些Task能够在预先设定的时候后被取出

  • 使用Netty自己实现的DefaultPriorityQueue
  • 提供加入队列的方法ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
  • 提供探测最优先任务剩余时间的方法long nextScheduledTaskNano()
  • 提供读取队首元素的方法ScheduledFutureTask<?> peekScheduledTask()
  • 提供取出首元素的方法Runnable pollScheduledTask(long nanoTime),这里只会取出剩余时间小于nanoTime的元素

ScheduledFutureTask的到期时间被叫做deadlineNanos

SingleThreadEventExecutor

出现真正处理任务的资源定义

  • Queue taskQueue(使用LinkedBlockingQueue)
  • volatile Thread thread
  • Executor executor
  • Semaphore threadLock = new Semaphore(0)
  • volatile int state

这里的state包括5种状态

  1. ST_NOT_STARTED
  2. ST_STARTED
  3. ST_SHUTTING_DOWN
  4. ST_SHUTDOWN
  5. ST_TERMINATED

这里有一个比较有意思的实现是,state字段并没有用AtomicInteger,而是用普通的int字段加上AtomicIntegerFieldUpdater来更新该字段

提供SingleThreadEventExecutor构造方法创建实例,需要传入

  1. EventExecutorGroup parent
  2. Executor executor
  3. RejectedExecutionHandler rejectedHandler
  4. maxPendingTasks 最大值不能超过16

execute(Runnable task)的执行过程为

  1. 将任务添加到taskQueue
  2. 如果状态是未启动,通过传入的Executor执行SingleThreadEventExecutor.this.run()方法

这个run方法是抽象方法,具体逻辑由子类实现。可见SingleThreadEventExecutor这一层主要维护了任务队列和EventExecutor运行状态,并提供runAllTasks(long timeoutNanos)执行所有的任务,步骤为

  1. 先fetchFromScheduledTaskQueue()将所有延迟任务放入taskQueue
  2. 循环将taskQueue中的任务依次执行
  3. 通过safeExecute()方法catch (Throwable t)来执行任务,避免循环中断退出

SingleThreadEventLoop

开始处理Channel的相关逻辑

  • 提供ChannelFuture register(Channel channel)方法,底层将Channel包装成DefaultChannelPromise(channel, this)进行注册

NioEventLoop

开始处理Nio的相关逻辑,包含以下字段

  • Selector selector
  • SelectedSelectionKeySet selectedKeys

包含以下主要逻辑

  • 通过传入的SelectorProvider和openSelector()来初始化selector
  • 注册JDK的Channel:register(final SelectableChannel ch, final int interestOps, final NioTask<?> task)
  • 最终将JDK的Channel注册到初始化后的selector:ch.register(selector, interestOps, task)

可以想到此类最关键的逻辑在run方法,而run方法最关键的一段逻辑如下

在for(;;)循环中,每次先processSelectedKeys()获取IO事件,之后runAllTasks()执行任务队列中的所有任务。这里有一个ioRatio参数,避免读取IO事件占用过多的时间:先计算一下IO操作是使用了多少时间,再按比例折算成runAllTasks()可以执行多少时间,以此保证ioRatio符合预期

将IO事件转换为Task

最终通过select获取到的key会调用processSelectedKey(SelectionKey k, AbstractNioChannel ch)来生成task

  1. 先获取AbstractNioChannel.NioUnsafe unsafe = ch.unsafe()
  2. 对于OP_CONNECT,执行unsafe.finishConnect()
  3. 对于OP_WRITE,执行unsafe().forceFlush()
  4. 对于OP_READ或OP_ACCEPT,执行unsafe.read()

这里的AbstractNioChannel就是register阶段传入的NioTask,因此具体转换逻辑在讲解Channel相关内容时再进行介绍

总结

回顾NioEventLoop的实现过程

  1. AbstractExecutorService:JDK中的线程池抽象实现
  2. AbstractEventExecutor:增加便捷方法和默认参数
  3. AbstractScheduledEventExecutor:提供优先级队列支持延迟任务的处理
  4. SingleThreadEventExecutor:提供taskQueue处理普通任务、启动事件循环
  5. SingleThreadEventLoop:支持注册Netty Channel
  6. NioEventLoop:支持注册JDK的NIO Channel;NIO的核心逻辑、select IO事件并转换为task

© 2018 – 2019, 高飞航.cn. 版权所有.

About gaofeihang

开发工程师,本站的作者。欢迎留下您宝贵的意见!

发表评论

电子邮件地址不会被公开。 必填项已用*标注