HarmonyOS ArkTS 实现MQTT协议

1,011 阅读8分钟

2023车票已废,12月31日以“年会不能停”的电影作为年度结尾,非常开心和共情。2024今已开始 ,特此留念HarmonyOS ArkTS实现MQTT协议的过程。

介绍

MQTT是物联网中的一种协议,在HarmonyOS API9平台,解决方案以C++库移植为实现方案。

遥遥领先的平台,使用MQTT怎能不遥遥领先呢!   

新年快乐,本篇将带领你手把手实现HarmonyOS ArkTS语言的MQTT协议。

准备

  1. 阅读MQTT 5.0协议【 docs.oasis-open.org/mqtt/mqtt/v…
  2. 安装Mosquitto服务器【 www.mosquitto.org/
  3. MQTT 联盟中的中国成员实现的PC版客户端【 mqttx.app

效果

SVID_20240101_194618 -small-original.gif

HarmonyOS MQTT实现概况

业务流程 截屏2024-01-01 22.59.58.png

页面与功能实现类 截屏2024-01-01 23.21.46.png

MQTT 实现路线

MQTT协议最大的好处是纯软件协议, 不像Zigbee协议还会和硬件相关,所以在实现MQTT协议前,我们只需要专注于两件事情:

  1. MQTT协议传输通道 
  2. MQTT协议格式

通道即传输数据的载体,这里指的是Sokect, 了解HarmonyOS平台的Sokect使用方法尤为重要。

数据传输通道打通之后,需要专注于MQTT协议格式的实现。

由于之前对MQTT协议有所了解,所以在HarmonyOS 平台中看到Socket时,觉得在这个新平台上实现MQTT协议理论上没有任何问题,所以最终决定干它。

下边这张图是实现HarmonyOS MQTT协议的过程,仅作参考。

MQTT资深专家,只需要阅读关于HarmonyOS Socket的API 和 TypeScript 操作字节的一些基础内容即可

HarmonyOS MQTT实现路径.png

MQTT协议介绍

一)  MQTT共有15种协议

  1. CONNECT
  2. CONNACK
  3. PUBLISH
  4. PUBACK
  5. PUBREC
  6. PUBREL
  7. PUBCOMP
  8. DISCONNECT
  9. PINGREQ
  10. PINGRES
  11. SUBSCRIBE
  12. SUBACK
  13. UNSUBSCRIBE
  14. UNSUBACK
  15. AUTH

二)MQTT共有7种数据格式

这些数据格式名称会在MQTT技术规范中作为标准用语使用

  1. one Byte
  2. Two Byte
  3. Four Byte
  4. UTF-8 String
  5. UTF-8 String Pair
  6. Variable Byte Integer
  7. Binary Data

数据格式实现

数据格式稍微看一下即可

详情可参见MQTT V5.0规范【docs.oasis-open.org/mqtt/mqtt/v…

关于数据格式文档可以查找关键词 “1.5 Data representation

Two Byte

比如数字2,如果要编码为两个字节,则调用twoByte方法即可

twoByte(2)
public static twoByte(content: number): Uint8Array{
   return new Uint8Array([(content & 0xff00) >> 8 , content & 0xff])
}

Four Byte

比如数字2,如果要编码为四个字节,则调用

fourByte(2)
public static fourByte(content: number): Uint8Array{
  return new Uint8Array([(content & 0xff000000) >> 24 ,(content & 0xff0000) >> 16, (content & 0xff00) >> 8 , content & 0xff])
}

UTF-8 String

比如数字2,如果要采用UTF-8编码,则调用

utf8String(2)
public static utf8String(content: string): Uint8Array{
  const encoder = new util.TextEncoder()
  let u8a_encoder = encoder.encodeInto(content)

  let encoderLength = u8a_encoder.length

  let abEncoder = new ArrayBuffer(encoderLength + 2)
  const dv_encoder = new DataView(abEncoder)
  dv_encoder.setInt8(0, (encoderLength & 0xff00) >> 8)
  dv_encoder.setInt8(1, encoderLength & 0x00ff)

  let index: number = 2
  u8a_encoder.forEach( (value) => {
    dv_encoder.setInt8(index++, value)
  })

  return new Uint8Array(abEncoder)
}

UTF-8 String Pair

比如Key为‘Key-Hello’,Value为‘Value-World’如果要采用Key-Value方式的UTF-8编码,则调用

utf8StringPair('Key-Hello','Value-World')
public static utf8StringPair(key: string, value: string): Uint8Array{
  let u8a_key = this.utf8String(key)
  let u8a_value = this.utf8String(value)

  let merge = new Uint8Array(u8a_key.length + u8a_value.length)
  merge.set(u8a_key)
  merge.set(u8a_value, u8a_key.length)
  return merge
}

Variable Byte Integer

可变字节编码,这种编码方式的最大长度是4个字节。

所以,用1~4个字节可动态编码相关数字,实现节省占用内存的目标

比如数字2,编码完成后,只会占用一个字节

encodeVariableByteInteger(2)
public static encodeVariableByteInteger(content: number): Uint8Array {

  let cacheDigit = new Array<number>()

  let numBytes: number = 0
  let no: number = content

  do {
    let digit = no % 128
    no = parseInt(no / 128 +'', )
    if (no > 0) {
      digit |= 0x80
    }
    cacheDigit.push(digit)
    numBytes++
  } while ( (no > 0) && (numBytes < 4) )

  return new Uint8Array(cacheDigit)
}

Binary Data

这种数据格式是由2字节和剩余的字节数组组成,除2个字节之外的字节数组在解码时,需要按照约定来解码。

比如想要表达数字2,则调用

binaryData(new Uint8Array([2]))
public static binaryData(ext: Uint8Array): Uint8Array{
  let length = ext.length

  let u8a_binaryData = new Uint8Array(2 + length)

  u8a_binaryData[0] = (length & 0xff00) >> 8
  u8a_binaryData[1] = length & 0x00ff

  let index: number = 2
  ext.forEach((value)=>{
    u8a_binaryData[index++] = value
  })

  return u8a_binaryData
}

HarmonyOS Socket

HarmonyOS 官方指南

developer.harmonyos.com/cn/docs/doc…

1)包引入

注意:这个包在编译时,DevEco Studio 会提示警告,但官方资料中并未做说明

import socket from '@ohos.net.socket';

2)创建Socket

// 创建一个TCPSocket连接,返回一个TCPSocket对象。
let tcp = socket.constructTCPSocketInstance();

3) 注册Socket事件消息

  • 连接事件

  • 接收消息事件

  • Socket关闭事件

  • 错误事件

tcp.on('message', value => {  
       console.log("on message")  
       let buffer = value.message  
       let dataView = new DataView(buffer)  
       let str = ""  
       for (let i = 0; i < dataView.byteLength; ++i) {    
          str += String.fromCharCode(dataView.getUint8(i))  
       }  
        console.log("on connect received:" + str)
     });
     
tcp.on('connect', () => {  console.log("on connect")});

tcp.on('close', () => {  console.log("on close")});

tcp.on('error', () => {  console.log("on error")});

4) 绑定本地端口

// 绑定IP地址和端口。
let bindAddress = {
  address: '192.168.xx.xx',
  port: 1234, // 绑定端口,如1234
  family: 1
};
tcp.bind(bindAddress, err => {
  if (err) {
    console.log('bind fail');
    return;
  }
  console.log('bind success');
  
  //TODO 连接服务端代码
  ......

});

5) 连接Broker服务器

// 绑定IP地址和端口。
let bindAddress = {
  address: '192.168.xx.xx',
  port: 1234, // 绑定端口,如1234
  family: 1
};
tcp.bind(bindAddress, err => {
  if (err) {
    console.log('bind fail');
    return;
  }
  console.log('bind success');
  
  let connectAddress = {
     address: '192.168.xx.xx',
     port: 5678, // 连接端口,如5678
    family: 1
  };
  tcp.connect({
      address: connectAddress, timeout: 6000
      }, err => {
        if (err) {
         console.log('connect fail');
         return;
        }
      console.log('connect success');
  
      // 发送数据
      tcp.send({
          data: 'Hello, server!'
         }, err => {
           if (err) {
              console.log('send fail');
              return;
            }
          console.log('send success');
       })
   });
});

MQTT 协议实现一瞥

  • 协议的实现过程,需要对照MQTT V5.0规范逐条实现。
  • 切不可操之过急
  • 发布一条消息和接收一条消息用到的协议都是PUBLISH

CONNECT协议

如下代码即为CONNECT协议实现的源码

Socket通道创建成功之后,仅仅代表客户端与服务端已经建立了消息通道,但这条通道是否可以使用,就需要用到“连接“协议,简单点的讲:客户端和服务端开始通信,交流的规则肯定是一致的。

从如下CONNECT协议实现源码注释中可见,一个通信包包含3个部分:1. 固定头(Fixed Header) 2. 可变头(Variable Header)3. 负载体(PayLoad)

注意:固定头表示由两部分组成,第一部分为一个字节,表示数据包类型和其它预留属性,第二部分表示数据包除第一个字节和自己所占用的字节以外,还剩余多少个字节,用于数据包解码参照计算。第二部分的字节长度数据类型为Variable Byte Integer,因此,其可能占用1~4个字节

import ArrayList from '@ohos.util.ArrayList';
import MQTTCommon from '../common/MQTTCommon';
import MqttDataTypes from '../common/MqttDataTypes';

//3.1 CONNECT – Connection Request
export default class MQTTConnect{

  //3.1.1 CONNECT Fixed Header 【1字节 + Variable Byte Integer】
  //3.1.2 CONNECT Variable Header
      //3.1.2.1 Protocol Name 【6字节】
      //3.1.2.2 Protocol Version 【1字节】
      //3.1.2.3 Connect Flags 【1字节】
      //3.1.2.10 Keep Alive 【2字节】
      //3.1.2.11 CONNECT Properties
          //3.1.2.11.1 Property Length 【1字节】
          //3.1.2.11.2 Session Expiry Interval 【4字节】
          //3.1.2.11.3 Receive Maximum 【2字节】
          //3.1.2.11.4 Maximum Packet Size 【4字节】
          //3.1.2.11.5 Topic Alias Maximum 【2字节】
          //3.1.2.11.6 Request Response Information 【1字节】
          //3.1.2.11.7 Request Problem Information 【1字节】
          //3.1.2.11.8 User Property【UTF-8 String Pair】
          //3.1.2.11.9 Authentication Method 【UTF-8 String】
          //3.1.2.11.10 Authentication Data【Binary Data】
  //3.1.3 CONNECT Payload
      //3.1.3.1 Client Identifier (ClientID) 【UTF-8 String】
      //3.1.3.2 Will Properties
          //3.1.3.2.1 Property Length 【Variable Byte Integer】
          //3.1.3.2.2 Will Delay Interval 【4字节】
          //3.1.3.2.3 Payload Format Indicator 【1字节】
          //3.1.3.2.4 Message Expiry Interval 【4字节】
          //3.1.3.2.5 Content Type【UTF-8 String】
          //3.1.3.2.6 Response Topic【UTF-8 String】
          //3.1.3.2.7 Correlation Data 【Binary Data】
          //3.1.3.2.8 User Property【UTF-8 String Pair】
     //3.1.3.3 Will Topic 【UTF-8 String】
     //3.1.3.4 Will Payload【Binary Data】
     //3.1.3.5 User Name【UTF-8 String】
     //3.1.3.6 Password【Binary Data】
  //3.1.4 CONNECT Actions

   public static buildPacket(): Uint8Array{
      let allData = new ArrayList<Uint8Array>()

      let remainLength: number = 0

      //3.1.1 CONNECT Fixed Header - 包类型
      let u8a_packettype = MqttDataTypes.oneByte(MQTTCommon.PACKET_TYPE_REQ_CONNECT << 4)
      allData.add(u8a_packettype)

      //3.1.2 CONNECT Variable Header
      //3.1.2.1 Protocol Name
      let u8a_protolName = MqttDataTypes.utf8String('MQTT')
      allData.add(u8a_protolName)
      remainLength += u8a_protolName.length

      //3.1.2.2 Protocol Version
      let u8a_version = MqttDataTypes.oneByte(5)
      allData.add(u8a_version)
      remainLength++

      //3.1.2.3 Connect Flags
      const  UserNameFlag: number = 0x80
      const  PasswordFlag: number = 0x40
      const  WillRetain: number = 0x20
      const  WillQoS0: number = 0x00
      const  WillQoS1: number = 0x8
      const  WillQoS2: number = 0x10
      const  WillQoS3: number = 0x18
      const  WillFlag: number = 0x4
      const  CleanStart: number = 0x2

      let connectFlags: number = 0
      //可以根据实际对外暴露的接口,在这里进行与运算
      connectFlags = CleanStart
      let u8a_connectFlags = MqttDataTypes.oneByte(connectFlags)
      allData.add(u8a_connectFlags)
      remainLength += u8a_connectFlags.length

      //3.1.2.10 Keep Alive
      const keepAlive = 3600

      let u8a_keepalive = MqttDataTypes.twoByte(keepAlive)
      allData.add(u8a_keepalive)
      remainLength += u8a_keepalive.length

      //3.1.2.11 CONNECT Properties
      let propertiesLength: number = 0
      let insertStartIndex: number = allData.length

      //3.1.2.11.2 Session Expiry Interval
      let u8a_sessionexpiryintervaltype = MqttDataTypes.oneByte(0x11)
      allData.add(u8a_sessionexpiryintervaltype)
      propertiesLength += u8a_sessionexpiryintervaltype.length

      let u8a_sessionexpiryinterval = MqttDataTypes.fourByte(60)
      allData.add(u8a_sessionexpiryinterval)
      propertiesLength += u8a_sessionexpiryinterval.length

      //3.1.2.11.3 Receive Maximum
      let u8a_receiveMaximumType = MqttDataTypes.oneByte(0x21)
      allData.add(u8a_receiveMaximumType)
      propertiesLength += u8a_receiveMaximumType.length

      let u8a_receiveMaximum = MqttDataTypes.twoByte(10000)
      allData.add(u8a_receiveMaximum)
      propertiesLength += u8a_receiveMaximum.length

      //3.1.2.11.4 Maximum Packet Size
      let u8a_maximumPacketSizeType = MqttDataTypes.oneByte(0x27)
      allData.add(u8a_maximumPacketSizeType)
      propertiesLength += u8a_maximumPacketSizeType.length

      let u8a_maximumPacketSize = MqttDataTypes.fourByte(10000)
      allData.add(u8a_maximumPacketSize)
      propertiesLength += u8a_maximumPacketSize.length

      //3.1.2.11.8 User Property
      let u8a_userPropertyType = MqttDataTypes.oneByte(0x26)
      allData.add(u8a_userPropertyType)
      propertiesLength += u8a_userPropertyType.length

      let u8a_userProperty = MqttDataTypes.utf8StringPair('hello', 'word')
      allData.add(u8a_userProperty)
      propertiesLength += u8a_userProperty.length

      //3.1.2.11.1 Property Length
      let u8a_propertylength = MqttDataTypes.encodeVariableByteInteger(propertiesLength)
      allData.insert(u8a_propertylength, insertStartIndex)

      remainLength += (u8a_propertylength.length  + propertiesLength)

      //3.1.3 CONNECT Payload
      //3.1.3.1 Client Identifier (ClientID)
      let u8a_clientidentifier = MqttDataTypes.utf8String('Harvey鸿蒙')
      allData.add(u8a_clientidentifier)
      remainLength += u8a_clientidentifier.length

      //3.1.1 CONNECT Fixed Header - 包剩余长度
      let u8a_remainlength = MqttDataTypes.encodeVariableByteInteger(remainLength)
      allData.insert(u8a_remainlength, 1)

      //合并数据
      let allUint8Array = new Uint8Array(1 + u8a_remainlength.length + remainLength)

      let allStr2: string = ''
      let allUint8ArrayIndex: number = 0

      allData.forEach((element: Uint8Array)=>{
           element.forEach( (value) => {
              allUint8Array[allUint8ArrayIndex++] = value
              allStr2 = allStr2.concat(value.toString(16).padStart(2, '0')).concat(' ')
           })
      })
      console.log(allStr2)

      return allUint8Array

   }


}

总结

按照MQTT V5.0官方文档实现协议,对本人来讲,感觉非常开心,开心的原因回过头来想想,可能就几条吧

  1. 做完了一件以前不愿意做的事情
  2. 这件事情可造?造。事情本就是一件一件来做的,想做就按照想法做

愿大家2024年,不负时光,行大运,发大财。