Channel 的 register 操作
经过前面的铺垫,我们已经具备一定的基础了,我们开始来把前面学到的内容揉在一起。这节,我们会介绍 register 操作,这一步其实是非常关键的,对于我们源码分析非常重要。
register
我们从 EchoClient 中的 connect() 方法出发,或者 EchoServer 的 bind(port) 方法出发,都会走到 initAndRegister() 这个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { ... } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
|
initAndRegister() 这个方法我们已经接触过两次了,前面介绍了 1️⃣ Channel 的实例化,实例化过程中,会执行 Channel 内部 Unsafe 和 Pipeline 的实例化,以及在上面 2️⃣ init(channel) 方法中,会往 pipeline 中添加 handler(pipeline 此时是 head+channelnitializer+tail)。
我们这节终于要揭秘 ChannelInitializer 中的 initChannel 方法了~~~
现在,我们继续往下走,看看 3️⃣ register 这一步:
1
| ChannelFuture regFuture = config().group().register(channel);
|
我们说了,register 这一步是非常关键的,它发生在 channel 实例化以后,大家回忆一下当前 channel 中的一些情况:
实例化了 JDK 底层的 Channel,设置了非阻塞,实例化了 Unsafe,实例化了 Pipeline,同时往 pipeline 中添加了 head、tail 以及一个 ChannelInitializer 实例。
上面的 config().group()
方法会返回前面实例化的 NioEventLoopGroup 的实例,然后调用其 register(channel) 方法:
// MultithreadEventLoopGroup
1 2 3 4
| @Override public ChannelFuture register(Channel channel) { return next().register(channel); }
|
next() 方法很简单,就是选择线程池中的一个线程(还记得 chooserFactory 吗),也就是选择一个 NioEventLoop 实例,这个时候我们就进入到 NioEventLoop 了。
NioEventLoop 的 register(channel) 方法实现在它的父类 SingleThreadEventLoop 中:
1 2 3 4
| @Override public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); }
|
上面的代码实例化了一个 Promise,将当前 channel 带了进去:
1 2 3 4 5 6 7
| @Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
|
拿到 channel 中关联的 Unsafe 实例,然后调用它的 register 方法:
我们说过,Unsafe 专门用来封装底层实现,当然这里也没那么“底层”
// AbstractChannel#AbstractUnsafe
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { ... } } }
|
到这里,我们要明白,NioEventLoop 中是还没有实例化 Thread 实例的。
这几步涉及到了好几个类:NioEventLoop、Promise、Channel、Unsafe 等,大家要仔细理清楚它们的关系。
对于我们前面过来的 register 操作,其实提交到 eventLoop 以后,就直接返回 promise 实例了,剩下的register0 是异步操作,它由 NioEventLoop 实例来完成。
我们这边先不继续往里分析 register0(promise) 方法,先把前面欠下的 NioEventLoop 中的线程介绍清楚,然后再回来介绍这个 register0 方法。
Channel 实例一旦 register 到了 NioEventLoopGroup 实例中的某个 NioEventLoop 实例,那么后续该 Channel 的所有操作,都是由该 NioEventLoop 实例来完成的。
这个也非常简单,因为 Selector 实例是在 NioEventLoop 实例中的,Channel 实例一旦注册到某个 Selector 实例中,当然也只能在这个实例中处理 NIO 事件。
NioEventLoop 工作流程
前面,我们在分析线程池的实例化的时候说过,NioEventLoop 中并没有启动 Java 线程。这里我们来仔细分析下在 register 过程中调用的 eventLoop.execute(runnable) 这个方法,这个代码在父类 SingleThreadEventExecutor 中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { startThread(); if (isShutdown() && removeTask(task)) { reject(); } }
if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
|
原来启动 NioEventLoop 中的线程的方法在这里。
另外,上节我们说的 register 操作进到了 taskQueue 中,所以它其实是被归类到了非 IO 操作的范畴。
下面是 startThread 的源码,判断线程是否已经启动来决定是否要进行启动操作:
1 2 3 4 5 6 7 8 9 10 11 12
| private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { try { doStartThread(); } catch (Throwable cause) { STATE_UPDATER.set(this, ST_NOT_STARTED); PlatformDependent.throwException(cause); } } } }
|
我们按照前面的思路,根据线程没有启动的情况,来看看 doStartThread() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); }
boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { } } }); }
|
上面线程启动以后,会执行 NioEventLoop 中的 run() 方法,这是一个非常重要的方法,这个方法肯定是没那么容易结束的,必然是像 JDK 线程池的 Worker 那样,不断地循环获取新的任务的。它需要不断地做 select 操作和轮询 taskQueue 这个队列。
我们先来简单地看一下它的源码,这里先不做深入地介绍:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| @Override protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
|
上面这段代码是 NioEventLoop 的核心,这里介绍两点:
- 首先,会根据 hasTasks() 的结果来决定是执行 selectNow() 还是 select(oldWakenUp),这个应该好理解。如果有任务正在等待,那么应该使用无阻塞的 selectNow(),如果没有任务在等待,那么就可以使用带阻塞的 select 操作。
- ioRatio 控制 IO 操作所占的时间比重:
- 如果设置为 100%,那么先执行 IO 操作,然后再执行任务队列中的任务。
- 如果不是 100%,那么先执行 IO 操作,然后执行 taskQueue 中的任务,但是需要控制执行任务的总时间。也就是说,非 IO 操作可以占用的时间,通过 ioRatio 以及这次 IO 操作耗时计算得出。
我们这里先不要去关心 select(oldWakenUp)、processSelectedKeys() 方法和 runAllTasks(…) 方法的细节,只要先理解它们分别做什么事情就可以了。
回过神来,我们前面在 register 的时候提交了 register 任务给 NioEventLoop,这是 NioEventLoop 接收到的第一个任务,所以这里会实例化 Thread 并且启动,然后进入到 NioEventLoop 中的 run 方法。
继续 register
我们回到前面的 register0(promise) 方法,我们知道,这个 register 任务进入到了 NioEventLoop 的 taskQueue 中,然后会启动 NioEventLoop 中的线程,该线程会轮询这个 taskQueue,然后执行这个 register 任务。
注意,此时执行该方法的是 eventLoop 中的线程:
// AbstractChannel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| private void register0(ChannelPromise promise) { try { ... boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise); pipeline.fireChannelRegistered();
if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { ... } }
|
我们先说掉上面的 doRegister() 方法,然后再说 pipeline。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { ... } } }
|
我们可以看到,这里做了 JDK 底层的 register 操作,将 SocketChannel(或 ServerSocketChannel) 注册到 Selector 中,并且可以看到,这里的监听集合设置为了 0,也就是什么都不监听。
当然,也就意味着,后续一定有某个地方会需要修改这个 selectionKey 的监听集合,不然啥都干不了
我们重点来说说 pipeline 操作,我们之前在介绍 NioSocketChannel 的 pipeline 的时候介绍到,我们的 pipeline 现在长这个样子:

现在,我们将看到这里会把 LoggingHandler 和 EchoClientHandler 添加到 pipeline。
我们继续看代码,register 成功以后,执行了以下操作:
1
| pipeline.invokeHandlerAddedIfNeeded();
|
大家可以跟踪一下,这一步会执行到 pipeline 中 ChannelInitializer 实例的 handlerAdded 方法,在这里会执行它的 init(context) 方法:
1 2 3 4 5 6
| @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { initChannel(ctx); } }
|
然后我们看下 initChannel(ctx),这里终于来了我们之前介绍过的 init(channel) 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { try { initChannel((C) ctx.channel()); } catch (Throwable cause) { ... } finally { remove(ctx); } return true; } return false; }
|
我们前面也说过,ChannelInitializer 的 init(channel) 被执行以后,那么其内部添加的 handlers 会进入到 pipeline 中,然后上面的 finally 块中将 ChannelInitializer 的实例从 pipeline 中删除,那么此时 pipeline 就算建立起来了,如下图:

其实这里还有个问题,如果我们在 ChannelInitializer 中添加的是一个 ChannelInitializer 实例呢?大家可以考虑下这个情况。
pipeline 建立了以后,然后我们继续往下走,会执行到这一句:
1
| pipeline.fireChannelRegistered();
|
我们只要摸清楚了 fireChannelRegistered() 方法,以后碰到其他像 fireChannelActive()、fireXxx() 等就知道怎么回事了,它们都是类似的。我们来看看这句代码会发生什么:
// DefaultChannelPipeline
1 2 3 4 5 6
| @Override public final ChannelPipeline fireChannelRegistered() { AbstractChannelHandlerContext.invokeChannelRegistered(head); return this; }
|
也就是说,我们往 pipeline 中扔了一个 channelRegistered 事件,这里的 register 属于 Inbound 事件,pipeline 接下来要做的就是执行 pipeline 中的 Inbound 类型的 handlers 中的 channelRegistered() 方法。
从上面的代码,我们可以看出,往 pipeline 中扔出 channelRegistered 事件以后,第一个处理的 handler 是 head。
接下来,我们还是跟着代码走,此时我们来到了 pipeline 的第一个节点 head 的处理中:
// AbstractChannelHandlerContext
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRegistered(); } }); } }
|
也就是说,这里会先执行 head.invokeChannelRegistered() 方法,而且是放到 NioEventLoop 中的 taskQueue 中执行的:
// AbstractChannelHandlerContext
1 2 3 4 5 6 7 8 9 10 11 12
| private void invokeChannelRegistered() { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRegistered(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRegistered(); } }
|
我们去看 head 的 channelRegistered 方法:
// HeadContext
1 2 3 4 5 6 7
| @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { invokeHandlerAddedIfNeeded(); ctx.fireChannelRegistered(); }
|
然后 head 会执行 fireChannelRegister() 方法:
// AbstractChannelHandlerContext
1 2 3 4 5 6 7
| @Override public ChannelHandlerContext fireChannelRegistered() { invokeChannelRegistered(findContextInbound()); return this; }
|
注意:pipeline.fireChannelRegistered() 是将 channelRegistered 事件抛到 pipeline 中,pipeline 中的 handlers 准备处理该事件。而 context.fireChannelRegistered() 是一个 handler 处理完了以后,向后传播给下一个 handler。
它们两个的方法名字是一样的,但是来自于不同的类。
findContextInbound() 将找到下一个 Inbound 类型的 handler,然后又是重复上面的几个方法。
我觉得上面这块代码没必要太纠结,总之就是从 head 中开始,依次往下寻找所有 Inbound handler,执行其 channelRegistered(ctx) 操作。
说了这么多,我们的 register 操作算是真正完成了。
下面,我们回到 initAndRegister 这个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { ... }
ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } }
return regFuture; }
|
我们要知道,不管是服务端的 NioServerSocketChannel 还是客户端的 NioSocketChannel,在 bind 或 connect 时,都会先进入 initAndRegister 这个方法,所以我们上面说的那些,对于两者都是通用的。
大家要记住,register 操作是非常重要的,要知道这一步大概做了哪些事情,register 操作以后,将进入到 bind 或 connect 操作中。
connect 过程和 bind 过程分析
上面我们介绍的 register 操作非常关键,它建立起来了很多的东西,它是 Netty 中 NioSocketChannel 和 NioServerSocketChannel 开始工作的起点。
这一节,我们来说说 register 之后的 connect 操作和 bind 操作。这节非常简单。
connect 过程分析
对于客户端 NioSocketChannel 来说,前面 register 完成以后,就要开始 connect 了,这一步将连接到服务端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel();
if (regFuture.isDone()) { if (!regFuture.isSuccess()) { return regFuture; } return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise()); } else { .... } }
|
这里大家自己一路点进去,我就不浪费篇幅了。最后,我们会来到 AbstractChannel 的 connect 方法:
1 2 3 4
| @Override public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return pipeline.connect(remoteAddress, promise); }
|
我们看到,connect 操作是交给 pipeline 来执行的。进入 pipeline 中,我们会发现,connect 这种 Outbound 类型的操作,是从 pipeline 的 tail 开始的:
前面我们介绍的 register 操作是 Inbound 的,是从 head 开始的
1 2 3 4
| @Override public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return tail.connect(remoteAddress, promise); }
|
接下来就是 pipeline 的操作了,从 tail 开始,执行 pipeline 上的 Outbound 类型的 handlers 的 connect(…) 方法,那么真正的底层的 connect 的操作发生在哪里呢?还记得我们的 pipeline 的图吗?

从 tail 开始往前找 out 类型的 handlers,每经过一个 handler,都执行里面的 connect() 方法,最后会到 head 中,因为 head 也是 Outbound 类型的,我们需要的 connect 操作就在 head 中,它会负责调用 unsafe 中提供的 connect 方法:
1 2 3 4 5 6 7
| public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.connect(remoteAddress, localAddress, promise); }
|
接下来,我们来看一看 connect 在 unsafe 类中所谓的底层操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| @Override public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { ...... boolean wasActive = isActive(); if (doConnect(remoteAddress, localAddress)) { fulfillConnectPromise(promise, wasActive); } else { connectPromise = promise; requestedRemoteAddress = remoteAddress;
int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { connectTimeoutFuture = eventLoop().schedule(new Runnable() { @Override public void run() { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); }
promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isCancelled()) { if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; close(voidPromise()); } } }); } } catch (Throwable t) { promise.tryFailure(annotateConnectException(t, remoteAddress)); closeIfClosed(); } }
|
如果上面的 doConnect 方法返回 false,那么后续是怎么处理的呢?
在上一节介绍的 register 操作中,channel 已经 register 到了 selector 上,只不过将 interestOps 设置为了 0,也就是什么都不监听。
而在上面的 doConnect 方法中,我们看到它在调用底层的 connect 方法后,会设置 interestOps 为 SelectionKey.OP_CONNECT
。
剩下的就是 NioEventLoop 的事情了,还记得 NioEventLoop 的 run() 方法吗?也就是说这里的 connect 成功以后,这个 TCP 连接就建立起来了,后续的操作会在 NioEventLoop.run()
方法中被 processSelectedKeys()
方法处理掉。
bind 过程分析
说完 connect 过程,我们再来简单看下 bind 过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; }
if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { ...... } }
|
然后一直往里看,会看到,bind 操作也是要由 pipeline 来完成的:
// AbstractChannel
1 2 3 4
| @Override public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); }
|
bind 操作和 connect 一样,都是 Outbound 类型的,所以都是 tail 开始:
1 2 3 4
| @Override public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); }
|
最后的 bind 操作又到了 head 中,由 head 来调用 unsafe 提供的 bind 方法:
1 2 3 4 5 6
| @Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); }
|
感兴趣的读者自己去看一下 unsafe 中的 bind 方法,非常简单,bind 操作也不是什么异步方法,我们就介绍到这里了。
本节非常简单,就是想和大家介绍下 Netty 中各种操作的套路。