> `state:writing`

### `Channel.close()` and `EventExecutorGroup.shutdownGracefully()`

* `Channel.close()`

`Channel.close()` is declared as:

```java
/**
 * Request to close this {@link Channel} and notify the {@link ChannelFuture} once the operation completes,
 * either because the operation was successful or because of
 * an error.
 *
 * After it is closed it is not possible to reuse it again.
 *
 * This will result in having the
 * {@link ChannelOutboundHandler#close(ChannelHandlerContext, ChannelPromise)}
 * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
 * {@link Channel}.
 */
ChannelFuture close();
```

`close()` closes the `Channel`. Its implementation is:

```java
@Override
public ChannelFuture close() {
    return pipeline.close();
}
```

It simply delegates to `pipeline.close()`. Tracing the call further leads to `AbstractChannel.AbstractUnsafe.close()`:

```java
@Override
public final void close(final ChannelPromise promise) {
    if (!promise.setUncancellable()) {
        return;
    }

    if (inFlush0) {
        // A flush is in progress, so wrap close() in a task and run it later
        invokeLater(new OneTimeTask() {
            @Override
            public void run() {
                close(promise);
            }
        });
        return;
    }

    if (closeFuture.isDone()) {
        // Closed already.
        safeSetSuccess(promise);
        return;
    }

    boolean wasActive = isActive();
    // Keep a reference to the outbound buffer, then null the field
    // so that no further data can be added to it
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.

    try {
        doClose();
        closeFuture.setClosed();
        safeSetSuccess(promise);
    } catch (Throwable t) {
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }

    // Fail all the queued messages
    try {
        outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
        outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
    } finally {
        if (wasActive && !isActive()) {
            // Active -> Inactive: trigger ChannelHandler.channelInactive()
            invokeLater(new OneTimeTask() {
                @Override
                public void run() {
                    pipeline.fireChannelInactive();
                }
            });
        }

        deregister(voidPromise());
    }
}
```

The socket is closed by a direct call to `doClose()`, which works the same way on the server side and the client side.
```java
@Override
protected void doClose() throws Exception {
    javaChannel().close();
}
```

This closes the socket by calling `close()` on the underlying `SocketChannel`.

Next, the state is checked. If the channel has gone from `Active` to `Inactive`, `pipeline.fireChannelInactive()` is called, which triggers `ChannelHandler.channelInactive()` on the handlers in the `ChannelPipeline`.

After that, `deregister()` is called. It is defined as:

```java
@Override
public final void deregister(final ChannelPromise promise) {
    if (!promise.setUncancellable()) {
        return;
    }

    if (!registered) {
        safeSetSuccess(promise);
        return;
    }

    try {
        // The actual deregistration
        doDeregister();
    } catch (Throwable t) {
        logger.warn("Unexpected exception occurred while deregistering a channel.", t);
    } finally {
        if (registered) {
            registered = false;
            // The channel was registered, so trigger ChannelHandler.channelUnregistered()
            invokeLater(new OneTimeTask() {
                @Override
                public void run() {
                    pipeline.fireChannelUnregistered();
                }
            });
            safeSetSuccess(promise);
        } else {
            // Some transports like local and AIO does not allow the deregistration of
            // an open channel. Their doDeregister() calls close(). Consequently,
            // close() calls deregister() again - no need to fire channelUnregistered.
            safeSetSuccess(promise);
        }
    }
}
```

`doDeregister()` is:

```java
@Override
protected void doDeregister() throws Exception {
    eventLoop().cancel(selectionKey());
}
```

It calls `EventLoop.cancel()`, passing `selectionKey()`, which returns the `SelectionKey` that belongs to this `Channel`. `cancel()` is implemented as:

```java
void cancel(SelectionKey key) {
    key.cancel();
    cancelledKeys ++;
    if (cancelledKeys >= CLEANUP_INTERVAL) {
        cancelledKeys = 0;
        needsToSelectAgain = true;
    }
}
```

`cancel()` cancels the `SelectionKey`, so the key is removed from the `Selector` and the corresponding `Channel` is no longer monitored. (The key is only actually removed the next time `Selector.select()` is called.) Afterwards, `ChannelHandler.channelUnregistered()` is triggered.

To summarize, a call to `Channel.close()` does the following:

1. Calls `doClose()` to close the underlying socket.
2. If the channel went from active to inactive, schedules `ChannelPipeline.fireChannelInactive()`, which triggers `ChannelHandler.channelInactive()`.
3. Calls `doDeregister()` to detach the `Channel`'s `SelectionKey` from the `EventLoop`'s `Selector`.
4. Schedules `ChannelPipeline.fireChannelUnregistered()`, which triggers `ChannelHandler.channelUnregistered()`.

* `EventExecutorGroup.shutdownGracefully()`

`shutdownGracefully()` actually delegates to `shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)`, which is declared as:

```java
/**
 * Signals this executor that the caller wants the executor to be shut down.  Once this method is called,
 * {@link #isShuttingDown()} starts to return {@code true}, and the executor prepares to shut itself down.
 * Unlike {@link #shutdown()}, graceful shutdown ensures that no tasks are submitted for 'the quiet period'
 * (usually a couple seconds) before it shuts itself down.  If a task is submitted during the quiet period,
 * it is guaranteed to be accepted and the quiet period will start over.
 *
 * @param quietPeriod the quiet period as described in the documentation
 * @param timeout     the maximum amount of time to wait until the executor is {@linkplain #shutdown()}
 *                    regardless if a task was submitted during the quiet period
 * @param unit        the unit of {@code quietPeriod} and {@code timeout}
 *
 * @return the {@link #terminationFuture()}
 */
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
```

The implementation is:

```java
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
    if (quietPeriod < 0) {
        throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
    }
    if (timeout < quietPeriod) {
        throw new IllegalArgumentException(
                "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }

    if (isShuttingDown()) {
        return terminationFuture();
    }

    boolean inEventLoop = inEventLoop();
    boolean wakeup;
    int oldState;
    for (;;) {
        if (isShuttingDown()) {
            return terminationFuture();
        }
        int newState;
        wakeup = true;
        oldState = STATE_UPDATER.get(this);
        if (inEventLoop) {
            newState = ST_SHUTTING_DOWN;
        } else {
            switch (oldState) {
                case ST_NOT_STARTED:
                case ST_STARTED:
                    newState = ST_SHUTTING_DOWN;
                    break;
                default:
                    newState = oldState;
                    wakeup = false;
            }
        }
        if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
            break;
        }
    }
    gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
    gracefulShutdownTimeout = unit.toNanos(timeout);

    if (oldState == ST_NOT_STARTED) {
        thread.start();
    }

    if (wakeup) {
        wakeup(inEventLoop);
    }

    return terminationFuture();
}
```
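
The `for (;;)` loop in the implementation above is a compare-and-set retry loop: read the current state, decide the new state, and only proceed if no other thread changed the state in between. The following is a minimal, JDK-only sketch of that pattern, not Netty's actual code. The class `ShutdownStateSketch`, the method `requestShutdown()` and the numeric values of the state constants are assumptions made for illustration, and the `inEventLoop` branch, the quiet period and the thread wake-up are left out.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical, simplified model of the state transition in shutdownGracefully().
final class ShutdownStateSketch {
    // Assumed values; only their ordering matters for this sketch.
    static final int ST_NOT_STARTED = 1;
    static final int ST_STARTED = 2;
    static final int ST_SHUTTING_DOWN = 3;

    private static final AtomicIntegerFieldUpdater<ShutdownStateSketch> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(ShutdownStateSketch.class, "state");

    // Must be a volatile int so the field updater can operate on it.
    private volatile int state = ST_NOT_STARTED;

    int state() {
        return state;
    }

    void markStarted() {
        STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED);
    }

    /**
     * Tries to move the executor to ST_SHUTTING_DOWN.
     * Returns true if this call performed the transition (the caller would
     * then wake up the worker thread), false if shutdown had already begun.
     */
    boolean requestShutdown() {
        for (;;) {
            int oldState = STATE_UPDATER.get(this);
            if (oldState >= ST_SHUTTING_DOWN) {
                return false; // another caller already started the shutdown
            }
            // Succeeds only if the state is still oldState; otherwise re-read and retry.
            if (STATE_UPDATER.compareAndSet(this, oldState, ST_SHUTTING_DOWN)) {
                return true;
            }
        }
    }

    public static void main(String[] args) {
        ShutdownStateSketch executor = new ShutdownStateSketch();
        executor.markStarted();
        System.out.println(executor.requestShutdown()); // true
        System.out.println(executor.requestShutdown()); // false
    }
}
```

Because the transition is decided by a single successful `compareAndSet`, concurrent callers cannot both believe they initiated the shutdown, which is why the real method can safely return the same `terminationFuture()` to every caller.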