Bootstrap Source Code Analysis

Both the client-side Bootstrap and the server-side ServerBootstrap extend AbstractBootstrap. AbstractBootstrap is declared as follows:

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
}

As the declaration shows, AbstractBootstrap implements the Cloneable interface, so a bootstrap instance can be cloned.
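
The recursive type parameter B in the declaration above is what lets fluent setters defined in the abstract base class return the concrete subclass type. The following is a minimal sketch of that idea; AbstractBuilder and ServerBuilder are hypothetical names made up for illustration and are not Netty classes.

```java
// Hypothetical classes for illustration only; they are not part of Netty.
abstract class AbstractBuilder<B extends AbstractBuilder<B>> implements Cloneable {
    private int port;

    @SuppressWarnings("unchecked")
    B port(int port) {
        this.port = port;
        return (B) this; // returns the concrete subtype, so call chains keep their type
    }

    int port() {
        return port;
    }

    @Override
    @SuppressWarnings("unchecked")
    public B clone() {
        try {
            return (B) super.clone(); // shallow copy of the configuration
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e); // cannot happen: this class implements Cloneable
        }
    }
}

final class ServerBuilder extends AbstractBuilder<ServerBuilder> {
    private String name = "server";

    ServerBuilder name(String name) {
        this.name = name;
        return this;
    }

    String name() {
        return name;
    }

    public static void main(String[] args) {
        ServerBuilder original = new ServerBuilder().port(8080).name("echo");
        ServerBuilder copy = original.clone().port(9090); // the copy is configured independently
        System.out.println(original.port() + " " + copy.port() + " " + copy.name());
    }
}
```

Because port() returns B rather than AbstractBuilder, the chained call to name() compiles without a cast.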

Server side: ServerBootstrap

As described earlier, ServerBootstrap is used as follows:

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .localAddress(new InetSocketAddress(port))
    .option(ChannelOption.SO_BACKLOG, 1024)
    .childHandler(new ChildChannelHandler());

// Bind the port and wait synchronously until the bind succeeds
ChannelFuture future = bootstrap.bind().sync();

The entry point of ServerBootstrap is the bind() method, which is implemented as follows:

/**
 * Create a new {@link Channel} and bind it.
 */
public ChannelFuture bind() {
    validate();
    SocketAddress localAddress = this.localAddress;
    if (localAddress == null) {
        throw new IllegalStateException("localAddress not set");
    }
    return doBind(localAddress);
}

bind() only performs some simple validation and then calls doBind(). doBind() is implemented as follows:

private ChannelFuture doBind(final SocketAddress localAddress) {
        // Initialize and register the Channel
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.executor = channel.eventLoop();
                    }
                    doBind0(regFuture, channel, localAddress, promise);
                }
            });
            return promise;
        }
    }

doBind() first calls initAndRegister() to create and register the server channel (a NioServerSocketChannel that wraps a JDK ServerSocketChannel). Stepping into initAndRegister() gives:

final ChannelFuture initAndRegister() {
        // First call channelFactory() to create a Channel object
        final Channel channel = channelFactory().newChannel();
        try {
            init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

This method first calls the channel factory to create a Channel object. So where does this ChannelFactory come from?

final ChannelFactory<? extends C> channelFactory() {
    return channelFactory;
}

channelFactory() simply returns the channelFactory field, which is set when user code calls AbstractBootstrap.channel(Class clazz):

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new BootstrapChannelFactory<C>(channelClass));
}

channel() first creates a BootstrapChannelFactory object and then passes it to channelFactory(ChannelFactory), which assigns the newly created factory to AbstractBootstrap's channelFactory field. BootstrapChannelFactory is implemented as follows:

private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
    private final Class<? extends T> clazz;

    BootstrapChannelFactory(Class<? extends T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
    }
}
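
The factory above boils down to a reflective call to a no-argument constructor. The sketch below shows the same idea using only the JDK; Factory and ReflectiveFactory are hypothetical names, not Netty types. It uses getDeclaredConstructor().newInstance() because Class.newInstance() has been deprecated since Java 9.

```java
// Hypothetical stand-ins for ChannelFactory / BootstrapChannelFactory; not Netty classes.
interface Factory<T> {
    T newInstance();
}

final class ReflectiveFactory<T> implements Factory<T> {
    private final Class<? extends T> clazz;

    ReflectiveFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newInstance() {
        try {
            // Requires an accessible no-argument constructor on the target class
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create instance of " + clazz, e);
        }
    }

    public static void main(String[] args) {
        Factory<StringBuilder> factory = new ReflectiveFactory<>(StringBuilder.class);
        StringBuilder first = factory.newInstance();
        StringBuilder second = factory.newInstance();
        System.out.println(first != second); // each call creates a fresh instance
    }
}
```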

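This factory class simply uses reflection to create an instance of the Class object passed in. To sum up, channelFactory().newChannel() creates an instance of the class that the user passed to channel(). On the server side that class is NioServerSocketChannel.class, so this statement creates a NioServerSocketChannel object. Execution then enters the default constructor of NioServerSocketChannel, which is implemented as follows: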

/**
 * Create a new instance
 */
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

The default constructor first calls newSocket() to create a ServerSocketChannel object (for how to use ServerSocketChannel, see Java NIO programming) and then passes it to another constructor:

/**
 * Create a new instance using the given {@link ServerSocketChannel}.
 */
public NioServerSocketChannel(ServerSocketChannel channel) {
    // Call the superclass constructor
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

This constructor first calls the constructor of its superclass, AbstractNioMessageChannel, which in turn calls the constructor of AbstractNioChannel:

/**
 * Create a new instance
 *
 * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
 * @param ch                the underlying {@link SelectableChannel} on which it operates
 * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
 */
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        // Remaining code omitted
    }
}

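As the implementation and the Javadoc show, this constructor stores the underlying SelectableChannel together with the event it is interested in, and switches the channel to non-blocking mode. On the server side, the arguments passed in are a ServerSocketChannel and SelectionKey.OP_ACCEPT, meaning that the channel is interested in connection (accept) events on the ServerSocketChannel.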

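To summarize, initAndRegister() first executes the following statement:

final Channel channel = channelFactory().newChannel();

This statement creates a NioServerSocketChannel object together with its underlying ServerSocketChannel object, and records SelectionKey.OP_ACCEPT as the event of interest.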

Back in initAndRegister(), the next statement to execute is:

// Register the Channel created in the previous step with an EventLoop
ChannelFuture regFuture = group().register(channel);

group() returns the EventLoopGroup object that user code passed to group(). Tracing further leads into SingleThreadEventLoop.register():

@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    if (promise == null) {
        throw new NullPointerException("promise");
    }

    channel.unsafe().register(this, promise);
    return promise;
}

This simply calls Unsafe.register(). Tracing on into AbstractChannel.AbstractUnsafe.register(EventLoop eventLoop, final ChannelPromise promise) gives:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
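    // Remaining code omitted

    // Save a reference to the corresponding EventLoop
    AbstractChannel.this.eventLoop = eventLoop;

    /**
     * First check whether the thread performing this action is the thread
     * bound to the EventLoop. If it is, perform the registration directly;
     * if not, wrap the registration in a task and put it into the
     * EventLoop's task queue.
     */
    if (eventLoop.inEventLoop()) {
        // Perform the actual registration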
        register0(promise);
    } else {
        try {
            eventLoop.execute(new OneTimeTask() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
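
The dispatch pattern in register() above (run inline when already on the loop thread, otherwise enqueue a task) can be sketched with plain JDK classes. MiniEventLoop is a hypothetical, heavily simplified stand-in and not Netty's EventLoop; it only illustrates the thread check.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical single-threaded loop for illustration; not Netty's EventLoop.
final class MiniEventLoop {
    private volatile Thread loopThread;

    private final ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "mini-event-loop");
        thread.setDaemon(true);
        loopThread = thread; // remember which thread belongs to this loop
        return thread;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Runs the task immediately when called from the loop thread, otherwise queues it.
    void runInLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    void shutdown() {
        executor.shutdown();
    }

    // Returns true if a call made from inside the loop thread ran inline.
    static boolean nestedCallRunsInline() {
        MiniEventLoop loop = new MiniEventLoop();
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        loop.runInLoop(() -> {                   // called from another thread: queued
            boolean[] ran = { false };
            loop.runInLoop(() -> ran[0] = true); // called from the loop thread: runs inline
            result.complete(ran[0]);
        });
        boolean inline = result.join();
        loop.shutdown();
        return inline;
    }

    public static void main(String[] args) {
        System.out.println(nestedCallRunsInline());
    }
}
```

Since every state change ends up on the single loop thread, the state touched by the task needs no additional locking, which is the point the text makes about thread safety.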

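This method first saves a reference to the EventLoop that the Channel belongs to, and then checks whether the current thread is the thread bound to that EventLoop. If it is, register0() is called directly to perform the registration; if not, register0() is wrapped in a task and placed in the EventLoop's task queue to be executed later. Because the registration always runs on the EventLoop thread, thread-safety problems are avoided. register0() is implemented as follows: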

private void register0(ChannelPromise promise) {
    try {
        // Remaining code omitted
        // Perform the registration
        doRegister();
        registered = true;
        safeSetSuccess(promise);
        // Trigger channelRegistered() on the Handlers in the pipeline
        pipeline.fireChannelRegistered();
        if (isActive()) {
            // If the channel is already active, trigger channelActive() on the Handlers in the pipeline
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

doRegister() is where the registration actually happens. It is implemented in AbstractNioChannel.doRegister() as follows:

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
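            // Register the Channel with the EventLoop's Selector.
            // At this point it is only registered, not listening: the interest ops passed in are 0.
            // The channel stores itself as the attachment.
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            // If the channel's SelectionKey has already been cancelled, i.e. SelectionKey.cancel()
            // has been called, a CancelledKeyException is thrown here. In that case selectNow()
            // must be called explicitly so that the cancelled SelectionKey is removed.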
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

Finally, here is the code that performs the registration:

selectionKey = javaChannel().register(eventLoop().selector, 0, this);

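This statement performs the actual registration. javaChannel() returns a SelectableChannel object; for NioServerSocketChannel it is the ServerSocketChannel object created in the previous step. That object is registered with the EventLoop's selector. Every EventLoop object is bound to one Selector object.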

There are three points to note here:

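  1. The second argument passed to register() is 0, which means the ServerSocketChannel is only registered with the Selector and is not interested in any event, i.e. it does not listen for any event yet. So when is the ACCEPT event registered? This is analyzed later.
  2. When calling register(), the channel passes itself, i.e. the NioServerSocketChannel object, as the attachment so that it can be retrieved later.
  3. If register() finds that the corresponding SelectionKey has already been cancelled, selector.selectNow() must be called explicitly to remove that SelectionKey.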
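
The first two points can be reproduced with the JDK NIO API alone. The following is a minimal sketch, not Netty code; RegisterWithoutInterest is a hypothetical class name and the owner object merely stands in for the NioServerSocketChannel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class; it only uses JDK NIO types.
final class RegisterWithoutInterest {
    // Returns { interest ops right after registration, 1 if the attachment is the owner else 0 }.
    static int[] demo() {
        Object owner = new Object(); // stands in for the NioServerSocketChannel
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false); // register() requires non-blocking mode

            // Interest ops 0: the channel is known to the selector but listens for nothing yet
            SelectionKey key = ch.register(selector, 0, owner);

            return new int[] { key.interestOps(), key.attachment() == owner ? 1 : 0 };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("interestOps=" + result[0] + ", attachmentIsOwner=" + (result[1] == 1));
    }
}
```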

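After register0() has called doRegister() to perform the final registration, it calls pipeline.fireChannelRegistered(), which triggers the channelRegistered() method of the ChannelHandlers in the ChannelPipeline. It then checks whether the channel is active; if so, it calls pipeline.fireChannelActive() to trigger the channelActive() method of the ChannelHandlers.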

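At this point you may be a little puzzled: when NioServerSocketChannel was registered with the EventLoop's Selector, it was only registered and did not listen for any event. So how does the server receive connections initiated by clients?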

Don't worry, let's keep going.

Now return to AbstractBootstrap.doBind():

private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            // Perform the bind
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            // Remaining code omitted
            return promise;
        }
    }

Everything analyzed so far is the call chain triggered by initAndRegister(). After this method returns, a NioServerSocketChannel object has been created and registered with the EventLoop's Selector.

Let's continue. If nothing goes wrong, doBind0() runs next. doBind0() actually calls Channel.bind(). Tracing on into AbstractChannel.bind() gives:

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

This in turn just calls ChannelPipeline.bind(). Tracing on to DefaultChannelPipeline.HeadContext.bind() gives:

@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);
}

Clearly, this calls Unsafe.bind(). Tracing on to AbstractChannel.AbstractUnsafe.bind():

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    // Remaining code omitted
    boolean wasActive = isActive();
    try {
        // Perform the actual bind
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        // If the channel was inactive before doBind() and is active after it, trigger channelActive() on the ChannelHandlers
        invokeLater(new OneTimeTask() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}
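
The wasActive / isActive() comparison around doBind() above can be illustrated with the JDK alone. The sketch below assumes that "active" for a server channel means that its underlying ServerSocket is bound; that mapping is an assumption made for this example, and BindActivatesSketch is a hypothetical class name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class; "active" is modelled as "the server socket is bound" (an assumption).
final class BindActivatesSketch {
    // Returns { active before bind, active after bind }.
    static boolean[] demo() {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            boolean wasActive = ch.socket().isBound();

            // Port 0 asks the operating system for any free port; 1024 is the backlog
            ch.socket().bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 1024);

            boolean isActive = ch.socket().isBound();
            // The transition inactive -> active is the moment channelActive() would be fired
            return new boolean[] { wasActive, isActive };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("wasActive=" + result[0] + ", isActive=" + result[1]);
    }
}
```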

This method calls doBind() to perform the actual bind. For NioServerSocketChannel it is:

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    javaChannel().socket().bind(localAddress, config.getBacklog());
}

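Clearly, this binds the ServerSocket to the local port.
Next, the channel state is checked: if the channel was inactive before doBind() and is active afterwards, pipeline.fireChannelActive() is scheduled. The first call to bind() satisfies this condition, so pipeline.fireChannelActive() is executed next. Tracing on gives: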

@Override
public ChannelPipeline fireChannelActive() {
    head.fireChannelActive();

    if (channel.config().isAutoRead()) {
        // If autoRead is enabled, call channel.read()
        channel.read();
    }

    return this;
}

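As you can see, this method first invokes, in order, the channelActive() methods of the user-defined ChannelHandlers in the ChannelPipeline. It then checks whether autoRead is set on the Channel; if it is, channel.read() is called.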

Take a look at isAutoRead():

/**
 * Returns {@code true} if and only if {@link ChannelHandlerContext#read()} will be invoked automatically so that
 * a user application doesn't need to call it at all. The default value is {@code true}.
 */
boolean isAutoRead();

As the Javadoc shows, this property is enabled by default. If it is explicitly set to false, user code must call channel.read() itself.

This property is usually left unchanged, so channel.read() is executed at this point. Tracing into AbstractChannel.read() gives:

@Override
public Channel read() {
    pipeline.read();
    return this;
}

Tracing on to DefaultChannelPipeline.HeadContext.read() gives:

@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}

What is actually called is Unsafe.beginRead(). Tracing on gives:

@Override
public final void beginRead() {
    if (!isActive()) {
        return;
    }

    try {
        // Start reading
        doBeginRead();
    } catch (final Exception e) {
        // Exception handling code
    }
}

Tracing on into doBeginRead() gives:

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    if (inputShutdown) {
        return;
    }

    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    // Make the SelectionKey interested in readInterestOp, i.e. start listening for that event
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

The key code is the following:

if ((interestOps & readInterestOp) == 0) {
    selectionKey.interestOps(interestOps | readInterestOp);
}

This code adds the event denoted by readInterestOp to the SelectionKey's interest set. So what is readInterestOp for NioServerSocketChannel?

It is passed in by the NioServerSocketChannel constructor:

/**
 * Create a new instance using the given {@link ServerSocketChannel}.
 */
public NioServerSocketChannel(ServerSocketChannel channel) {
    // Call the superclass constructor
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

At this point the whole startup process of NioServerSocketChannel has become clear. The sequence diagram of the whole process is as follows:

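The server is now started. Its thread keeps listening for OP_ACCEPT events; as soon as a client initiates a connection, the server receives the request and proceeds to the next step of processing.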
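
The server-side steps traced above (register with no interest ops, bind, enable OP_ACCEPT, receive a connection) can be condensed into a JDK-only sketch. This is not Netty code; AcceptFlowSketch is a hypothetical class name, and the 5-second select timeout is an arbitrary value chosen for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class that mirrors the order of the steps using JDK NIO only.
final class AcceptFlowSketch {
    // Returns true if the selector reported OP_ACCEPT and a connection was accepted.
    static boolean demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);

            // 1. Register with no interest ops (the doRegister() step)
            SelectionKey key = server.register(selector, 0);

            // 2. Bind; port 0 asks the operating system for any free port
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 1024);

            // 3. Start listening for OP_ACCEPT (the doBeginRead() step)
            key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT);

            // 4. A blocking client connects to the bound address
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                // 5. The selector now reports the key as acceptable
                if (selector.select(5000) == 0 || !key.isAcceptable()) {
                    return false;
                }
                try (SocketChannel accepted = server.accept()) {
                    return accepted != null;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted=" + demo());
    }
}
```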

Client side: Bootstrap

Bootstrap is used as follows:

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.SO_KEEPALIVE,true)
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
            }
        });
ChannelFuture future = bootstrap.connect(server,port).sync();

The entry point of Bootstrap is the connect() method. Tracing into it gives:

/**
 * Connect a {@link Channel} to the remote peer.
 */
public ChannelFuture connect(SocketAddress remoteAddress) {
    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }

    validate();
    return doConnect(remoteAddress, localAddress());
}

This actually calls doConnect():

private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        // Likewise call initAndRegister() to initialize the Channel and register it with an EventLoop
        final ChannelFuture regFuture = initAndRegister();
        // Remaining code omitted
        if (regFuture.isDone()) {
            doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
        } else {
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
                }
            });
        }

        return promise;
    }

This method likewise calls initAndRegister() to create and register the Channel object.

Tracing further shows that the call finally reaches DefaultChannelPipeline.HeadContext.connect():

@Override
public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}

Clearly, this calls Unsafe.connect(), which is implemented in AbstractNioChannel.AbstractNioUnsafe.connect():

@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    try {
        if (connectPromise != null) {
            throw new IllegalStateException("connection attempt already made");
        }

        boolean wasActive = isActive();
        // Call doConnect() to connect to the server
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
        } else {
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;

            // Schedule connect timeout.
            // Start a connect-timeout timer based on the connect timeout configured by the user
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                // Schedule a task that detects the timeout
                connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause =
                                new ConnectTimeoutException("connection timed out: " + remoteAddress);
                        // The timer fired, so the client connection timed out: close the client and release resources
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isCancelled()) {
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}

Here doConnect() is called to actually connect to the server. For NioSocketChannel the method is defined as follows:

@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        // If a local address was supplied, bind to it first
        javaChannel().socket().bind(localAddress);
    }

    boolean success = false;
    try {
        // Call SocketChannel.connect() to connect to the remote server
        boolean connected = javaChannel().connect(remoteAddress);
        if (!connected) {
            // If the connection has not completed yet, make the SelectionKey listen for OP_CONNECT
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        // As long as no exception was thrown, success is true
        success = true;
        return connected;
    } finally {
        if (!success) {
            // If an exception occurred, close the channel
            doClose();
        }
    }
}

This method calls SocketChannel.connect() to connect to the server. There are three possible outcomes:

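  1. The connection succeeds and connect() returns true.
  2. An error occurs and the connection fails; in this case an exception is thrown.
  3. The connection is still in progress and no result is available yet, i.e. the three-way handshake is still under way; connect() returns false. In this case the corresponding SelectionKey is set to listen for OP_CONNECT.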
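
These three outcomes can be observed with a non-blocking JDK SocketChannel. The sketch below is a simplified illustration, not Netty's implementation; NonBlockingConnectSketch is a hypothetical class name, and the local listening socket exists only so that the example has something to connect to.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class built on JDK NIO only.
final class NonBlockingConnectSketch {
    // Returns true once the client is connected; failures surface as an exception.
    static boolean demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open()) {
            // Local listener on any free loopback port, used only as a connect target
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            client.configureBlocking(false);
            SelectionKey key = client.register(selector, 0);

            // Outcome 1: returns true at once. Outcome 2: throws. Outcome 3: returns false.
            boolean connected = client.connect(server.getLocalAddress());
            if (!connected) {
                // Outcome 3: wait for the handshake to finish by listening for OP_CONNECT
                key.interestOps(SelectionKey.OP_CONNECT);
                if (selector.select(5000) == 0 || !key.isConnectable()) {
                    return false;
                }
                connected = client.finishConnect();
                // OP_CONNECT is no longer needed once the connection is established
                key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
            }
            return connected;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected=" + demo());
    }
}
```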

Back in connect(): in the first case, doConnect() succeeds and returns true, and fulfillConnectPromise() is executed next. It is defined as follows:

private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
    if (promise == null) {
        // Closed via cancellation and the promise has been notified already.
        return;
    }

    // trySuccess() will return false if a user cancelled the connection attempt.
    boolean promiseSet = promise.trySuccess();

    // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
    // because what happened is what happened.
    if (!wasActive && isActive()) {
        // Call fireChannelActive()
        pipeline().fireChannelActive();
    }

    // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
    if (!promiseSet) {
        close(voidPromise());
    }
}

This method also calls pipeline().fireChannelActive(), which then makes the SocketChannel listen for OP_READ events. OP_READ is passed in by the AbstractNioByteChannel constructor:

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

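For the third case, where the connection is still in progress, a connect-timeout timer is started based on the configured connect timeout. The implementation is the eventLoop().schedule(...) block in the connect() method shown above.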
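
The timeout idea (schedule a task that fails the pending connect promise unless it has completed first, and cancel the timer once the promise completes) can be sketched with JDK concurrency utilities. This is a simplified analogue and not Netty's code: ConnectTimeoutSketch is a hypothetical class, CompletableFuture plays the role of the connect promise, and a ScheduledExecutorService plays the role of the EventLoop.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; a JDK-only analogue of a connect-timeout timer.
final class ConnectTimeoutSketch {
    // Creates a promise that fails with TimeoutException unless it is completed in time.
    static CompletableFuture<Void> newConnectPromise(ScheduledExecutorService loop, long timeoutMillis) {
        CompletableFuture<Void> connectPromise = new CompletableFuture<>();
        if (timeoutMillis > 0) {
            ScheduledFuture<?> timer = loop.schedule(() -> {
                // Has no effect if the promise was already completed
                connectPromise.completeExceptionally(new TimeoutException("connection timed out"));
            }, timeoutMillis, TimeUnit.MILLISECONDS);
            // Once the promise completes for any reason, the timer is no longer needed
            connectPromise.whenComplete((value, error) -> timer.cancel(false));
        }
        return connectPromise;
    }

    // Returns { slow attempt timed out, fast attempt completed normally }.
    static boolean[] demo() {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<Void> slow = newConnectPromise(loop, 50);   // never completed by us
            CompletableFuture<Void> fast = newConnectPromise(loop, 5000);
            fast.complete(null); // "connected" before the timer fires

            boolean timedOut;
            try {
                slow.join();
                timedOut = false;
            } catch (CompletionException e) {
                timedOut = e.getCause() instanceof TimeoutException;
            }
            return new boolean[] { timedOut, fast.isDone() && !fast.isCompletedExceptionally() };
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("slowTimedOut=" + result[0] + ", fastSucceeded=" + result[1]);
    }
}
```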

Client-Server interaction

After NioServerSocketChannel has started, its thread keeps listening for OP_ACCEPT events. At this point the client initiates a request,

2017-05-30 14:0749