close()与shutdownGracefully()

state:writing

Channel.close()EventExecutorGroup.shutdownGracefully()

  • Channel.close()

    Channel.close()的定义为:

    /**
     * Request to close this {@link Channel} and notify the {@link ChannelFuture} once the operation completes,
     * either because the operation was successful or because of
     * an error.
     *
     * After it is closed it is not possible to reuse it again.
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#close(ChannelHandlerContext, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the  {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelFuture close();
    

    close()用来关闭Channel。其代码如下:

     @Override
    public ChannelFuture close() {
        return pipeline.close();
    }
    

    直接调用pipeline.close(),继续跟踪下去有AbstractChannel.AbstractSafe.close()方法

    @Override
    public final void close(final ChannelPromise promise) {
        if (!promise.setUncancellable()) {
            return;
        }
    
        if (inFlush0) {
            // 如果正在处理数据,则把close()封装成一个task后面执行
            invokeLater(new OneTimeTask() {
                @Override
                public void run() {
                    close(promise);
                }
            });
            return;
        }
    
        if (closeFuture.isDone()) {
            // Closed already.
            safeSetSuccess(promise);
            return;
        }
    
        boolean wasActive = isActive();
        // 把输出缓冲区的数据备份之后然后清空,放置去他的数据被加进来
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
    
        try {
            doClose();
            closeFuture.setClosed();
            safeSetSuccess(promise);
        } catch (Throwable t) {
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    
        // Fail all the queued messages
        try {
            outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
            outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
        } finally {
            if (wasActive && !isActive()) {
                // 由Active变为Inactive,触发调用ChannelHandler.channelInactive()方法
                invokeLater(new OneTimeTask() {
                    @Override
                    public void run() {
                        pipeline.fireChannelInactive();
                    }
                });
            }
            deregister(voidPromise());
        }
    }
    

    直接调用doClose()关闭套接字,对于Server端和Client都一样.

    @Override
    protected void doClose() throws Exception {
        javaChannel().close();
    }
    

    直接调用底层的SocketChannel关闭套接字。
    接下来会判断状态,如果状态由Active转变为Inactive,则会调用pipeline.fireChanneInactive()触发ChannelPipeline中的ChannelHandler.channelActive()方法的调用。
    接下来会调用doRegister()方法,方法定义为

    @Override
    public final void deregister(final ChannelPromise promise) {
        if (!promise.setUncancellable()) {
            return;
        }
        if (!registered) {
            safeSetSuccess(promise);
            return;
        }
        try {
            // 实际的去注册动作
            doDeregister();
        } catch (Throwable t) {
            logger.warn("Unexpected exception occurred while deregistering a channel.", t);
        } finally {
            if (registered) {
                registered = false;
                // 如果已经注册了则触发调用ChanenlHandler.channelUngister()方法
                invokeLater(new OneTimeTask() {
                    @Override
                    public void run() {
                        pipeline.fireChannelUnregistered();
                    }
                });
                safeSetSuccess(promise);
            } else {
                // Some transports like local and AIO does not allow the deregistration of
                // an open channel.  Their doDeregister() calls close().  Consequently,
                // close() calls deregister() again - no need to fire channelUnregistered.
                safeSetSuccess(promise);
            }
        }
    }
    

    doDeregister()方法为

    @Override
    protected void doDeregister() throws Exception {
        eventLoop().cancel(selectionKey());
    }
    

    方法中调用Eventloop.cancel()方法,参数selectionKey()返回对应ChannelSelectionKey对象,cancel()实现为

    void cancel(SelectionKey key) {
        key.cancel();
        cancelledKeys ++;
        if (cancelledKeys >= CLEANUP_INTERVAL) {
            cancelledKeys = 0;
            needsToSelectAgain = true;
        }
    }
    

    cancel()中把SelectionKey给取消了。也就是从Selector中去除掉SelectionKey,不在监听对应的Channel,(需要调用Selector.select()方法才会真正的去除掉)。接下来会触发Channelhandler.channelUnregistered()方法的执行。

    总结下,对于Channel.close(),调用之后会做以下几件事:

    1. 会调用doClose()方法关闭底层的套接字
    2. 调用ChannelPipeline.fireChannelInactive()方法触发ChannelHandler.channelInactive()方法的执行。
    3. 接下来会调用doDeregister()把对应的ChannelSelectionKeyEventloop.Selector中剥离出来。
    4. 调用ChannelPipeline.fireChannelUnregistered()方法触发ChannelHandler.channelUnregistered()方法的执行。
  • EventExecutorGroup.shutdownGracefully()

    shutdownGracefully()实际上是调用shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)方法,该方法定义为

    /**
     * Signals this executor that the caller wants the executor to be shut down.  Once this method is called,
     * {@link #isShuttingDown()} starts to return {@code true}, and the executor prepares to shut itself down.
     * Unlike {@link #shutdown()}, graceful shutdown ensures that no tasks are submitted for <i>'the quiet period'</i>
     * (usually a couple seconds) before it shuts itself down.  If a task is submitted during the quiet period,
     * it is guaranteed to be accepted and the quiet period will start over.
     *
     * @param quietPeriod the quiet period as described in the documentation
     * @param timeout     the maximum amount of time to wait until the executor is {@linkplain #shutdown()}
     *                    regardless if a task was submitted during the quiet period
     * @param unit        the unit of {@code quietPeriod} and {@code timeout}
     *
     * @return the {@link #terminationFuture()}
     */
    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
    

    方法实现如下:

    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        if (quietPeriod < 0) {
            throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
        }
        if (timeout < quietPeriod) {
            throw new IllegalArgumentException(
                    "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
    
        if (isShuttingDown()) {
            return terminationFuture();
        }
    
        boolean inEventLoop = inEventLoop();
        boolean wakeup;
        int oldState;
        for (;;) {
            if (isShuttingDown()) {
                return terminationFuture();
            }
            int newState;
            wakeup = true;
            oldState = STATE_UPDATER.get(this);
            if (inEventLoop) {
                newState = ST_SHUTTING_DOWN;
            } else {
                switch (oldState) {
                    case ST_NOT_STARTED:
                    case ST_STARTED:
                        newState = ST_SHUTTING_DOWN;
                        break;
                    default:
                        newState = oldState;
                        wakeup = false;
                }
            }
            if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
                break;
            }
        }
        gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
        gracefulShutdownTimeout = unit.toNanos(timeout);
    
        if (oldState == ST_NOT_STARTED) {
            thread.start();
        }
    
        if (wakeup) {
            wakeup(inEventLoop);
        }
    
        return terminationFuture();
    }
    
2017-05-15 19:1321