Spring Boot中使用WebSocket总结(二):向指定用户发送WebSocket消息并处理对方不在线的情况

10,668 阅读9分钟

在上一篇文章(www.zifangsky.cn/1355.html)中我介绍了在Spring项目中使用WebSocket的几种实现方式。但是,上篇文章中只介绍了服务端采用广播模式给所有客户端发送消息,然而我们有时需要服务端给指定用户的客户端发送消息(比如:发送Web通知、实时打印用户任务的日志、两个用户点对点聊天等)。

关于服务端如何给指定用户的客户端发送消息,一般可以通过以下三种方案来实现:

  • 方案一:WebSocket使用“Java提供的@ServerEndpoint注解”实现或者使用“Spring低层级API”实现,在建立连接时从HttpSession中获取用户登录后的用户名,然后把“用户名+该WebSocket连接”存储到ConcurrentHashMap。给指定用户发送消息,只需要根据接收者的用户名获取对方已经建立的WebSocket连接,接着给他发送消息即可
  • 方案二:在页面的监听路径前面动态添加当前登录的“用户ID/用户名”,这样给指定用户发送消息,只需要发送广播消息到监听了前面那个路径的客户端即可。
  • 方案三:这种方案类似于方案一。使用Spring的高级API实现WebSocket,然后自定义HandshakeHandler类并重写determineUser方法,其目的是为了在建立连接时使用用户登录后的用户名作为此次WebSocket的凭证,最后我们就可以使用messagingTemplate.convertAndSendToUser方法给指定用户发送消息了。

注:本篇文章的完整源码可以参考:github.com/zifangsky/W…

使用SimpMessagingTemplate发送消息

使用org.springframework.messaging.simp.SimpMessagingTemplate类可以在服务端的任意地方给客户端发送消息。此外,在我们配置Spring支持STOMP后SimpMessagingTemplate类就会被自动装配到Spring的上下文中,因此我们只需要在想要使用的地方使用@Autowired注解注入SimpMessagingTemplate即可使用。

需要说明的是,SimpMessagingTemplate类有两个重要的方法,它们分别是:

  • public void convertAndSend(D destination, Object payload):给监听了路径destination的所有客户端发送消息payload
  • public void convertAndSendToUser(String user, String destination, Object payload):给监听了路径destination的用户user发送消息payload

一个简单示例:

package cn.zifangsky.stompwebsocket.controller;

import cn.zifangsky.stompwebsocket.model.websocket.Greeting;
import cn.zifangsky.stompwebsocket.model.websocket.HelloMessage;
import cn.zifangsky.stompwebsocket.service.RedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;

/**
 * 测试{@link org.springframework.messaging.simp.SimpMessagingTemplate}类的基本用法
 * @author zifangsky
 * @date 2018/10/10
 * @since 1.0.0
 */
@Controller
@RequestMapping(("/wsTemplate"))
public class MessageTemplateController {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    @Resource(name = "redisServiceImpl")
    private RedisService redisService;

    /**
     * 简单测试SimpMessagingTemplate的用法
     */
    @PostMapping("/greeting")
    @ResponseBody
    public String greeting(@RequestBody Greeting greeting) {
        this.messagingTemplate.convertAndSend("/topic/greeting", new HelloMessage("Hello," + greeting.getName() + "!"));

        return "ok";
    }

}

很显然,这里发送的地址是上篇文章中最后那个示例监听的地址,在客户端页面建立连接后,我们使用Postman请求一下上面这个方法,效果如下:

然后我们可以发现页面中也收到消息了:

向指定用户发送WebSocket消息并处理对方不在线的情况

给指定用户发送消息:

  • 如果接收者在线,则直接发送消息;
  • 否则将消息存储到redis,等用户上线后主动拉取未读消息。

(1)自定义HandshakeInterceptor,用于禁止未登录用户连接WebSocket:

package cn.zifangsky.stompwebsocket.interceptor.websocket;

import cn.zifangsky.stompwebsocket.common.Constants;
import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
import cn.zifangsky.stompwebsocket.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

import javax.servlet.http.HttpSession;
import java.text.MessageFormat;
import java.util.Map;

/**
 * 自定义{@link org.springframework.web.socket.server.HandshakeInterceptor},实现“需要登录才允许连接WebSocket”
 *
 * @author zifangsky
 * @date 2018/10/11
 * @since 1.0.0
 */
@Component
public class AuthHandshakeInterceptor implements HandshakeInterceptor {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
        HttpSession session = SpringContextUtils.getSession();
        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

        if(loginUser != null){
            logger.debug(MessageFormat.format("用户{0}请求建立WebSocket连接", loginUser.getUsername()));
            return true;
        }else{
            logger.error("未登录系统,禁止连接WebSocket");
            return false;
        }

    }

    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {

    }

}

(2)自定义HandshakeHandler,用于在建立WebSocket的时候使用自定义的Principal:

package cn.zifangsky.stompwebsocket.interceptor.websocket;

import cn.zifangsky.stompwebsocket.common.Constants;
import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
import cn.zifangsky.stompwebsocket.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

import javax.servlet.http.HttpSession;
import java.security.Principal;
import java.text.MessageFormat;
import java.util.Map;

/**
 * 自定义{@link org.springframework.web.socket.server.support.DefaultHandshakeHandler},实现“生成自定义的{@link java.security.Principal}”
 *
 * @author zifangsky
 * @date 2018/10/11
 * @since 1.0.0
 */
@Component
public class MyHandshakeHandler extends DefaultHandshakeHandler{
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
        HttpSession session = SpringContextUtils.getSession();
        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

        if(loginUser != null){
            logger.debug(MessageFormat.format("WebSocket连接开始创建Principal,用户:{0}", loginUser.getUsername()));
            return new MyPrincipal(loginUser.getUsername());
        }else{
            logger.error("未登录系统,禁止连接WebSocket");
            return null;
        }
    }

}

相应地,这里的MyPrincipal继承了java.security.Principal类:

package cn.zifangsky.stompwebsocket.interceptor.websocket;

import java.security.Principal;

/**
 * 自定义{@link java.security.Principal}
 *
 * @author zifangsky
 * @date 2018/10/11
 * @since 1.0.0
 */
public class MyPrincipal implements Principal {
    private String loginName;

    public MyPrincipal(String loginName) {
        this.loginName = loginName;
    }

    @Override
    public String getName() {
        return loginName;
    }
}

(3)自定义ChannelInterceptor,用于在用户断开连接的时候记录日志:

package cn.zifangsky.stompwebsocket.interceptor.websocket;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.stereotype.Component;

import java.security.Principal;
import java.text.MessageFormat;

/**
 * 自定义{@link org.springframework.messaging.support.ChannelInterceptor},实现断开连接的处理
 *
 * @author zifangsky
 * @date 2018/10/10
 * @since 1.0.0
 */
@Component
public class MyChannelInterceptor implements ChannelInterceptor{
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();

        //用户已经断开连接
        if(StompCommand.DISCONNECT.equals(command)){
            String user = "";
            Principal principal = accessor.getUser();
            if(principal != null && StringUtils.isNoneBlank(principal.getName())){
                user = principal.getName();
            }else{
                user = accessor.getSessionId();
            }

            logger.debug(MessageFormat.format("用户{0}的WebSocket连接已经断开", user));
        }
    }

}

(4)WebSocket相关的完整配置:

package cn.zifangsky.stompwebsocket.config;

import cn.zifangsky.stompwebsocket.interceptor.websocket.AuthHandshakeInterceptor;
import cn.zifangsky.stompwebsocket.interceptor.websocket.MyChannelInterceptor;
import cn.zifangsky.stompwebsocket.interceptor.websocket.MyHandshakeHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * WebSocket相关配置
 *
 * @author zifangsky
 * @date 2018/9/30
 * @since 1.0.0
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{
    @Autowired
    private AuthHandshakeInterceptor authHandshakeInterceptor;

    @Autowired
    private MyHandshakeHandler myHandshakeHandler;

    @Autowired
    private MyChannelInterceptor myChannelInterceptor;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/stomp-websocket").withSockJS();

        registry.addEndpoint("/chat-websocket")
                .addInterceptors(authHandshakeInterceptor)
                .setHandshakeHandler(myHandshakeHandler)
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //客户端需要把消息发送到/message/xxx地址
        registry.setApplicationDestinationPrefixes("/message");
        //服务端广播消息的路径前缀,客户端需要相应订阅/topic/yyy这个地址的消息
        registry.enableSimpleBroker("/topic");
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(myChannelInterceptor);
    }

}

(5)Controller中的消息处理如下:

package cn.zifangsky.stompwebsocket.controller;

import cn.zifangsky.stompwebsocket.common.Constants;
import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
import cn.zifangsky.stompwebsocket.enums.ExpireEnum;
import cn.zifangsky.stompwebsocket.model.User;
import cn.zifangsky.stompwebsocket.model.websocket.Greeting;
import cn.zifangsky.stompwebsocket.model.websocket.HelloMessage;
import cn.zifangsky.stompwebsocket.service.RedisService;
import cn.zifangsky.stompwebsocket.utils.JsonUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 测试{@link org.springframework.messaging.simp.SimpMessagingTemplate}类的基本用法
 * @author zifangsky
 * @date 2018/10/10
 * @since 1.0.0
 */
@Controller
@RequestMapping(("/wsTemplate"))
public class MessageTemplateController {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    @Resource(name = "redisServiceImpl")
    private RedisService redisService;

    /**
     * 简单测试SimpMessagingTemplate的用法
     */
    @PostMapping("/greeting")
    @ResponseBody
    public String greeting(@RequestBody Greeting greeting) {
        this.messagingTemplate.convertAndSend("/topic/greeting", new HelloMessage("Hello," + greeting.getName() + "!"));

        return "ok";
    }

    /**
     * 给指定用户发送WebSocket消息
     */
    @PostMapping("/sendToUser")
    @ResponseBody
    public String chat(HttpServletRequest request) {
        //消息接收者
        String receiver = request.getParameter("receiver");
        //消息内容
        String msg = request.getParameter("msg");
        HttpSession session = SpringContextUtils.getSession();
        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

        HelloMessage resultData = new HelloMessage(MessageFormat.format("{0} say: {1}", loginUser.getUsername(), msg));
        this.sendToUser(loginUser.getUsername(), receiver, "/topic/reply", JsonUtils.toJson(resultData));

        return "ok";
    }

    /**
     * 给指定用户发送消息,并处理接收者不在线的情况
     * @param sender 消息发送者
     * @param receiver 消息接收者
     * @param destination 目的地
     * @param payload 消息正文
     */
    private void sendToUser(String sender, String receiver, String destination, String payload){
        SimpUser simpUser = userRegistry.getUser(receiver);

        //如果接收者存在,则发送消息
        if(simpUser != null && StringUtils.isNoneBlank(simpUser.getName())){
            this.messagingTemplate.convertAndSendToUser(receiver, destination, payload);
        }
        //否则将消息存储到redis,等用户上线后主动拉取未读消息
        else{
            //存储消息的Redis列表名
            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + receiver + ":" + destination;
            logger.info(MessageFormat.format("消息接收者{0}还未建立WebSocket连接,{1}发送的消息【{2}】将被存储到Redis的【{3}】列表中", receiver, sender, payload, listKey));

            //存储消息到Redis中
            redisService.addToListRight(listKey, ExpireEnum.UNREAD_MSG, payload);
        }

    }


    /**
     * 拉取指定监听路径的未读的WebSocket消息
     * @param destination 指定监听路径
     * @return java.util.Map<java.lang.String,java.lang.Object>
     */
    @PostMapping("/pullUnreadMessage")
    @ResponseBody
    public Map<String, Object> pullUnreadMessage(String destination){
        Map<String, Object> result = new HashMap<>();
        try {
            HttpSession session = SpringContextUtils.getSession();
            //当前登录用户
            User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

            //存储消息的Redis列表名
            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + loginUser.getUsername() + ":" + destination;
            //从Redis中拉取所有未读消息
            List<Object> messageList = redisService.rangeList(listKey, 0, -1);

            result.put("code", "200");
            if(messageList !=null && messageList.size() > 0){
                //删除Redis中的这个未读消息列表
                redisService.delete(listKey);
                //将数据添加到返回集,供前台页面展示
                result.put("result", messageList);
            }
        }catch (Exception e){
            result.put("code", "500");
            result.put("msg", e.getMessage());
        }

        return result;
    }

}

注:这里对应的几个Redis操作的方法如下:

@Override
public boolean delete(String key) {
	return redisTemplate.delete(key);
}

@Override
public void addToListLeft(String listKey, ExpireEnum expireEnum, Object... values) {
	//绑定操作
	BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
	//插入数据
	boundValueOperations.leftPushAll(values);
	//设置过期时间
	boundValueOperations.expire(expireEnum.getTime(),expireEnum.getTimeUnit());
}

@Override
public void addToListRight(String listKey, ExpireEnum expireEnum, Object... values) {
	//绑定操作
	BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
	//插入数据
	boundValueOperations.rightPushAll(values);
	//设置过期时间
	boundValueOperations.expire(expireEnum.getTime(),expireEnum.getTimeUnit());
}

@Override
public List<Object> rangeList(String listKey, long start, long end) {
	//绑定操作
	BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
	//查询数据
	return boundValueOperations.range(start, end);
}

(6)示例页面:

<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta content="text/html;charset=UTF-8"/>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
    <meta http-equiv="X-UA-Compatible" content="IE=edge"/>
    <meta name="viewport" content="width=device-width, initial-scale=1"/>
    <title>Chat With STOMP Message</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.1.4/sockjs.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
    <script th:src="@{/layui/layui.js}"></script>
    <script th:src="@{/layui/lay/modules/layer.js}"></script>
    <link th:href="@{/layui/css/layui.css}" rel="stylesheet">
    <link th:href="@{/layui/css/modules/layer/default/layer.css}" rel="stylesheet">
    <link th:href="@{/css/style.css}" rel="stylesheet">
    <style type="text/css">
        #connect-container {
            margin: 0 auto;
            width: 400px;
        }

        #connect-container div {
            padding: 5px;
            margin: 0 7px 10px 0;
        }

        .message input {
            padding: 5px;
            margin: 0 7px 10px 0;
        }

        .layui-btn {
            display: inline-block;
        }
    </style>
    <script type="text/javascript">
        var stompClient = null;

        $(function () {
            var target = $("#target");
            if (window.location.protocol === 'http:') {
                target.val('http://' + window.location.host + target.val());
            } else {
                target.val('https://' + window.location.host + target.val());
            }
        });

        function setConnected(connected) {
            var connect = $("#connect");
            var disconnect = $("#disconnect");
            var echo = $("#echo");

            if (connected) {
                connect.addClass("layui-btn-disabled");
                disconnect.removeClass("layui-btn-disabled");
                echo.removeClass("layui-btn-disabled");
            } else {
                connect.removeClass("layui-btn-disabled");
                disconnect.addClass("layui-btn-disabled");
                echo.addClass("layui-btn-disabled");
            }

            connect.attr("disabled", connected);
            disconnect.attr("disabled", !connected);
            echo.attr("disabled", !connected);
        }

        //连接
        function connect() {
            var target = $("#target").val();

            var ws = new SockJS(target);
            stompClient = Stomp.over(ws);

            stompClient.connect({}, function () {
                setConnected(true);
                log('Info: STOMP connection opened.');

                //连接成功后,主动拉取未读消息
                pullUnreadMessage("/topic/reply");

                //订阅服务端的/topic/reply地址
                stompClient.subscribe("/user/topic/reply", function (response) {
                    log(JSON.parse(response.body).content);
                })
            },function () {
                //断开处理
                setConnected(false);
                log('Info: STOMP connection closed.');
            });
        }

        //断开连接
        function disconnect() {
            if (stompClient != null) {
                stompClient.disconnect();
                stompClient = null;
            }
            setConnected(false);
            log('Info: STOMP connection closed.');
        }

        //向指定用户发送消息
        function sendMessage() {
            if (stompClient != null) {
                var receiver = $("#receiver").val();
                var msg = $("#message").val();
                log('Sent: ' + JSON.stringify({'receiver': receiver, 'msg':msg}));

                $.ajax({
                    url: "/wsTemplate/sendToUser",
                    type: "POST",
                    dataType: "json",
                    async: true,
                    data: {
                        "receiver": receiver,
                        "msg": msg
                    },
                    success: function (data) {

                    }
                });
            } else {
                layer.msg('STOMP connection not established, please connect.', {
                    offset: 'auto'
                    ,icon: 2
                });
            }
        }

        //从服务器拉取未读消息
        function pullUnreadMessage(destination) {
            $.ajax({
                url: "/wsTemplate/pullUnreadMessage",
                type: "POST",
                dataType: "json",
                async: true,
                data: {
                    "destination": destination
                },
                success: function (data) {
                    if (data.result != null) {
                        $.each(data.result, function (i, item) {
                            log(JSON.parse(item).content);
                        })
                    } else if (data.code !=null && data.code == "500") {
                        layer.msg(data.msg, {
                            offset: 'auto'
                            ,icon: 2
                        });
                    }
                }
            });
        }

        //日志输出
        function log(message) {
            console.debug(message);
        }
    </script>
</head>
<body>
    <noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websockets rely on Javascript being
        enabled. Please enable
        Javascript and reload this page!</h2></noscript>
    <div>
        <div id="connect-container" class="layui-elem-field">
            <legend>Chat With STOMP Message</legend>
            <div>
                <input id="target" type="text" class="layui-input" size="40" style="width: 350px" value="/chat-websocket"/>
            </div>
            <div>
                <button id="connect" class="layui-btn layui-btn-normal" onclick="connect();">Connect</button>
                <button id="disconnect" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled"
                        onclick="disconnect();">Disconnect
                </button>

            </div>
            <div class="message">
                <input id="receiver" type="text" class="layui-input" size="40" style="width: 350px" placeholder="接收者姓名" value=""/>
                <input id="message" type="text" class="layui-input" size="40" style="width: 350px" placeholder="消息内容" value=""/>
            </div>
            <div>
                <button id="echo" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled"
                        onclick="sendMessage();">Send Message
                </button>
            </div>
        </div>
    </div>
</body>
</html>

启动项目后,分别在两个浏览器中使用不同的账号登录,接着互相给对方发送消息,效果如下:

界面一:

img

界面二:

img