Netty中的一些小问题

  • 关于isActive()方法:

    isActive()这个方法在整个流程中都会用到,用来判断对应的链路是否激活。定义如下:

    /**
     * Return {@code true} if the {@link Channel} is active and so connected.
     */
    boolean isActive();
    

    Server端:

    Server端的实现在NioServerSocketChannel.java中,实现如下:

    @Override
    public boolean isActive() {
        return javaChannel().socket().isBound();
    }
    

    直接调用ServerSocket.isBound()方法,该方法的作用是返回ServerSocket的绑定状态,如果ServerSocket已经成功绑定到一个地址之后会返回true。可见,对于Server端,只要ServerChannel成功绑定到一个地址之后就认为处于激活状态。

    Client端:

    Client端的实现在NioSocketChannel.java中,实现如下:

    @Override
    public boolean isActive() {
        SocketChannel ch = javaChannel();
        return ch.isOpen() && ch.isConnected();
    }
    

    ch.isConnected()作用是Tells whether or not this channel's network socket is connected.Return true if, and only if, this channel's network socket is {@link #isOpen open} and connected。
    可见对于Client,只有在已经打开并且连接上服务期之后才处于激活状态。

  • channelRegistered()方法:

    channelRegistered()的定义如下:

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
     */
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;
    

    从注释里面可以看到是在Channel绑定到Eventloop上面的时候调用的。

    不管是Server还是Client,绑定到Eventloop的时候,最终都是调用Abstract.initAndRegister()这个方法上(Server是在AbstractBootstrap.doBind()的时候调用的,Client是在Bootstrap.doConnect()的时候调用的)。
    initAndRegister()方法定义如下:

    final ChannelFuture initAndRegister() {
        final Channel channel = channelFactory().newChannel();
        try {
            init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // 把channel绑定到Eventloop对象上面去
        ChannelFuture regFuture = group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }
    

    继续跟踪下去会定位到AbstractChannel.AbstractUnsafe.register0()方法上.

    private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                // 做实际的绑定动作。把Channel感兴趣的事件注册到Eventloop.selector上面.具体实现在Abstract.doRegister()方法内
                doRegister();
                registered = true;
                safeSetSuccess(promise);
                // 调用ChannelPipeline中的ChannelHandler的channelRegister()方法
                pipeline.fireChannelRegistered();
                if (isActive()) {
                    // 如果当前链路已经激活,则调用channelActive()方法
                    pipeline.fireChannelActive();
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    

    从上面代码可以看到,在调用doRegister()把对应的Channel注册到对应的Eventloop.Selector上面去之后,就会调用pipeline.fireChannelActive()方法触发ChannelPipeline里面注册的ChannelHandlerchannleActive()方法的调用。

    从上面的代码也可以看出,在调用完pipeline.fireChannelRegistered()之后,紧接着会调用isActive()判断当前链路是否激活,如果激活了则会调用pipeline.fireChannelActive()方法。关于isActive()方法,上面说了。

    这个时候,对于ClientServer都还没有激活,所以,这个时候不管是Server还是Client都不会pipeline.fireChanenlActive()方法,至于什么时候调用下面会说。

  • channelActive()方法

    Client端:connect()之后触发

    对于Client是在connect()成功之后触发的。代码在AbstractNioChannel.connect()中。

    @Override
        public final void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
    
            try {
                if (connectPromise != null) {
                    throw new IllegalStateException("connection attempt already made");
                }
    
                boolean wasActive = isActive();
                // doConnect()进行实际的连接
                if (doConnect(remoteAddress, localAddress)) {
                    // 连接成功之后做一些列处理
                    fulfillConnectPromise(promise, wasActive);
                } else {
                   // 其他代码
                }
            } catch (Throwable t) {
                promise.tryFailure(annotateConnectException(t, remoteAddress));
                closeIfClosed();
            }
        }
    

    fulfillConnectProimse()代码如下:

    private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
            // 其他代码
            // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
            // because what happened is what happened.
            // 只有在调用doConnect()之前没有激活且调用之后处于active状态才会调用channelActive()方法,保证只有第一次连接成功之后才会调用一次。
            if (!wasActive && isActive()) {
                // 调用ChannelPipeline中的ChannelHandler的channelActive()方法
                pipeline().fireChannelActive();
            }
    
            // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
            if (!promiseSet) {
                close(voidPromise());
            }
        }
    

    从上面的代码中可以看到,channelActive()只会在连接成功的时候被调用一次。

    Server端:bind()方法

    对于Server端,在调用bind()方法之后会进入到AbstractBootstrap.bind()方法中,最后会跟踪到AbstractChannel.bind()方法中有

    @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
    
            // 其他代码
    
            boolean wasActive = isActive();
            try {
                // 把socket与端口进行绑定
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }
    
            if (!wasActive && isActive()) {
                // 启动一个task,调用channelActive()方法
                invokeLater(new OneTimeTask() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }
    
            safeSetSuccess(promise);
        }
    

    在这里同样是会判断是不是第一次激活,如果是第一次激活则调用ChannelPipeline中的ChannelHandlerchannelActive()方法。

  • fireChannelRead()

    fireChannelRead()定义如下:

    /**
     * A {@link Channel} received a message.
     *
     * This will result in having the {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}
     * method  called of the next {@link ChannelInboundHandler} contained in the  {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelHandlerContext fireChannelRead(Object msg);
    

    从注释中可以看出fireChannelRead()方法是在底层套接字收到数据之后被调用。

    首先定位到NioEventLoop.processSelectedKey()方法中:

    private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }
    
        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                // 收到 OP_READ事件,表示底层有数据到来
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            // 其他无关代码
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
    

    可以看到,当检测到时数据之后,会调用unsafe.read()方法进行实际的数据读写。

    Client对象:

    Client对象的read()方法的实现在AbstractNioByteChannel.read()方法中,代码如下:

    @Override
        public void read() {
            final ChannelConfig config = config();
            if (!config.isAutoRead() && !isReadPending()) {
                // ChannelConfig.setAutoRead(false) was called in the meantime
                removeReadOp();
                return;
            }
    
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final int maxMessagesPerRead = config.getMaxMessagesPerRead();
            RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
            if (allocHandle == null) {
                this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
            }
    
            ByteBuf byteBuf = null;
            int messages = 0;
            boolean close = false;
            try {
                int totalReadAmount = 0;
                boolean readPendingReset = false;
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    int writable = byteBuf.writableBytes();
                    int localReadAmount = doReadBytes(byteBuf);
                    if (localReadAmount <= 0) {
                        // not was read release the buffer
                        byteBuf.release();
                        close = localReadAmount < 0;
                        break;
                    }
                    if (!readPendingReset) {
                        readPendingReset = true;
                        setReadPending(false);
                    }
                    // 没接受到数据都会调用fireChannelRead()方法
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
    
                    if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                        // Avoid overflow.
                        totalReadAmount = Integer.MAX_VALUE;
                        break;
                    }
    
                    totalReadAmount += localReadAmount;
    
                    // stop reading
                    if (!config.isAutoRead()) {
                        break;
                    }
    
                    if (localReadAmount < writable) {
                        // Read less than what the buffer can hold,
                        // which might mean we drained the recv buffer completely.
                        break;
                    }
                } while (++ messages < maxMessagesPerRead);
    
                // 数据接收完成,调用fireChannelReadComplete()方法
                pipeline.fireChannelReadComplete();
                allocHandle.record(totalReadAmount);
    
                if (close) {
                    closeOnRead(pipeline);
                    close = false;
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!config.isAutoRead() && !isReadPending()) {
                    removeReadOp();
                }
            }
        }
    }
    

    Server端:

    Server端的read()实现是在AbstractNioMessageChanenl.read()方法中,代码如下:

    @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            if (!config.isAutoRead() && !isReadPending()) {
                // ChannelConfig.setAutoRead(false) was called in the meantime
                removeReadOp();
                return;
            }
    
            final int maxMessagesPerRead = config.getMaxMessagesPerRead();
            final ChannelPipeline pipeline = pipeline();
            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    for (;;) {
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
    
                        // stop reading and remove op
                        if (!config.isAutoRead()) {
                            break;
                        }
    
                        if (readBuf.size() >= maxMessagesPerRead) {
                            break;
                        }
                    }
                } catch (Throwable t) {
                    exception = t;
                }
                setReadPending(false);
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    // 接收到数据,调用fireChannelRead()方法
                    pipeline.fireChannelRead(readBuf.get(i));
                }
    
                readBuf.clear();
                // 数据接收完成,调用fireChannelReadComplete()
                pipeline.fireChannelReadComplete();
    
                if (exception != null) {
                    if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
                        // ServerChannel should not be closed even on IOException because it can often continue
                        // accepting incoming connections. (e.g. too many open files)
                        closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
                    }
    
                    pipeline.fireExceptionCaught(exception);
                }
    
                if (closed) {
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!config.isAutoRead() && !isReadPending()) {
                    removeReadOp();
                }
            }
        }
    

    可以看到在接收到数据后会调用fireChannelRead()方法,数据接收完成之后会调用fireChannelReadComplete()方法.

  • fireChannelInactive():

    fireChannelInactive()方法定义如下:

    /**
     * A {@link Channel} is inactive now, which means it is closed.
     *
     * This will result in having the  {@link ChannelInboundHandler#channelInactive(ChannelHandlerContext)} method
     * called of the next  {@link ChannelInboundHandler} contained in the  {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelHandlerContext fireChannelInactive();
    

    fireChannelnactive()方法在两个地方会被调用:Channel.close()Channel.disconnect():

    Channel.close()

    从注释中可以看出fireChannelInactive()方法是在Channelclose()的时候被调用的。

    Channel.close()开始跟踪,进入到Abstract.close(),该方法调用的是ChannelPipeline.close(),进一步跟踪到DefaultPipeline.close()中有

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.close(promise);
    }
    

    调用的实际上是Unsafe.close()方法。

    @Override
    public final void close(final ChannelPromise promise) {
        // 其他省略代码
        try {
            // 实际的关闭动作
            doClose();
            closeFuture.setClosed();
            safeSetSuccess(promise);
        } catch (Throwable t) {
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    
        // Fail all the queued messages
        try {
            outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
            outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
        } finally {
            // 确保是从Active --> Inactive状态才会执行下面的方法
            if (wasActive && !isActive()) {
                invokeLater(new OneTimeTask() {
                    @Override
                    public void run() {
                        // 触发channelInactive()的执行
                        pipeline.fireChannelInactive();
                    }
                });
            }
            // 触发解注册动作
            deregister(voidPromise());
        }
    }
    

    可以看到在调用doClose()之后会触发pipeline.fireChannelInactive()方法的执行。在调用前会先确认状态是从Active--->Inactive

    Channel.disconnect()

    Channel.disconnect()跟踪进去最后会跟踪到AbstractChannel.disconnect()中,有

    @Override
    public final void disconnect(final ChannelPromise promise) {
        if (!promise.setUncancellable()) {
            return;
        }
    
        boolean wasActive = isActive();
        try {
            // 关闭连接。Server端支持这个操作,对于Client端,会调用javaChannel().close()关闭连接
            doDisconnect();
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }
    
        if (wasActive && !isActive()) {
            // 状态从Active --> Inactive
            invokeLater(new OneTimeTask() {
                @Override
                public void run() {
                    // 触发ChannelHandler的channelInactive()方法的执行
                    pipeline.fireChannelInactive();
                }
            });
        }
    
        safeSetSuccess(promise);
        closeIfClosed(); // doDisconnect() might have closed the channel
    }
    
  • fireChannelUnregistered()方法

    fireChannelUnregistered()定义如下:

     /**
     * A {@link Channel} was unregistered from its {@link EventLoop}.
     *
     * This will result in having the  {@link ChannelInboundHandler#channelUnregistered(ChannelHandlerContext)} method
     * called of the next  {@link ChannelInboundHandler} contained in the  {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelPipeline fireChannelUnregistered();
    

    从注释中可以看出fireChannelUnregistered()方法是在ChannelEventloop中解注册的时候被调用的。在上面讲fireChanenlInactive()的时候提到了当调用Channel.close()的时候最后会调用deregister(voidPromise())方法。这个方法如下:

    @Override
    public final void deregister(final ChannelPromise promise) {
        if (!promise.setUncancellable()) {
            return;
        }
    
        if (!registered) {
            safeSetSuccess(promise);
            return;
        }
    
        try {
            // 实际执行解注册动作
            doDeregister();
        } catch (Throwable t) {
            logger.warn("Unexpected exception occurred while deregistering a channel.", t);
        } finally {
            if (registered) {
                registered = false;
                invokeLater(new OneTimeTask() {
                    @Override
                    public void run() {
                        // 在这里会触发channelUngister()的执行
                        pipeline.fireChannelUnregistered();
                    }
                });
                safeSetSuccess(promise);
            } else {
                // Some transports like local and AIO does not allow the deregistration of
                // an open channel.  Their doDeregister() calls close().  Consequently,
                // close() calls deregister() again - no need to fire channelUnregistered.
                safeSetSuccess(promise);
            }
        }
    }
    

    可以知道channelUngister()方法是在Channel.close()的时候被触发执行的。

    顺带说下关于解注册,doDeregister()方法如下:

    protected void doDeregister() throws Exception {
        eventLoop().cancel(selectionKey());
    }
    

    改方法实际上就是调用Channel对应的SelectionKey.cancel()方法。

2017-06-26 21:1562