Netty数据流分析

前言

稍微对tcp/ip协议有了解的同学都会知道,数据都是以二进制流的形式在网络上进行传播的,而且这些二进制流没有起始和结束标志。我们处理的往往都是Java对象,因此在接收到二进制流数据之后需要对数据进行拆包和反序列化成对象。
这篇文章就是从源代码角度来分析Netty中从接收到数据二进制流到转化为Java对象整个过程。

第一步:接收数据

Netty底层是以Java NIO来实现的,所以Netty中是在Selector中接收数据的,具体的代码在NioEventLoop.processSelectedKey()这个方法中,代码如下:

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        // 其他省略代码
        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                // 接收到读事件或者连接事件
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            // 其他省略代码
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

从代码里面可以看到在接收到可读时事件之后会调用unsafe.read()方法来接收数据。
对于Client而言,read()的实现是在AbstractNioByteChannel这个类的内部类AbstractNioUnsafe类中,具体实现如下:

public void read() {
            // 其他省略代码
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            // 从配置属性中读取配置的 每次可以读取的最大消息数 
            final int maxMessagesPerRead = config.getMaxMessagesPerRead();
            RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
            if (allocHandle == null) {
                this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
            }

            // 保存接收到的二进制流
            ByteBuf byteBuf = null;
            int messages = 0;
            boolean close = false;
            try {
                // 保存总共读取的字节数
                int totalReadAmount = 0;
                boolean readPendingReset = false;
                do {
                    // 给底层的数据容器分配空间
                    byteBuf = allocHandle.allocate(allocator);
                    int writable = byteBuf.writableBytes();
                    int localReadAmount = doReadBytes(byteBuf);
                    if (localReadAmount <= 0) {
                        // not was read release the buffer
                        byteBuf.release();
                        close = localReadAmount < 0;
                        break;
                    }
                    if (!readPendingReset) {
                        readPendingReset = true;
                        setReadPending(false);
                    }
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;

                    if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                        // Avoid overflow.
                        totalReadAmount = Integer.MAX_VALUE;
                        break;
                    }

                    totalReadAmount += localReadAmount;

                    // stop reading
                    if (!config.isAutoRead()) {
                        break;
                    }

                    if (localReadAmount < writable) {
                        // Read less than what the buffer can hold,
                        // which might mean we drained the recv buffer completely.
                        break;
                    }
                } while (++ messages < maxMessagesPerRead);

                pipeline.fireChannelReadComplete();
                allocHandle.record(totalReadAmount);

                if (close) {
                    closeOnRead(pipeline);
                    close = false;
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!config.isAutoRead() && !isReadPending()) {
                    removeReadOp();
                }
            }
        }
    }

第二步:拆包

第三步:反序列化

2017-05-10 19:5134