在网络通信的时候,一般来说客户端和服务端只需要建立一个连接就够了,但是在某些场景下我们需要建立多个连接。比如使用了负载均衡,如果只建立一个连接,可能会出现负载不均衡的场景,有时候我们为了增加客户端的吞吐量也需要建立连接池。
创建连接池的最大难点就在于如何保证在高并发的情况下,能够创建我们指定的连接数,以及如何做好连接池的管理,比如连接池无可用连接怎么办?连接假死后如何为连接池补充新的连接。Netty为我们提供了两个连接池实现这些功能。SimpleChannelPool封装了连接池的基本功能,但是它不能指定连接池的连接数,所以不能被应用到生产。FixedChannelPool是功能更加强大的连接池,它扩展了SimpleChannelPool可以被应用到生产中。
Netty的连接池最简单的使用姿势
public class ClientMock {
private static SimpleChannelPoolMap poolMap;
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup(1, new DefaultThreadFactory("Client-Event", false));
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
poolMap = new SimpleChannelPoolMap(bootstrap);
SimpleChannelPool channelPool = poolMap.get(new InetSocketAddress(8090));
// 从连接池获取一个连接
channelPool.acquire().addListener(new FutureListener<Channel>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
Channel channel = future.getNow();
channel.writeAndFlush(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8));
// 将连接放入连接池
channelPool.release(channel);
}
if (future.cause() != null) {
System.out.println(future.cause());
}
}
});
}
}
public class SimpleChannelPoolMap extends AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool> {
private Bootstrap bootstrap;
SimpleHandler simpleHandler = new SimpleHandler();
public SimpleChannelPoolMap(Bootstrap bootstrap) {
this.bootstrap = bootstrap;
}
@Override
protected SimpleChannelPool newPool(InetSocketAddress key) {
return new SimpleChannelPool(bootstrap.remoteAddress(key), new ChannelPoolHandler() {
@Override
public void channelReleased(Channel ch) throws Exception {
System.out.println("channelReleased: " + ch);
}
@Override
public void channelAcquired(Channel ch) throws Exception {
System.out.println("channelAcquired: " + ch);
}
@Override
public void channelCreated(Channel ch) throws Exception {
// 为channel添加handler
ch.pipeline().addLast(simpleHandler);
}
});
}
}
很简单就能构建出一个netty的连接池。
SimpleChannelPool
问题一:如何保证一个key只创建一个连接池?
1.AbstractChannelPoolMap
在get连接池的时候通过ConcurrentHashMap的putIfAbsent
保证只创建一个连接池。
private final ConcurrentMap<K, P> map = PlatformDependent.newConcurrentHashMap();
@Override
public final P get(K key) {
P pool = map.get(checkNotNull(key, "key"));
if (pool == null) {
// 创建连接池
pool = newPool(key);
// 如果已经创建了连接池,那么就把新的关闭,然后返回老的
P old = map.putIfAbsent(key, pool);
if (old != null) {
// We need to destroy the newly created pool as we not use it.
poolCloseAsyncIfSupported(pool);
pool = old;
}
}
return pool;
}
问题二:如何创建连接?
SimpleChannelPool#acquire()
public class SimpleChannelPool implements ChannelPool {
private static final AttributeKey<SimpleChannelPool> POOL_KEY =
AttributeKey.newInstance("io.netty.channel.pool.SimpleChannelPool");
private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
boolean releaseHealthCheck, boolean lastRecentUsed) {
this.handler = checkNotNull(handler, "handler");
this.healthCheck = checkNotNull(healthCheck, "healthCheck");
this.releaseHealthCheck = releaseHealthCheck;
// Clone the original Bootstrap as we want to set our own handler
this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone();
this.bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
assert ch.eventLoop().inEventLoop();
handler.channelCreated(ch);
}
});
this.lastRecentUsed = lastRecentUsed;
}
@Override
public final Future<Channel> acquire() {
return acquire(bootstrap.config().group().next().<Channel>newPromise());
}
@Override
public Future<Channel> acquire(final Promise<Channel> promise) {
return acquireHealthyFromPoolOrNew(checkNotNull(promise, "promise"));
}
- 连接池用来保存连接的数据结构是一个线程安全的双端队列。
releaseHealthCheck
表示在获取连接或者释放连接的时候,是否对连接进行健康检查。lastRecentUsed
如果为true,表示获取连接时候从队列尾部获取。为false的时候从队列头部获取。建议使用FIFO,否则可能会导致一直获取一个连接。
创建连接池的时候会调用ChannelPoolHandler#channelCreated方法做对channel的初始化操作。
这里的断言真的是非常细节,调用initChannel
方法的时候,channel的EventLoop已经初始化了,所以这里进行了一次断言。
接下来看下真正获取连接方法acquire
@Override
public final Future<Channel> acquire() {
// 获取线程选择器,并创建一个Promise
return acquire(bootstrap.config().group().next().<Channel>newPromise());
}
@Override
public Future<Channel> acquire(final Promise<Channel> promise) {
return acquireHealthyFromPoolOrNew(checkNotNull(promise, "promise"));
}
private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
try {
final Channel ch = pollChannel();
if (ch == null) {
// No Channel left in the pool bootstrap a new Channel
Bootstrap bs = bootstrap.clone();
bs.attr(POOL_KEY, this);
//创建连接
ChannelFuture f = connectChannel(bs);
if (f.isDone()) {
notifyConnect(f, promise);
} else {
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
notifyConnect(future, promise);
}
});
}
return promise;
}
EventLoop loop = ch.eventLoop();
if (loop.inEventLoop()) {
doHealthCheck(ch, promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
doHealthCheck(ch, promise);
}
});
}
} catch (Throwable cause) {
promise.tryFailure(cause);
}
return promise;
}
获取连接的方法是一个完全的异步编程,如果你不理解Netty的源码和EevntLoop的原理这里确实很难理解。
public void promiseTest() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
EventExecutor executorA = new DefaultEventExecutor(new DefaultThreadFactory("EventA"));
EventExecutor executorB = new DefaultEventExecutor();
Channel channel = new NioSocketChannel();
// 为EventLoop注册一个Promise
Promise<Channel> newPromise = executorA.<Channel>newPromise();
System.out.println(Thread.currentThread().getName());
newPromise.addListener(f -> {
if (f.isSuccess()) {
Assert.assertEquals(channel, f.getNow());
System.out.println(Thread.currentThread().getName());
latch.countDown();
}
});
Assert.assertEquals(false, executorB.inEventLoop());
executorB.execute(new Runnable() {
@Override
public void run() {
newPromise.setSuccess(channel);
}
});
latch.await();
}
这段代码可以帮助理解Netty的异步编程方式。然后我们再看SimpleChannelPool的acquire方法。
首先通过Netty客户端BootStrap的EventLoop线程选择器获取一个EventLoop,并创建了一个Promise。然后这个EventLoop将用来执行Channel的连接操作,当channel获取成功或者失败的时候都会通知到这个Promise。
然后根据lastRecentUsed
的值,判断是使用LIFO还是FIFO的方式获取一个连接。
protected Channel pollChannel() {
return lastRecentUsed ? deque.pollLast() : deque.pollFirst();
}
接下来如果没有获取到连接,就会执行建立连接的方法,如果从连接池获取到连接,会对这个连接进行健康检查。
ChannelFuture f = connectChannel(bs);
connectChannel
是真正建立连接的方法,这个方法和调用BootStrap#connect()走的是一样的逻辑,主要就创建channel并为这个channel绑定EventLoop,并把建立连接的操作提交到EventLoop的taskQueue。
private void notifyConnect(ChannelFuture future, Promise<Channel> promise) throws Exception {
//执行成功
if (future.isSuccess()) {
Channel channel = future.channel();
handler.channelAcquired(channel);
// 回写结果
if (!promise.trySuccess(channel)) {
// Promise was completed in the meantime (like cancelled), just release the channel again
release(channel);
}
} else {
promise.tryFailure(future.cause());
}
}
如果连接建立成功,通过future获取到channel,并执行ChannelPoolHandler#channelAcquired的方法,并调用Promise的trySuccess方法,尝试把channel设置到promise的结果。
⚠️这里有个非常重要的信息,就是acquire方法,在建立连接并写入结果到promise后并没有把连接放到连接池。而是写入promise失败才会把这个连接放到连接池里。
如果pollChannel()
获取连接不为空,则会对这个连接进行健康检查。
如果这个连接处于活跃状态,那么执行ChannelPoolHandler#channelAcquired,并把channel写入到promise。
如果这个连接处于非活跃状态,则会关闭这个连接。并重新执行acquireHealthyFromPoolOrNew
方法,从连接池中获取一个新的连接,
SimpleChannelPool#relesae()
@Override
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
checkNotNull(channel, "channel");
checkNotNull(promise, "promise");
try {
EventLoop loop = channel.eventLoop();
// 判断是不是eventloop线程
if (loop.inEventLoop()) {
doReleaseChannel(channel, promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
doReleaseChannel(channel, promise);
}
});
}
} catch (Throwable cause) {
closeAndFail(channel, cause, promise);
}
return promise;
}
private void doReleaseChannel(Channel channel, Promise<Void> promise) {
assert channel.eventLoop().inEventLoop();
// Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail.
if (channel.attr(POOL_KEY).getAndSet(null) != this) {
closeAndFail(channel,
// Better include a stacktrace here as this is an user error.
new IllegalArgumentException(
"Channel " + channel + " was not acquired from this ChannelPool"),
promise);
} else {
try {
if (releaseHealthCheck) {
doHealthCheckOnRelease(channel, promise);
} else {
releaseAndOffer(channel, promise);
}
} catch (Throwable cause) {
closeAndFail(channel, cause, promise);
}
}
}
private void doHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) throws Exception {
final Future<Boolean> f = healthCheck.isHealthy(channel);
if (f.isDone()) {
releaseAndOfferIfHealthy(channel, promise, f);
} else {
f.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
releaseAndOfferIfHealthy(channel, promise, f);
}
});
}
}
release方法先判断当前线程是不是channel的eventloop线程,如果不是则把放入连接池的任务提交到eventloop线程执行。
在将连接放入连接池的时候会根据releaseHealthCheck
判断,是否对连接进行健康检查。
- 健康检查开启:连接处于活跃状态那么就把连接放到连接池里,如果放入成功则执行ChannelPoolHandler#channelRelease方法,否则的话就会把这个连接关闭,并把结果通知Promise。如果连接没有处于活跃状态,只执行ChannelPoolHandler#channelRelease,并把结果通知Promise。
- 健康检查关闭:直接将连接放到连接池,如果失败,则关闭这个连接。
SimpleChannelPool实现了连接池的基本功能,但是他不能支持限制连接池的连接数,所以在生产环境我们需要使用FixedChannelPool
FixedChannelPool
public FixedChannelPool(Bootstrap bootstrap,
ChannelPoolHandler handler,
ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
final long acquireTimeoutMillis,
int maxConnections, int maxPendingAcquires,
boolean releaseHealthCheck, boolean lastRecentUsed) {
super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed);
if (maxConnections < 1) {
throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)");
}
if (maxPendingAcquires < 1) {
throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)");
}
if (action == null && acquireTimeoutMillis == -1) {
timeoutTask = null;
acquireTimeoutNanos = -1;
} else if (action == null && acquireTimeoutMillis != -1) {
throw new NullPointerException("action");
} else if (action != null && acquireTimeoutMillis < 0) {
throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)");
} else {
acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
switch (action) {
case FAIL:
timeoutTask = new TimeoutTask() {
@Override
public void onTimeout(AcquireTask task) {
// Fail the promise as we timed out.
task.promise.setFailure(new TimeoutException(
"Acquire operation took longer then configured maximum time") {
@Override
public Throwable fillInStackTrace() {
return this;
}
});
}
};
break;
case NEW:
timeoutTask = new TimeoutTask() {
@Override
public void onTimeout(AcquireTask task) {
// Increment the acquire count and delegate to super to actually acquire a Channel which will
// create a new connection.
task.acquired();
FixedChannelPool.super.acquire(task.promise);
}
};
break;
default:
throw new Error();
}
}
executor = bootstrap.config().group().next();
this.maxConnections = maxConnections;
this.maxPendingAcquires = maxPendingAcquires;
}
maxConnections
连接池中的最大连接数。acquireTimeoutNanos
等待连接池连接的最大时间,单位毫秒。maxPendingAcquires
在请求获取/建立连接大于maxConnections数时,创建等待建立连接的最大定时任务数量。例如maxConnections=2,此时已经建立了2连接,但是没有放入到连接池中,接下来的请求就会放入到一个后台执行的定时任务中,如果到了时间连接池中还没有连接,就可以建立不大于maxPendingAcquires
的连接数,如果连接池中有连接了就从连接池中获取。executor
用于执行获取连接和释放连接的EventLoop。TimeoutTask.FAIL
:如果连接池中没有可用连接了,等待acquireTimeoutNanos
后,抛出一个超时异常。TimeoutTask.NEW
:如果连接池中没有可用连接了,等待acquireTimeoutNanos
后,创建一个新的连接。
FixedChannelPool#acquire
@Override
public Future<Channel> acquire(final Promise<Channel> promise) {
// 使用同一个executor保证线程安全
try {
if (executor.inEventLoop()) {
acquire0(promise);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
acquire0(promise);
}
});
}
} catch (Throwable cause) {
promise.setFailure(cause);
}
return promise;
}
private void acquire0(final Promise<Channel> promise) {
assert executor.inEventLoop();
if (closed) {
promise.setFailure(new IllegalStateException("FixedChannelPool was closed"));
return;
}
if (acquiredChannelCount.get() < maxConnections) {
assert acquiredChannelCount.get() >= 0;
// We need to create a new promise as we need to ensure the AcquireListener runs in the correct
// EventLoop
// 创建一个新的Promise
Promise<Channel> p = executor.newPromise();
AcquireListener l = new AcquireListener(promise);
l.acquired();
p.addListener(l);
super.acquire(p);
} else {
if (pendingAcquireCount >= maxPendingAcquires) {
tooManyOutstanding(promise);
} else {
AcquireTask task = new AcquireTask(promise);
if (pendingAcquireQueue.offer(task)) {
++pendingAcquireCount;
if (timeoutTask != null) {
task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
}
} else {
tooManyOutstanding(promise);
}
}
assert pendingAcquireCount > 0;
}
}
FixedChannelPool重写了SimpleChannelPool的acquire(final Promise<Channel> promise)
方法,把所有的获取连接的任务acquire0
交给一个EventLoop执行,让获取我而不是像SimpleChannelPool中每获取一个连接都使用线程选择器选择一个EventLoop,用此来保证在高并发的情况下acquiredChannelCountd < maxConnections
的安全性,可以创建预期的连接数。所以在acquire0
中需要为FixedChannelPool中的EventLoop创建一个新的promise,然后调用SimpleChannelPool的acquire(final Promise<Channel> promise)
方法用来建立一个新的连接或者从连接池中获取一个连接。
每创建一个连接并且这个连接没有放入到连接池时acquiredChannelCount
都会增加加1,用来保证不创建超过maxConnections
的连接数。acquiredChannelCountd > maxConnections
的时候,FixedChannelPool会根据pendingAcquireCount
的值来判断是否创建一个定时的任务,去建立新的连接。
private abstract class TimeoutTask implements Runnable {
@Override
public final void run() {
assert executor.inEventLoop();
long nanoTime = System.nanoTime();
for (;;) {
AcquireTask task = pendingAcquireQueue.peek();
// 检查是否到了执行时间
if (task == null || nanoTime - task.expireNanoTime < 0) {
break;
}
pendingAcquireQueue.remove();
--pendingAcquireCount;
onTimeout(task);
}
}
public abstract void onTimeout(AcquireTask task);
}
FixedChannelPool#release
释放连接的时候同样使用FixedChannelPool中的EventLoop保证高并发下的线程安全问题,在释放连接的时候会执行decrementAndRunTaskQueue()
方法,成功后会尝试终止定时任务,从连接池中返回连接到promise。
private void decrementAndRunTaskQueue() {
// We should never have a negative value.
int currentCount = acquiredChannelCount.decrementAndGet();
assert currentCount >= 0;
runTaskQueue();
}
private void runTaskQueue() {
while (acquiredChannelCount.get() < maxConnections) {
AcquireTask task = pendingAcquireQueue.poll();
if (task == null) {
break;
}
// Cancel the timeout if one was scheduled
ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
if (timeoutFuture != null) {
timeoutFuture.cancel(false);
}
--pendingAcquireCount;
task.acquired();
super.acquire(task.promise);
}
// We should never have a negative value.
assert pendingAcquireCount >= 0;
assert acquiredChannelCount.get() >= 0;
}
最重要的是理解Netty连接池的使用方式和工作方式,以及Netty连接池是如何解决高并发下创建指定的连接数问题。下篇文章我会分析一下Sofa-bolt中的连接池和Netty连接池实现的不同之处。