MQTT 服务搭建

1,400 阅读5分钟

MQTT 服务搭建(Node.js)

最近在写自己的毕设,采用的技术栈是 node + MySQL 和 Vue3 + Element Plus,还用了一个即时通讯协议 MQTT。

对这个 MQTT 的东西感到比较疑惑,网上资料也参差不齐,所以打算做一个总结。

本文章的代码部分,主要包含两部分:

  • node 端部署 MQTT 服务,并在 node 端连接

  • node 端部署,但在前端连接

MQTT 协议介绍

MQTT协议(Message Queuing Telemetry Transport,消息队列遥测传输)是一种基于发布/订阅模式的轻量级消息协议,它适用于网络带宽有限或网络环境不稳定的情况。有如下优点:

  • 轻量级与带宽小:MQTT协议设计简单,消息头部较小,协议传输的数据量也较小。即使在网络状况较差的情况下,MQTT也能提供不错的体验。
  • 发布/订阅模型:MQTT协议采用发布/订阅模型,设备之间通过中间代理服务器进行通信。发布者将消息发布到代理服务器上,订阅者从代理服务器接收与其订阅主题相关的消息。
  • 可靠性:MQTT协议支持三种不同的服务质量(QoS)级别,包括最多一次传递、至少一次传递和只有一次传递,从而提供了不同的消息传输可靠性选项。
  • 异步通信:MQTT协议使用异步通信模型,客户端可以通过订阅主题来接收实时的消息,从而支持实时事件驱动的应用场景。

一些小知识:

如果 MQTT Client 想接收离线信息:

Client 必须使用持久化的对话

持久化会话要求客户端必须使用固定的 Client ID 连接

MQTT 中的 QoS 等级:

QoS 0, At most once, 至多一次

QoS 1, At least once, 至少一次

QoS 1, Exactly once, 确保能,且只能收到一次

Node 端代码环节

在查询一些资料后发现 mosca 已经停止维护了,它的开发者已经推出了一个叫 aedes 的库,支持性更好。

安装 aedes

首先得先安装 aedes.jsmqtt.js

yarn add aedes
yarn add mqtt

我的文件目录是这样的

 ├── index.js
 ├── pub.js
 ├── sub.js
 └── pakage.js

MQTT 服务器部署

首先完成 MQTT 本地服务器的部署,新建一个文件 index.js

const aedes = require('aedes')();
const mqttServer = require('net').createServer(aedes.handle);
const port = 1883;

mqttServer.listen(port, function () {
    console.log('mqtt server started and listening on port ', port)
});

// 身份验证
aedes.authenticate = function (_client, username, password, callback) {
    // with no error, successful be true
    // callback(error, successful)
    callback(null, (username === 'user' && password.toString() === '123456'));
}

// 客户端连接
aedes.on('client', function (client) {
    console.log('Client Connected: \x1b[33m' + (client ? client.id : client) +
    '\x1b[0m', 'to broker', aedes.id);
});

// 客户端断开
aedes.on('clientDisconnect', function (client) {
    console.log('Client Disconnected: \x1b[31m' + (client ? client.id : client) + 
    '\x1b[0m', 'to broker', aedes.id);
});

订阅者创建

新建一个 sub.js 文件

const mqtt = require("mqtt");

const client = mqtt.connect("mqtt://127.0.0.1:1883", {
  username: "user",
  password: "123456",
});

client.on("connect", function () {
  console.log("服务器连接成功");
  console.log(client.options.clientId);
  client.subscribe("text", { qos: 1 }); // 订阅text消息
});

client.on("message", function (top, message) {
  console.log("当前topic:", top);
  console.log(message.toString());
});

client.on("error", function (error) {
  console.log(error);
});

client.on("disconnect", function (packet) {
  console.log('连接断开');
});

发布者创建

新建一个 pub.js 文件

const mqtt = require("mqtt");

const client  = mqtt.connect('mqtt://127.0.0.1:1883',{
  username: "user",
  password: '123456'
});

client.on("connect", function() {
  console.log("服务器连接成功");
  console.log(client.options.clientId);
  client.publish("text", JSON.stringify({id: 1}), { qos: 0, retain: true }); // 发布主题text消息
});

client.on('error', function (error) {
  console.log(error)
})

client.on('disconnect', function (packet) {
  console.log(packet)
})

所有文件都创建完毕后,依次执行:

node index
node sub
node pub

这样,一个简易的在 node 端运行的 demo 就做好了。但是还有些问题没有解决,我在前端连接 Mqtt 服务时,发现连接不上。

下面我将介绍关于用 node 部署 Mqtt 服务,并与前端通信。

Node 端部署,并通过前端连接

由于 MQTT 是基于 TCP/IP 协议,要与前端通信的话,必须借助 WebSocket

然后就有了 MQTT over WebSocket

同样我们得先安装 aedes 和 mqtt 这两个包,然后创建 index.js

const aedes = require('aedes')()
const httpServer = require('http').createServer()
const ws = require('websocket-stream')
const port = 8083;

ws.createServer({ server: httpServer }, aedes.handle)

httpServer.listen(port, function () {
  console.log('mqtt over websocket server listening on port ', port)
})

// 身份验证
aedes.authenticate = function (_client, username, password, callback) {
    // with no error, successful be true
    // callback(error, successful)
    callback(null, (username === 'user' && password.toString() === '123456'));
}

// 客户端连接
aedes.on('client', function (client) {
    console.log('Client Connected: \x1b[33m' + (client ? client.id : client) +
    '\x1b[0m', 'to broker', aedes.id);
});

// 客户端断开
aedes.on('clientDisconnect', function (client) {
    console.log('Client Disconnected: \x1b[31m' + (client ? client.id : client) + 
    '\x1b[0m', 'to broker', aedes.id);
});

Vue 端订阅者创建

前端Vue进行订阅的代码(需要安装 mqtt ) sub.js

import * as mqtt from "mqtt/dist/mqtt";

const client = mqtt.connect("ws://127.0.0.1:8083/mqtt", {
  username: "user",
  password: "123456",
});

client.on("connect", function () {
  console.log("服务器连接成功");
  console.log(client.options.clientId);
  client.subscribe("text", { qos: 1 }); // 订阅text消息
});

client.on("message", function (top, message) {
  console.log("当前topic:", top);
  console.log(message.toString());
});

client.on("error", function (error) {
  console.log(error);
});

client.on("disconnect", function (packet) {
  console.log('连接断开');
});

Vue 端发布者创建

前端Vue进行发布的代码(需要安装 mqtt ) sub.js

import * as mqtt from "mqtt/dist/mqtt";

const client  = mqtt.connect("ws://127.0.0.1:8083/mqtt",{
  username: "user",
  password: '123456'
});

client.on("connect", function() {
  console.log("服务器连接成功");
  console.log(client.options.clientId);
  client.publish("text", JSON.stringify({id: 1}), { qos: 0, retain: true }); // 发布主题text消息
});

client.on('error', function (error) {
  console.log(error)
})

client.on('disconnect', function (packet) {
  console.log(packet)
})

前端部分代码,我还暂时没有测试(因为已经封装过了,测试需要重新写一部分,请原谅我比较懒),有问题可以在评论区交流🥰

有更多需求的可以参考官方 api,链接就在下面,需要一定的英语能力。强烈建议了解一下官方用例。

觉得有用的话,点个赞再走呗😍

参考文章:

aedes 的官方示例(github 英文)

aedes api 参考

MQTT 持久会话与 Clean Session 详解 | EMQ (emqx.com)

nodejs 快速搭建MQTT服务器_node aedes-CSDN博客