Netty连接超时分析

常用用法为

Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.CONNECT_TIMEOUT,3000);// 设置连接超时时间
ChannelFuture future = bootstrap.connect(server,port);

connect()最终执行的代码为AbstrcatNioChannel.connect()方法,实现如下:
跟踪下去会进入到doConnect()方法中,有

 @Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }
    try {
        if (connectPromise != null) {
            throw new IllegalStateException("connection attempt already made");
        }
        boolean wasActive = isActive();
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
        } else {
           // 其他代码
        }
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}

这个方法中会调用doConnect()方法执行实际的连接操作

@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        javaChannel().socket().bind(localAddress);
    }
    boolean success = false;
    try {
        boolean connected = javaChannel().connect(remoteAddress);
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}

在实现中会调用SocketChannel.connect()方法连接服务器,对于这个方法会有三种结果:

  1. 连接成功且立马返回,success=trueconnected=true
  2. 连接发生异常,success=false,connected=false
  3. 客户端握手信号发出去了,等待服务器返回握手信号,success=true,connected=false

三种情况下都会执行到finally语句块,但是只有第二种情况下success才会为false,这个时候会执行doClose()

@Override
protected void doClose() throws Exception {
    javaChannel().close();
}

关闭套接字。

对于第三种情况,connected=false,这个时候会让把OP_CONNECT事件加入到selectionKey的感兴趣事件中,这样可以在后面接收到连接事件。

对于上面三种情况,也就只有第一种情况下doConnect()这个方法才会返回true
现在回到connect()方法中

对于第三种情况,会执行到catch语句块中

promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();

会设置promise为失败,然后在closeIfClosed()方法中调用close(voidPromise)执行关闭动作。

对于第一种情况,doConnect()返回true,接下来会执行fulfillConnectPromise()方法,在这个方法中会设置promisesuccess以及触发ChannelHandler.channelActive()方法的执行。

对于第二种情况会执行else语句块中的代码

connectPromise = promise;
requestedRemoteAddress = remoteAddress;

// Schedule connect timeout.
// 读取用户设置的timeout,如果没有设置则默认为0
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
    connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() {
        @Override
        public void run() {
            // 设置promise为失败状态,并调用close()关闭套接字
            ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
            ConnectTimeoutException cause =
                    new ConnectTimeoutException("connection timed out: " + remoteAddress);
            if (connectPromise != null && connectPromise.tryFailure(cause)) {
                close(voidPromise());
            }
        }
    }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}

promise.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isCancelled()) {
            if (connectTimeoutFuture != null) {
                connectTimeoutFuture.cancel(false);
            }
            connectPromise = null;
            close(voidPromise());
        }
    }
});

如果用户设置了超时时间,则会生成一个task,超时之后NioEventloop会执行这个task,在task中设置promisefalse且调用close()关闭套接字。

由于第三种情况下设置了SelectionKey监听OP_CONNECT,所以在超时之前会一直监听连接事件,在NioEventLoop.processSelectKey()

if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
    // See https://github.com/netty/netty/issues/924
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    // 移除掉OP_CONNECT事件
    k.interestOps(ops);

    unsafe.finishConnect();
}

,在这个期间如果收到了连接事件,首先会把OP_CONNECT事件从SelectionKey中移除掉,接着调用finishConnec()完成连接。代码如下:

@Override
public final void finishConnect() {
    // Note this method is invoked by the event loop only if the connection attempt was
    // neither cancelled nor timed out.

    assert eventLoop().inEventLoop();

    try {
        boolean wasActive = isActive();
        doFinishConnect();
        fulfillConnectPromise(connectPromise, wasActive);
    } catch (Throwable t) {
        fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
    } finally {
        // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
        // See https://github.com/netty/netty/issues/1770
        if (connectTimeoutFuture != null) {
            connectTimeoutFuture.cancel(false);
        }
        connectPromise = null;
    }
}

有一点需要注意的是接收到OP_CONNECT事件并不一定代表连接成功,当连接过程中发生错误也会触发这个事件

/**
 * Operation-set bit for socket-connect operations.  </p>
 *
 * <p> Suppose that a selection key's interest set contains
 * <tt>OP_CONNECT</tt> at the start of a <a
 * href="Selector.html#selop">selection operation</a>.  If the selector
 * detects that the corresponding socket channel is ready to complete its
 * connection sequence, or has an error pending, then it will add
 * <tt>OP_CONNECT</tt> to the key's ready set and add the key to its
 * selected-key&nbsp;set.  </p>
 */
 public static final int OP_CONNECT = 1 << 3;

在这个方法中会先调用doFinishConnect()方法,该方法实现为

@Override
protected void doFinishConnect() throws Exception {
    if (!javaChannel().finishConnect()) {
        throw new Error();
    }
}

这个方法的实现是调用SocketChanenl.finishedConnect()来判断连接有没完成,对于finishedConnect()方法,当且仅当连接成功才会返回true,也有可能抛出异常。

前面说了,接收到OP_CONNECT事件要么表示连接成功,要么表示连接过程张发生异常,所以这个时候调用finishedConnect()也分两种情况,

  1. 连接成功返回true
  2. 连接过程中发生异常,这个时候finishedConnect()会抛出对应的异常。

对应于第一种情况,一切正常,接下来同样会调用fulfillConnectPromise()完成连接过程。

对于第二种情况会执行catch语句块,调用fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));设置promise为失败状态并调用close()关闭底层套接字。

这两种情况都会执行finally语句块

// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
    connectTimeoutFuture.cancel(false);
}
connectPromise = null;

finally语句块中会关闭超时任务。

如果没有设置超时时间会怎么样?

如果没有设置超时时间的话:

  1. connect()中不会设置超时任务。
  2. 对于上面提到的第三种情况,底层套接字有个默认的超时时间(一般是75s到几分钟之间),如果超过这个默认时间服务器都没有返回,则会连接失败,会接受到OP_CONNECT事件。

另外一种方式是调用ChannelFuturte.sync()方法等待connect()操作完成。

@Override
public Promise<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed();
    return this;
}

await()实现如下

@Override
public Promise<V> await() throws InterruptedException {
    // 其他省略代码

    synchronized (this) {
        while (!isDone()) {
            checkDeadLock();
            incWaiters();
            try {
                wait();
            } finally {
                decWaiters();
            }
        }
    }
    return this;
}

这个方法中会先调用incWaiters()方法增加waiters的个数,表示有新的用户在等待完成,接着会调用wait()方法进入睡眠,也就是是说sync()会让调用这个方法的线程进入睡眠,知道有其他线程调用notify()或者notifyAll()唤醒。

那么这个线程会在什么时候被唤醒呢?

前面说到在调用doConnect()链接成功或者链超时之后或者接收到OP_CONNECT事件之后都会调用ChannelPromise.tryFailuer()或者ChannelPromise.trySuccess()方法。

@Override
public boolean trySuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
        return true;
    }
    return false;
}

setSuccess0()实现如下

private boolean setSuccess0(V result) {
    if (isDone()) {
        return false;
    }
    synchronized (this) {
        // Allow only once.
        if (isDone()) {
            return false;
        }
        if (result == null) {
            this.result = SUCCESS;
        } else {
            this.result = result;
        }
        if (hasWaiters()) {
            notifyAll();
        }
    }
    return true;
}

在这个方法中会判断还有没有waiter,在sync()中每次调用wait()休眠前都会增加waiter,如果还有的话,就会调用notifyAll()唤醒所有等待的线程。

同样的,tryFailuer()也是一样的操作,

private boolean setFailure0(Throwable cause) {
    if (cause == null) {
        throw new NullPointerException("cause");
    }
    if (isDone()) {
        return false;
    }
    synchronized (this) {
        // Allow only once.
        if (isDone()) {
            return false;
        }
        result = new CauseHolder(cause);
        if (hasWaiters()) {
            notifyAll();
        }
    }
    return true;
}
2017-05-17 21:4950