基于FixedChannelPool的socket连接池

2,315 阅读5分钟

背景

因为项目需求,需要集成socket连接,系统作为客户端向外部系统(socket服务端)发起请求连接;本身是比较简单的事情,但是考虑到并发问题,使用了NIO的netty框架以及连接池。

show code

一、pom文件依赖netty

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>

二、继承ChannelPoolHandler的hanlder文件

public class NettyChannelPoolHandler implements ChannelPoolHandler {
    @Override
    public void channelReleased(Channel channel) throws Exception {
        System.out.println("channelReleased. Channel ID: " + channel.id());
    }

    @Override
    public void channelAcquired(Channel channel) throws Exception {
        System.out.println("channelAcquired. Channel ID: " + channel.id());

    }

    @Override
    public void channelCreated(Channel ch) throws Exception {
        System.out.println("channelCreated. Channel ID: " + ch.id());
        SocketChannel channel = (SocketChannel) ch;
        channel.config().setKeepAlive(true);
        channel.config().setTcpNoDelay(true);
        channel.pipeline()
                //心跳支持
                .addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS))
                .addLast(new LengthFieldPrepender(2))
                .addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2))
                .addLast(new ObjectCodec())
                .addLast(new NettyClientHandler("ping-pong"));
    }
}

三、连接池

//客户端连接池
//单例模式,第一次连接时初始化poolMap,poolMap存放着不同地址server端的池
//之后的访问都是从poolMap中取出池或者put池
public class NettyPoolClient {

    private static NettyPoolClient instance;
    private NettyPoolClient() {

    }
    public static NettyPoolClient getInstance() {
        if (instance == null) {
            instance = new NettyPoolClient();
            instance.build();
        }
        return instance;
    }

    public ChannelPoolMap<InetSocketAddress, SimpleChannelPool> poolMap;
    EventLoopGroup group = new NioEventLoopGroup();
    Bootstrap strap = new Bootstrap();

    public void build() {
        // ChannelOption.TCP_NODELAY 禁用nagle算法,从而可以发送较小的包,降低延迟
        //ChannelOption.SO_KEEPALIVE 隔一段时间(两小时左右)探测服务端是否活跃,如果没有活跃关闭socket--意义不大
        strap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true);

        poolMap = new AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool>() {
            @Override
            protected SimpleChannelPool newPool(InetSocketAddress key) {
                //NettyChannelPoolHandler 实现ChannelPoolHandler,重写了创建通道、获得通道、归还通道方法
                return new FixedChannelPool(strap.remoteAddress(key), new NettyChannelPoolHandler(), 2);
            }
        };

    }

}

四、测试类

    //实际上这是个controller,可以通过http的方式进行调用
    public String connectSocketServer(String host,String port) {
        InetSocketAddress inetSocketAddress = new InetSocketAddress(host, Integer.parseInt(port));
        //根据地址获得池时,如果poolMap没有这个池,则会put一个生成新的池
        SimpleChannelPool pool = NettyPoolClient.getInstance().poolMap.get(inetSocketAddress);
        //获得池中的通道,写入参数,归还通道;这里是异步处理
        Future<Channel> f = pool.acquire();
        f.addListener((FutureListener<Channel>) f1 -> {
            if (f1.isSuccess()) {
                Channel ch = f1.getNow();
                ch.writeAndFlush("hello socket");
                pool.release(ch);
            }
        });
        return "success";
    }

五、开始测试

可以看到,这两次请求的通道id是一样的,说明连接池已经起作用了

源码分析

以上的内容已经可以满足需求,但是作为技术人员要有探索的精神,既然我们可以用了,不妨探究一下它是怎么实现的。

流程分析

乍一看,流程非常简单;这是因为netty已经为我们做了大部分事情。仔细想想有很多意思的事情,比如我第一次连接server1时初始化,然后get到一个连接池;第二次连接server2,get到的是什么(是空吗?好像不是)。如果从连接池没有拿到连接怎么办?

FixedChannelPool

我们使用FixedChannelPool这个类创建的池,就从这个类开始顺藤摸瓜。 打开idea,找到位于io.netty.channel.pool包下的这个类,映入眼帘的是

public class FixedChannelPool extends SimpleChannelPool

有没有似曾相识,没错;我们用SimpleChannelPool这个类作为poolMapvalue的泛型;当然也可以FixedChannelPool作为泛型,毕竟他们是父子关系。 既然存在父子关系,那么他们必然也存在相同的特征:

  1. 他们都是
  2. 他们都实现了ChannelPool,这个接口主要有acquire()release(Channel var1)close()这几个方法,是对连接进行操作的;既然他俩都是池,肯定要对连接进行操作。

在java中子类继承父类非私有的属性、方法,我们可以粗略的理解为子类拥有比父类更多的功能,所以让我们看看不同:

SimpleChannelPool只是实现了基本的连接池的功能,FixedChannelPool拥有更加强大的功能,比如对连接池的大小的配置,超时的时间等(这很后浪

将重点放到FixedChannelPool

    //这些都是 FixedChannelPool构造方法,透过现象看本质,我们直接到最后一个入参最多的构造方法
    public FixedChannelPool(Bootstrap bootstrap, ChannelPoolHandler handler, int maxConnections) {
        this(bootstrap, handler, maxConnections, 2147483647);
    }

    public FixedChannelPool(Bootstrap bootstrap, ChannelPoolHandler handler, int maxConnections, int maxPendingAcquires) {
        this(bootstrap, handler, ChannelHealthChecker.ACTIVE, (FixedChannelPool.AcquireTimeoutAction)null, -1L, maxConnections, maxPendingAcquires);
    }

    public FixedChannelPool(Bootstrap bootstrap, ChannelPoolHandler handler, ChannelHealthChecker healthCheck, FixedChannelPool.AcquireTimeoutAction action, long acquireTimeoutMillis, int maxConnections, int maxPendingAcquires) {
        this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, true);
    }

    public FixedChannelPool(Bootstrap bootstrap, ChannelPoolHandler handler, ChannelHealthChecker healthCheck, FixedChannelPool.AcquireTimeoutAction action, long acquireTimeoutMillis, int maxConnections, int maxPendingAcquires, boolean releaseHealthCheck) {
        this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, releaseHealthCheck, true);
    }

    //在这里!!
    public FixedChannelPool(Bootstrap bootstrap, ChannelPoolHandler handler, ChannelHealthChecker healthCheck, FixedChannelPool.AcquireTimeoutAction action, long acquireTimeoutMillis, int maxConnections, int maxPendingAcquires, boolean releaseHealthCheck, boolean lastRecentUsed) {
        super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed);
        this.pendingAcquireQueue = new ArrayDeque();
        if (maxConnections < 1) {
            throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)");
        } else if (maxPendingAcquires < 1) {
            throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)");
        } else {
            if (action == null && acquireTimeoutMillis == -1L) {
                this.timeoutTask = null;
                this.acquireTimeoutNanos = -1L;
            } else {
                if (action == null && acquireTimeoutMillis != -1L) {
                    throw new NullPointerException("action");
                }

                if (action != null && acquireTimeoutMillis < 0L) {
                    throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)");
                }

                this.acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
                switch(action) {
                case FAIL:
                    this.timeoutTask = new FixedChannelPool.TimeoutTask() {
                        public void onTimeout(FixedChannelPool.AcquireTask task) {
                            task.promise.setFailure(FixedChannelPool.TIMEOUT_EXCEPTION);
                        }
                    };
                    break;
                case NEW:
                    this.timeoutTask = new FixedChannelPool.TimeoutTask() {
                        public void onTimeout(FixedChannelPool.AcquireTask task) {
                            task.acquired();
                            FixedChannelPool.super.acquire(task.promise);
                        }
                    };
                    break;
                default:
                    throw new Error();
                }
            }

            this.executor = bootstrap.config().group().next();
            this.maxConnections = maxConnections;
            this.maxPendingAcquires = maxPendingAcquires;
        }
    }

最后一个构造方法的入参

  • FixedChannelPool.AcquireTimeoutAction 这个是个枚举类,NEW--没有可用连接时,超过时长,创建新的;FAIL--没有可用连接时,超过时长,跑出异常
  • acquireTimeoutMillis 连接socket池最大时间
  • maxConnections 最大连接数
  • maxPendingAcquires 这个是指超过最大连接数的 等待数量
  • lastRecentUsed true--从队列尾取连接 false--队列头取连接

ChannelPoolMap

这是一个接口,他的实现是 AbstractChannelPoolMap 这个连接池的map是ConcurrentHashMap,支持并发的map;再看一下我们在用的get()方法,上源码:

    //这次的代码很简单
    public final P get(K key) {
        P pool = (ChannelPool)this.map.get(ObjectUtil.checkNotNull(key, "key"));
        if (pool == null) {
            pool = this.newPool(key);
            P old = (ChannelPool)this.map.putIfAbsent(key, pool);
            if (old != null) {
                pool.close();
                pool = old;
            }
        }

        return pool;
    }

看了这串代码,大家是不是感觉没那么疑惑了;当有新的key来到的时候,如果判断map中是空,会创建新的连接池,然后再走一步判断;最终将连接池返回给我们使用。

总结

没想到这篇博客写了接近两个小时,腰酸脖子疼,哈哈哈。不过我们要有刨根问底的精神,知其然,知其所以然我们要坚信技术可以改变世界!!晚安