跟我一起开发商业级IM(3)—— 长连接稳定性之连接及重连

4,207 阅读11分钟

欢迎转载,转载请注明出处:juejin.cn/post/686893…

写在前面

贴个Kula高清图镇楼:
Kula

在之前的跟我一起开发商业级IM(1)—— 技术选型及协议定义跟我一起开发商业级IM(2)—— 接口定义及封装两篇文章,我们已经了解IMS的技术选型及接口定义与封装,接下来,我们来真正实现连接及重连部分。

一个社交产品,长连接稳定是前提,绝大部分业务逻辑的正常运行都需要稳定的长连接支撑,可谓重中之重。本篇文章将会讲述如何去实现并维护一个稳定的长连接,以及各种异常情况的处理等。阅读完本篇文章,你将会学到连接、重连机制、心跳机制等知识。同时,会在Github上开源相关代码(包含Android客户端/Java服务端、基于TCP/WebSocket),废话不说,我们开始吧。

初始化配置

初始化配置,也就是在应用程序启动并进行IMS初始化时,传入所需配置参数,可根据自己的业务需求自定义。下面我们来看看NettyTCPIMS初始化接口的代码实现(由于基于NettyWebSocket实现的NettyWebSocketIMS大部分代码及逻辑都与NettyTCPIMS相同,就不单独贴出NettyWebSocketIMS代码了,下面只会讲解WebSocket对比TCP实现所不同的地方,有需要完整代码的话可以跳转Github查看):

 /**
  * 初始化
  * @param context
  * @param options               IMS初始化配置
  * @param connectStatusListener IMS连接状态监听
  * @param msgReceivedListener   IMS消息接收监听
  * @return
  */
 @Override
 public boolean init(Context context, IMSOptions options, IMSConnectStatusListener connectStatusListener, IMSMsgReceivedListener msgReceivedListener) {
    if (context == null) {
        Log.d(TAG, "初始化失败:Context is null.");
        initialized = false;
        return false;
    }

    if (options == null) {
        Log.d(TAG, "初始化失败:IMSOptions is null.");
        initialized = false;
        return false;
    }
    this.mContext = context;
    this.mIMSOptions = options;
    this.mIMSConnectStatusListener = connectStatusListener;
    this.mIMSMsgReceivedListener = msgReceivedListener;
    executors = new ExecutorServiceFactory();
    // 初始化重连线程池
    executors.initBossLoopGroup();
    // 注册网络连接状态监听
    NetworkManager.getInstance().registerObserver(context, this);
    // 标识ims初始化成功
    initialized = true;
    // 标识ims已打开
    isClosed = false;
    callbackIMSConnectStatus(IMSConnectStatus.Unconnected);
    return true;
}

如上图,简单讲讲初始化的几个步骤:

  1. 参数
  • context 应用程序上下文,方便IMS获取系统资源并进行一些系统操作等。
  • options IMS初始化所需配置,其中定义了通信实现方式、通信协议、传输协议、连接超时时间、重连延时时间、重连次数、心跳前后台间隔时间、服务器地址等一些支持自定义的参数。
  • connectStatusListener IMS连接状态回调,便于把连接状态反馈到应用层。
  • msgReceivedListener 消息接收回调,便于IMS把接收到的消息回调到应用层(本篇文章主要讲解连接及重连,所以不涉及消息部分,后续会详细讲解)。
  1. 创建线程池组
    线程池组分为boss线程池和work线程池,其中boss线程池负责连接及重连部分;work线程池负责心跳部分,均为单线程线程池(因为同时只能有一个线程进行连接或心跳)。至于为什么用线程池,纯属个人习惯,大家也可以分别用一个子线程实现即可。
  2. 注册网络状态监听
    网络变化时,进行IMS重连。

初始Bootstrap

初始化Bootstrap,可参考Netty ChannelOption并根据实际业务场景进行定制,下面贴出我自己定制的配置:

/**
 * 初始化bootstrap
 */
void initBootstrap() {
    closeBootstrap();// 初始化前先关闭
    NioEventLoopGroup loopGroup = new NioEventLoopGroup(4);
    bootstrap = new Bootstrap();
    bootstrap.group(loopGroup).channel(NioSocketChannel.class)
            // 设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
            .option(ChannelOption.SO_KEEPALIVE, true)
            // 设置禁用nagle算法,如果要求高实时性,有数据发送时就马上发送,就将该选项设置为true关闭Nagle算法;如果要减少发送次数减少网络交互,就设置为false等累积一定大小后再发送。默认为false
            .option(ChannelOption.TCP_NODELAY, true)
            // 设置TCP发送缓冲区大小(字节数)
            .option(ChannelOption.SO_SNDBUF, 32 * 1024)
            // 设置TCP接收缓冲区大小(字节数)
            .option(ChannelOption.SO_RCVBUF, 32 * 1024)
            // 设置连接超时时长,单位:毫秒
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, mIMSOptions.getConnectTimeout())
            // 设置初始化ChannelHandler
            .handler(new NettyTCPChannelInitializerHandler(this));
}

至于参数的含义,大家可参照官方文档的介绍。

连接

连接也可以认为是重连,执行重连响应逻辑即可:

/**
 * 连接
 */
@Override
public void connect() {
    if(!initialized) {
        Log.w(TAG, "IMS初始化失败,请查看日志");
        return;
    }
    isExecConnect = true;// 标识已执行过连接
    this.reconnect(true);
}

所以我们直接看重连部分,也是整篇文章中最核心最复杂的部分

重连

因为连接及重连部分代码较多及逻辑较复杂,为了使NettyTCPIMS代码尽量简洁及逻辑清晰,所以将连接及重连部分代码抽取到NettyTCPReconnectTask

public class NettyTCPReconnectTask implements Runnable {

    private static final String TAG = NettyTCPReconnectTask.class.getSimpleName();
    private NettyTCPIMS ims;
    private IMSOptions mIMSOptions;

    NettyTCPReconnectTask(NettyTCPIMS ims) {
        this.ims = ims;
        this.mIMSOptions = ims.getIMSOptions();
    }

    @Override
    public void run() {
        try {
            // 重连时,释放工作线程组,也就是停止心跳
            ims.getExecutors().destroyWorkLoopGroup();

            // ims未关闭并且网络可用的情况下,才去连接
            while (!ims.isClosed() && ims.isNetworkAvailable()) {
                IMSConnectStatus status;
                if ((status = connect()) == IMSConnectStatus.Connected) {
                    ims.callbackIMSConnectStatus(status);
                    break;// 连接成功,跳出循环
                }

                if (status == IMSConnectStatus.ConnectFailed
                        || status == IMSConnectStatus.ConnectFailed_IMSClosed
                        || status == IMSConnectStatus.ConnectFailed_ServerListEmpty
                        || status == IMSConnectStatus.ConnectFailed_ServerEmpty
                        || status == IMSConnectStatus.ConnectFailed_ServerIllegitimate
                        || status == IMSConnectStatus.ConnectFailed_NetworkUnavailable) {
                    ims.callbackIMSConnectStatus(status);

                    if(ims.isClosed() || !ims.isNetworkAvailable()) {
                        return;
                    }
                    // 一个服务器地址列表都连接失败后,说明网络情况可能很差,延时指定时间(重连间隔时间*2)再去进行下一个服务器地址的连接
                    Log.w(TAG, String.format("一个周期连接失败,等待%1$dms后再次尝试重连", mIMSOptions.getReconnectInterval() * 2));
                    try {
                        Thread.sleep(mIMSOptions.getReconnectInterval() * 2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } finally {
            // 标识重连任务停止
            ims.setReconnecting(false);
        }
    }

    /**
     * 连接服务器
     * @return
     */
    private IMSConnectStatus connect() {
        if (ims.isClosed()) return IMSConnectStatus.ConnectFailed_IMSClosed;
        ims.initBootstrap();
        List<String> serverList = mIMSOptions.getServerList();
        if (serverList == null || serverList.isEmpty()) {
            return IMSConnectStatus.ConnectFailed_ServerListEmpty;
        }

        for (int i = 0; i < serverList.size(); i++) {
            String server = serverList.get(i);
            if (StringUtil.isNullOrEmpty(server)) {
                return IMSConnectStatus.ConnectFailed_ServerEmpty;
            }

            String[] params = null;
            try {
                params = server.split(" ");
            } catch (Exception e) {
                e.printStackTrace();
            }
            if (params == null || params.length < 2) {
                return IMSConnectStatus.ConnectFailed_ServerIllegitimate;
            }

            if(i == 0) {
                ims.callbackIMSConnectStatus(IMSConnectStatus.Connecting);
            }

            // +1是因为首次连接也认为是重连,所以如果重连次数设置为3,则最大连接次数为3+1次
            for (int j = 0; j < mIMSOptions.getReconnectCount() + 1; j++) {
                if (ims.isClosed()) {
                    return IMSConnectStatus.ConnectFailed_IMSClosed;
                }
                if (!ims.isNetworkAvailable()) {
                    return IMSConnectStatus.ConnectFailed_NetworkUnavailable;
                }

                Log.d(TAG, String.format("正在进行【%1$s】的第%2$d次连接", server, j + 1));
                try {
                    String host = params[0];
                    int port = Integer.parseInt(params[1]);
                    Channel channel = toServer(host, port);
                    if (channel != null && channel.isOpen() && channel.isActive() && channel.isRegistered() && channel.isWritable()) {
                        ims.setChannel(channel);
                        return IMSConnectStatus.Connected;
                    } else {
                        if (j == mIMSOptions.getReconnectCount()) {
                            // 如果当前已达到最大重连次数,并且是最后一个服务器地址,则回调连接失败
                            if(i == serverList.size() - 1) {
                                Log.w(TAG, String.format("【%1$s】连接失败", server));
                                return IMSConnectStatus.ConnectFailed;
                            }
                            // 否则,无需回调连接失败,等待一段时间再去进行下一个服务器地址连接即可
                            // 也就是说,当服务器地址列表里的地址都连接失败,才认为是连接失败
                            else {
                                // 一个服务器地址连接失败后,延时指定时间再去进行下一个服务器地址的连接
                                Log.w(TAG, String.format("【%1$s】连接失败,正在等待进行下一个服务器地址的重连,当前重连延时时长:%2$dms", server, mIMSOptions.getReconnectInterval()));
                                Log.w(TAG, "=========================================================================================");
                                Thread.sleep(mIMSOptions.getReconnectInterval());
                            }
                        } else {
                            // 连接失败,则线程休眠(重连间隔时长 / 2 * n) ms
                            int delayTime = mIMSOptions.getReconnectInterval() + mIMSOptions.getReconnectInterval() / 2 * j;
                            Log.w(TAG, String.format("【%1$s】连接失败,正在等待重连,当前重连延时时长:%2$dms", server, delayTime));
                            Thread.sleep(delayTime);
                        }
                    }
                } catch (InterruptedException e) {
                    break;// 线程被中断,则强制关闭
                }
            }
        }

        return IMSConnectStatus.ConnectFailed;
    }

    /**
     * 真正连接服务器的地方
     * @param host
     * @param port
     * @return
     */
    private Channel toServer(String host, int port) {
        Channel channel;
        try {
            channel = ims.getBootstrap().connect(host, port).sync().channel();
        } catch (Exception e) {
            e.printStackTrace();
            channel = null;
        }

        return channel;
    }
}

从以上代码,可以看到主要分为三个方法:

  • run() 重连任务是一个Thread,run()方法也就是线程启动时执行的方法,主要是判断IMS是否关闭和网络状态,满足这两个条件就一直循环去连接,连接成功后,回调连接状态并停止线程,否则,一个周期连接失败后(一个连接失败周期,代表从开始连接到所有服务器地址达到最大重连次数),延时一段时间再去尝试重连(大家可能会问为什么要去延时,直接连接不好吗?主要是因为如果连接失败的话,大多数情况下可能是客户端网络环境不好或者是服务端存在问题,延时是为了在下一个时间节点时网络恢复等,避免频繁连接,节约性能),直至连接成功为止。

  • toServer() toServer()主要是Netty框架进行TCP长连接的代码,比较简单。

  • connect() 连接及重连的所有逻辑,都放到connect()方法中进行。TCPWebSocket的方式有细微的区别,下面主要以TCP为例,至于WebSocket的区别,稍后会列出来。
    注:ims_kulaSDK中固定的TCP服务器地址的格式为:IP地址 端口号,例:192.168.0.1 8808,大家也可以根据自己的需求来定义格式。

    connect()方法大体逻辑如下:

    • 判断IMS是否已关闭或网络是否不可用,若满足两个条件的其中之一,即返回连接失败状态;
    • 判断用户是否设置了服务器地址列表,若未设置,即返回连接失败状态;
    • 若以上条件都未满足,也就是IMS未关闭,网络可用,并且服务器地址已设置,则初始化Bootstrap;
    • 接着需要两个for循环,外层循环负责遍历服务器地址列表,取出每一个服务器地址;内层循环负责遍历用户设置的最大重连次数,默认为3次,加上连接所需的一次,也就是说在不设置最大重连次数的情况下,ims_kulaSDK会对每个服务器地址进行4次连接。同时,重连间隔时间为 reconnectInterval + reconnectInterval / 2 * n,也就是如果设置重连间隔时间为8000ms,那么第二次重连间隔时间将为12000ms,第三次为16000ms,以此类推;
    • 获取服务器地址后,对地址进行字符串分割,分别获取host和port;
    • 接着调用Netty连接TCP的方法(toServer(String host, int port))进行连接即可。

注:WebSocket连接方式与TCP大同小异,唯一的区别就是WebSocket的服务器地址格式与TCP不同,ws://IP地址:端口号/websocket,例:ws://192.168.0.1:8808/websocket,所以WebSocket获取host和port代码如下:
(伪代码,具体代码可见NettyWebSocketReconnectTask

URI uri = URI.create(server);
String host = uri.getHost();
int port = uri.getPort();

至于其余连接及重连部分代码,WebSocketTCP是一致的,因为WebSocket本身就是基于TCP协议作一层封装。

何时重连及断开连接

首先明确一下,重连是相对于客户端来说的,服务端不存在主动连接;断开连接是相对于服务端来说的,严格来说是移除响应的Channel。

客户端重连时机:

  • 网络切换
  • 断开网络连接
  • 可感知的服务端异常
  • 心跳超时等

服务端断开连接时机:

  • 可感知的客户端异常
  • 心跳超时
  • 同一IP的客户端重复连接等

不知道大家注意到没有,上述的客户端重连时机和服务端断开连接时机,都分别有一个可感知异常。什么是可感知异常?也就是无论客户端还是服务端,在对方断开连接的时候,可以感知到,就是可感知异常。

经测试,在双方建立连接成功的状态下,对于客户端来说,如果服务端手动停止服务,Netty会回调exceptionCaught()方法,如下:
服务端手动停止服务客户端回调exceptionCaught()方法
服务端直接关机或者拔网线时,客户端无法感知,需要利用心跳超时机制进行重连。

同理,对于服务端来说,如果客户端手动杀死进程,Netty会回调channelInactive()方法,如下:
客户端手动杀死进程服务端回调channelInactive()方法
客户端直接关机或者断网时,服务端无法感知,同样需要利用心跳机制进行断开客户端连接(移除channel)。

注:利用心跳超时机制进行重连及断开连接会在后续文章讲解,本篇文章主要讲解连接及重连,就不在此展开了。

效果展示

考虑到GIF图片体积过大,暂时先把连接超时时间和重连间隔时间适当缩短,下面展示几种情况下的客户端连接变化:

  • 正常连接
    KulaChat正常连接
  • 客户端主动断网重连
    KulaChat客户端主动断网重连
  • 服务端停止服务重连
    KulaChat服务端主动停止服务重连

注:以上GIF图,客户端与服务端建立连接成功时,会显示“消息”字样,否则会显示连接状态。

客户端日志如下:
KulaChat客户端日志

服务端日志如下:
KulaChat服务端日志

由于客户端杀死进程及服务端主动停止服务,日志会清空,所以就不贴更详细的日志了,感兴趣的同学可以pull代码自行验证。

写在最后

通过以上代码实现,如果不考虑长连接稳定性的情况下(未加入心跳超时重连逻辑),已经可以进行客户端与服务端消息的收发,本文主要讲解连接及重连模块,所以暂未加入消息收发功能。

在下一篇文章中,将会讲解TCP/WebSocket的拆包与粘包处理,由于Netty已封装了各种不同的消息编解码器,所以如果使用我定义的消息格式,拆包与粘包的处理将会很简单,直接拿来用即可。考虑到大家可能有不同业务协议的需求,所以会加入自定义协议的消息编解码器的实现,敬请期待。

相关代码已提交Github,需要自取:

PS:新开的公众号不能留言,如果大家有不同的意见或建议,可以到掘金上评论或者加到QQ群:1015178804,如果群满人的话,也可以在公众号给我私信,谢谢。
贴上公众号:
FreddyChen
FreddyChen的微信公众号

下篇文章见,古德拜~ ~ ~