基于Koa(Node)搭建websocket链接redis实现即时通信

5,005 阅读3分钟

==项目中使用的场景==

账户扫码登录,微信扫码授权,消息实时提醒,配置结果响应,客户端同步数据。。。 之前项目里做即时通信都是用的轮循,轮询的效率低,非常浪费资源,后面好几个项目都开始用的websocket配合koa和redis来实现,现在整理整理深入了解下整个即时通信实现的过程。

前提:需要安装的包

  1. koa== 由 Express 幕后的原班人马打造,使用起来比Node更优雅简洁 koa.bootcss.com/
  2. koa-router== 是路由导航,实际应用中可以认为node接口www.npmjs.com/package/koa…
  3. koa-websocket== koa的中间件对websocket的封装 www.npmjs.com/package/koa…
  4. redis== 我们这里用到的是redis的订阅发布功能www.npmjs.com/package/red…
  5. dotenv== 获取项目文件的环境变量也可以前端在config.js里写死 www.npmjs.com/package/dot…

一、搭建基础的socket服务

搭建方法跟node服务差不多,多了koa-websocket中间件

const Koa = require('koa')
const router = require('koa-router')()
const websockify = require('koa-websocket')
const app = websockify(new Koa())

//创建socket接口
router.all('/kapi/socket/init', async ctx => {
   const { channel } = ctx.query //客户端调接口会传频道id
   console.log(channel)
   ctx.websocket.send('message')
})

// 注册路由允许使用中间件
app.ws.use(router.routes()).use(router.allowedMethods())

//端口号后面可采用动态的
app.listen(3131, () =>
 console.log(`socket listening on port ${Config.socket.port}`)
)

二、链接订阅redis频道

在socket的接口基础上,添加的了redis链接,频道的订阅,这里只展示关键的代码,后面会再做拆分整理

const Koa = require('koa')
const router = require('koa-router')()
const websockify = require('koa-websocket')
const app = websockify(new Koa())
const redis = require('redis')

//跟链接数据库一样,想链接redis频道需要这些参数
//先写死,后面整理使用dotenv动态获取
const redisConfig = { 
   host: '18.8.1.3',
   port: '32',
   password: '1232343',
   db: 4
}
//创建socket接口
router.all('/kapi/socket/init', async ctx => {
  const { channel } = ctx.query //接受客户端传的频道id,并订阅redis
  console.log(channel)
  
  //链接redis  https://www.npmjs.com/package/redis
  let client = redis.createClient(redisConfig)
  
  //  订阅redis频道
  client.subscribe(channel)
  
  // 接收消息
  client.on('message', async (channel, message) => {
    console.log(
      'Received subscribe message, channel [%s] message [%s]',
      channel,
      message
    )
    //接收到消息,通过接口返回到客户端
    await  ctx.websocket.send(message) 
  })
})
 
// 注册路由允许使用中间件
app.ws.use(router.routes()).use(router.allowedMethods())

//端口号后面可采用动态的
app.listen(3131, () =>
  console.log(`socket listening on port ${Config.socket.port}`)
)

三、客户端调用

客户端调用之前一定别忘了先启动socket,可以用pm2或者npm运行koa文件,展示的都是关键的代码,至于监听error 、open、close

import React, { useState, useEffect, useCallback } from 'react'
import createSocket from 'src/api/socket' //引入socket

export default ()=>{
	const [mess,setmess] = useState('暂无消息')
	const [channel] = useState(2)
	
 	const sendSocket = useCallback(() => {
 	
 	 // 创建socket请求,调上面node的接口,传频道id
    wsServer = new WebSocket(
      `localhost:3131/kapi/socket/init?channel=${channel}`
    )
    
    //监听消息的返回
	 wsServer.addEventListener('message', socketMessage)
	 
	  const socketMessage = (event) => {
          const data = JSON.parse(event.data)
	 	if(data.code===200){
	      console.log(data)
	      setmess(data)
	      //得到对应的消息取消监听
	      wsServer.removeEventListener('message', socketMessage)
	      }
      }
      
  }, [channel])

  useEffect(() => {
      sendSocket()//进页面开始调用,调用时机无所谓
   }, [ sendSocket])
   
   return (
   	<div>{mess}</div>
   )
}

四、拆分整理细化

一、将原来的socket服务拆分再细化

在这里插入图片描述
1.index.js

const Koa = require('koa')
const router = require('koa-router')()
const websockify = require('koa-websocket')
const app = websockify(new Koa())
const Config = require('../../config/const')
const socketApiRoutes = require('./route')
// socket route
socketApiRoutes(router)
// 注册路由
app.ws.use(router.routes()).use(router.allowedMethods())

app.listen(Config.socket.port, () =>
  console.log(`socket listening on port ${Config.socket.port}`)
)

2.redis.js

const redis = require('redis')
const dotenv = require('dotenv').config({
  path: process.env.NODE_ENV == 'production' ? '.env' : '.env.local'
})
//上面刚开始是写死的参数,可以动态获取
const ENV = dotenv.parsed
const redisConfig = {
  host: ENV.REDIS_HOST,
  port: ENV.REDIS_PORT,
  password: ENV.REDIS_PASSWORD,
  db: ENV.REDIS_DB
}

// http://redis.js.org/
const createRedisClient = (channel, callback) => {
  // https://www.npmjs.com/package/redis
  let client = redis.createClient(redisConfig)
  //  订阅
  client.subscribe(channel)
  // 监听redis 的ready
  client.on('ready', () => {
    console.log(
      'Redis [%s:%s/%s] is connected and ready for subscribe channel [%s] use.',
      redisConfig.host,
      redisConfig.port,
      redisConfig.database,
      channel
    )
  })
  // 监听redis 的connect
  client.on('connect', () => {
    console.log('Redis connect')
  })
  // 接收消息
  client.on('message', async (channel, message) => {
    console.log(
      'Received subscribe message, channel [%s] message [%s]',
      channel,
      message
    )
    await callback(channel, message)
  })

  // 监听redis 的connect
  client.on('reconnecting', err => {
    console.log('Redis reconnecting:' + err)
  })

  // 监听redis 的错误
  client.on('error', err => {
    console.log('Redis Error:' + err)
  })

  // 监听redis 订阅事件
  client.on('subscribe', (channel, count) => {
    console.log(
      'client subscribed to ' + channel + ',' + count + ' total subscriptions'
    )
  })

  // 监听redis 取消订阅事件
  client.on('unsubscribe', (channel, count) => {
    console.log(
      'client unsubscribed from' +
        channel +
        ', ' +
        count +
        ' total subscriptions'
    )
  })

  return client
}

module.exports = createRedisClient


3.route.js

const createRedisClient = require('./redis')

const socketApiRoutes = router => {
  router.all('/kapi/socket/init', async ctx => {
    const { channel } = ctx.query
    console.log(channel)

    createRedisClient(channel, (channel, message) => {
      console.log(`on message channel: ${channel}`)
      waitForSocketConnection(ctx.websocket, () => {
        // 等待连接已经打开再去发送消息
        ctx.websocket.send(message)
      })
    })

    function waitForSocketConnection(socket, callback) {
      setTimeout(() => {
        if (socket.readyState == 1) {
          if (callback != null) {
            callback()
          }
        } else {
          waitForSocketConnection(socket, callback)
        }
      }, 1000)
    }

    // 监听web 端发来的消息
    ctx.websocket.on('message', function(res) {
      //   console.log('ctx websocket web message', res)
    })

    // 监听 关闭事件
    ctx.websocket.on('close', function() {
      // client.unsubscribe()
      console.log('ctx websocket close')
    })

    ctx.websocket.body = {
      status: true
    }
  })

}

module.exports = socketApiRoutes


二、将客户端socket的调用封装一下

在这里插入图片描述

import { isSupportSocket } from 'src/utils/util'

interface ScoketEvent {
  open?: () => void
  message?: (event: MessageEvent) => void
  error?: () => void
  close?: () => void
}

const createSocket = (pathname: string, eventOption?: ScoketEvent) => {
  let wsServer: WebSocket | null = null

  if (isSupportSocket()) {
    // 请求socket
    wsServer = new WebSocket(
      `${window.location.protocol === 'https:' ? 'wss' : 'ws'}://${
        window.location.host
      }${pathname}`
    )
    // 初始化socket 事件
    let _interval: NodeJS.Timer | null = null
    let _setTimeout: NodeJS.Timer | null = null

    if (wsServer) {
      const socketOpen = () => {
        if (eventOption && eventOption.open) {
          eventOption.open()
        }
        // 心跳检查
        wsServer && wsServer.send('socket heart check')
        _interval = setInterval(() => {
          wsServer && wsServer.send('socket heart check')
        }, 30 * 1000)
      }

      const socketMessage = (event: MessageEvent) => {
        if (eventOption && eventOption.message) {
          eventOption.message(event)
          console.log(event, 'socketMessage')
        }
      }
      const socketError = () => {
        if (eventOption && eventOption.error) {
          eventOption.error()
        }
        if (_interval) clearInterval(_interval)
        wsServer = null
        console.log('websocket error')
      }

      const socketClose = () => {
        if (eventOption && eventOption.close) {
          eventOption.close()
        }

        if (wsServer) {
          wsServer.removeEventListener('open', socketOpen)
          wsServer.removeEventListener('message', socketMessage)
          wsServer.removeEventListener('error', socketError)
        }
        if (_interval) clearInterval(_interval)
        if (_setTimeout) clearTimeout(_setTimeout)
      }

      // 连接建立时触发
      wsServer.addEventListener('open', socketOpen)
      // 监听消息 客户端接收服务端数据时触发
      wsServer.addEventListener('message', socketMessage)
      // 通信发生错误时触发
      wsServer.addEventListener('error', socketError)

      // 通信发生错误时触发
      wsServer.addEventListener('close', socketClose)
    }
  } else {
    console.error('该浏览器不支持socket, 请安装新版本')
  }

  return wsServer
}

export default createSocket


三、客户端socket调用

在页面里调用上面封装好的socket

import React, { useState, useEffect, useCallback } from 'react'
import { RouteConfig } from 'react-router-config'
import createSocket from 'src/api/socket'

interface IProps extends RouteConfig {}

export default (props: IProps) => {
  const [channel] = useState<number>(2)
  const [mess,setmess] = useState<string>('暂无消息')
  
  const sendSocket = useCallback(() => {
    const wsServer: WebSocket | null = createSocket(
      `/kapi/socket/init?channel=${channel}`,
      {
        message: socketMessage
      }
    )
    
    function socketMessage(event: MessageEvent) {
      const data = JSON.parse(event.data)
      console.log(data)
      setmess(data)
      wsServer && wsServer.close()
    }

    return () => {
      if (wsServer) {
        wsServer.removeEventListener('message', socketMessage)
        wsServer.close()
      }
    }
  }, [channel])

  useEffect(() => {
       sendSocket()
  }, [sendSocket])

  return (
    <div>{mess}</div>
  )
}


整理完毕,希望对小伙伴有所帮助。