STOMP协议——基于Websocket实现

5,435

今天继续Websocket之STOMP协议,由于其设计简单,在开发客户端方面使用简便,在很多种语言上都可以见到其身影,并非websocket“独享”。

定义

STOMP(Simple/Streaming Text Orientated Messaging Protocol),即简单(流)文本定向消息协议。属于消息队列的一种协议,有点类似于jms。

作用

提供消息体的格式,允许STOMP客户端(Endpoints)与任意STOMP消息代理(message broker)进行交互,实现客户端之间进行异步消息传送。

角色介绍

图片来源《spring in action》

  • 生产者客户端: 给某destination发送消息;
  • 消费者客户端: 接收所订阅的destination所推送过来的消息;
  • 请求通道: 接收生产者推送过来的消息的线程池;
  • 相应通道: 推送消息给消费者的线程池;
  • 代理: 消息队列管理者. 记录哪些client订阅了哪个destination.
  • 应用目的地址: 发送到这类目的地址的消息在到达broker之前, 会先路由到由应用写的某个方法. 相当于对进入broker的消息进行一次拦截, 目的是针对消息做一些业务处理————图中的”/app”
  • 非应用目的地址: 发送到这类目的地址的消息会直接转到broker. 不会被应用拦截————图中的”/topic”

处理流程

  • 生产者通过发送消息到某个destination
  • 请求通道接受消息
  • 如果目的地址是应用目的地址(/app)则转到相应的由应用自己写的业务方法做处理, 再转到broker
  • 如果目的地址是非应用目的地址(/topic)则直接转到broker.broker构建消息后再通过相应通道推送消息到所有订阅此目的地址的消费者

代码实现

下面使用SpringSecurity和WebSocket-STOMP实现“点对点”消息发送功能:

启用STOMP功能

@Configuration
@EnableWebSocketMessageBroker//开启消息代理
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * 建立连接点信息
     * @param registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws/ep").withSockJS();
         registry.setApplicationDestinationPrefixes("/app");
    }

    /**
     * 配置消息队列
     * 基于内存的STOMP消息代理
     * @param registry
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/queue");
    }
}
  • 将 "/ws/ep" 注册为一个 STOMP 端点。客户端在订阅或发布消息到目的地路径前,要连接到该端点
  • 以 /app 开头的消息都会被路由到带有@MessageMapping 或 @SubscribeMapping 注解的方法中;
  • 以 /queue 开头的消息都会发送到STOMP代理中,根据所选择的STOMP代理不同,目的地的可选前缀也会有所限制;
  • 以/user开头的消息会将消息重路由到某个用户独有的目的地上。

处理STOMP消息

自定义通信协议

@Controller
public class WScontroller {

    @Autowired//消息发送模板
    SimpMessagingTemplate simpMessagingTemplate;

    @MessageMapping("/ws/chat")
    public void receiveMessage(String message, Principal principal) {
        String[] split = message.split(";");
        HashMap<String, Object> map = new HashMap<>();
        map.put("username",split[1]);
        map.put("msg",split[0]);
        simpMessagingTemplate.convertAndSendToUser(split[1], "/queue/msg",map);
    }
  • 接收客户端发来的消息,参数就是消息本身message
  • @MessageMapping 或者 @SubscribeMapping 注解可以处理客户端发送过来的消息,并选择方法是否有返回值。
  • @MessageMapping 指定目的地是“/app/ws/chat”(“/app”前缀是隐含的,因为我们将其配置为应用的目的地前缀)。
  • 通信协议可以自定义——可自定义参数的格式
  • 可以接收json格式的数据,传递josn数据时不需要添加额外注解@Requestbody
  • 消息发送者不是从前端传递过来的,而是从springsecurity中获取的,防止前端冒充
  • 如果 @MessageMapping 注解的控制器方法有返回值的话,返回值会被发送到消息代理,只不过会添加上"/topic"前缀。
  • 通过为方法添加@SendTo注解,重载目的地

客户端实现

客户端代码(UVE)

<template>
  <div>
    <div>
      <div v-for="(m,index) in ms">{{m.username}}:{{m.msg}}</div>

    </div>
    <el-input v-model="msg"></el-input>
    <el-button @click="sendMsg"></el-button>
  </div>
</template>

<script>
  import "../../lib/sockjs"
  import "../../lib/stomp"
    export default {
        name: "FriendChat",
      data() {
        return {
          msg: '',
          ms: [],
          stomp: null
        };
      },
      mounted() {
        this.intCon();
      },
      methods: {
        // 建立连接
        initCon() {
          let _this = this;
           this.stomp = Stomp.over(new SockJS("/ws/ep"));
            this.stomp.connect({},success=>{
              _this.stomp.subscribe("/user/queue/msg",msg=>{
                _this.ms.push(JSON.parse(msg.body));
              })
            },fail=>{

            })
        },
        // 发送消息
        sendMsg() {
          this.stomp.send("/ws/chat",{},this.msg)
        }
      }
    }
</script>
<style scoped>
</style>