Swoole - webSocket客服IM消息系统方案实践篇

2,004 阅读3分钟

概述

基于Swoole的websocket服务,再之前的消息系统系列的第4篇,实现了更加复杂的业务场景,是对消息推送的完善和优化,代码本身就是不断自我优化的过程。

实现方案

技术的实现方案点主要PMQ,2组客户端(用户端、客服管理端),3个主要的部分组成(Push推送消息+Pull拉取未读消息+MessageQueue消息队列),具体流程和交互方式见上面的架构流程图。

请在此添加图片描述

1.建立链接,借鉴Tcp3次握手的原理,将每一次的用户询问新增一个关系,询问结束时再将关系释放,因为每次随机分配的客服是不一致的,客服管理员控制台,进入控制台会触发检测客服映射关系的程序,以保证关系的唯一性。

2.客服分配:客服分配是根据用户是否为第一次进入链接进行判定依据,首次会随机分发配给在线客服中的其中一个,如果之前分配过的客服也在线,优化分配存在客服,这样处理的原因是客服不易变,用户异变,防止反复链接/断开操作,减少网络开销。

3.并发锁:相同用户在同一时间有3s的锁定状态,用来防止关系错乱,在客户端发来请求时优先获取缓存,近少可能的访问数据库,提高服务的稳定性和性能。

//设置分布式锁,3s之内只能请求一次
$lock = RedisPool::invoke(function (Redis $redis) use ($toUid) {
    return $redis->get(Category::$openLock . $toUid);
}, self::REDIS_CONN_NAME);


if ($lock) {
    $msgErrorRet['code'] = 416;
    $msgErrorRet['msg'] = 'Please try again';
    return $this->response()->setMessage(json_encode($msgErrorRet));
}

//查询是否存在链接关系
$imUserRelation = RedisPool::invoke(function (Redis $redis) use ($toUid) {
    $redis->setEx(Category::$openLockPrefix . $toUid, 3, $toUid);
    return $redis->get(Category::$imUserRelationName . $toUid);
}, self::REDIS_CONN_NAME);

4.网络异常处理,回收服务:针对App崩溃、网络异常断开的链接,主动监听断开的fd,进行关系处理,对所有断开链接的websocket,进行回收,清除关系。

static function onClose(\swoole_server $server, int $fd, int $reactorId)
{
    $info = $server->getClientInfo($fd);
    $fd = intval($fd);
    if ($info && $info['websocket_status'] === WEBSOCKET_STATUS_FRAME) {
        TaskManager::getInstance()->async(function () use ($fd) {
            RedisPool::invoke(function (Redis $redis) use ($fd) {
                //回收用户
                $uid = $redis->hGet('PUSH_MSG_SOCKET_FD', $fd);
                if (isset($uid) && !empty($uid) && is_numeric($uid)) {
                    $redis->zRem('PUSH_MSG_USER_LOGIN', $fd);
                    //检测是否有客服关系未断开
                    $redis->del(Category::$imUserRelationName . $uid);
                    $redis->hDel('PUSH_MSG_SOCKET_FD', $fd);
                }
                //回收客服管理用户
                $cUid = $redis->hGet('PUSH_CUSTOMER_MSG_SOCKET_FD', $fd);
                if (isset($cUid) && !empty($cUid)) {
                    $redis->zRem('PUSH_CUSTOMER_MSG_USER_LOGIN', $fd);
                    $redis->hDel('PUSH_CUSTOMER_MSG_SOCKET_FD', $fd);
                }
            }, 'redis');
        });
    }
}

5.获取离线消息分配算法,按照客服管理员在线人数,把离线消息按照用户来重新组装,平均分配给在线管理员,如果数量不能被整除,也不会造成分配不均情况。

//验证客服管理员在线
$vUid = [];
$server = ServerManager::getInstance()->getSwooleServer();
foreach ($virtualUid as $fd => $vid){
    $info = $server->getClientInfo($fd);
    if ($info && $info['websocket_status'] == 3) {
        $vUid[$fd] = $vid;
    }
}

if (!empty($pullData) && !empty($vUid)) {
    $uIds = array_keys($pullData);
    $row = ceil(count($uIds) / count($vUid));
    $share = array_chunk($uIds, $row, true);
    $keyDict = $vUid;
    $pushList = [];
    // code 组装代码略... 
}

6.websocket对象不回收:从控制台打开新窗口时,就会新增一个websocket对象,后来在浏览器中刷新处理的,没有找到回收的办法。

7.心跳:客服的websocket心跳使用的是实时push消息,5s循环一次,防止链接断开,服务下线。

实践Swoole里的坑

链接数变化正常,但是内存好像没有得到很好的释放,而且进程里也出现了很多野进程,野进程多可能存在的原因是这样的,你没有守护启动,然后主进程挂了,后面的进程找不到父进程,变成了僵尸进程或者是孤儿进程。

请在此添加图片描述

内存也不对劲,大概率是我执行脚本里出了问题,去掉了修改配置的语句,在Base类里加入了unset,及时释放掉内存。

请在此添加图片描述

出现问题的解决的原因是我在Crontab脚本里加了结束时长造成的问题,cli模式下的php生命周期进程得不到释放造成的,合理使用Swoole中的协程就好了。

ini_set('memory_limit', '1024M');
set_time_limit(0);

成果

上线2年的时间里,进行了5次升级和优化,活跃用户10w+,最高峰值6w/s,130w/h访问量,是一个非常成功的实践结果。

用最简单的技术实现方式,节省企业成本,减少系统开发和维护成本,提高办公效率才是技术人应该做的事儿,做解决实际复杂业务解决方案并落地的技术人,En。