MQTT(Message Queue Telemetry Transport),遥测传输协议,提供订阅/发布模式,更为简约、轻量,易于使用,针对受限环境(带宽低、网络延迟高、网络通信不稳定),可以简单概括为物联网打造,官方总结特点如下:

  • 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合
  • 对负载内容屏蔽的消息传输
  • 使用 TCP/IP 提供网络连接
  • 有三种消息发布服务质量:

     “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况:丢失一次读记录无所谓,因为不久后还会有第二次发送,例如  环境传感器数据

     “至少一次”,确保消息到达,但消息重复可能会发生

     “只有一次”,确保消息到达一次。这一级别可用于如下情况:消息重复或丢失会导致不正确的结果,例如 计费系统

  • 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量
  • 使用 Last Will 和 Testament(遗嘱) 特性通知有关客户端的异常中断机制

  先说一下整个协议的构造,整体上协议可拆分为:固定头部 + 可变头部 + 消息体

  协议说白了就是对于双方通信的一个约定,比如传过来一段字符流,第1个字节表示什么,第2个字节表示什么。。。。一个约定

  注意:wireshark 解析出的mqtt协议中,只有value部分是mqtt包里的内容,至于那些头字段是wireshark根据MQTT协议的规定自己添加的,是为了便于理解协议,其并不是MQTT传输包里的内容

  MQTT协议笔记  之  头部信息(NO.1)

  MQTT 3.1协议在线版本: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html 

  官方下载地址: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/MQTT_V3.1_Protocol_Specific.pdf

  PDF版本,42页,不算多。

  另外,目前MQTT大家都用在了手机推送,可能还有很多的使用方式,有待进一步的探索。

  阅读完毕MQTT协议,有一个想法,其实可以基于MQTT协议,打造更加私有、精简(协议一些地方,略显多余)的传输协议,比如一个字节的传输开销。有时间,会详细说一下

2. 固定头部

  固定头部,两个字节,共16位:

  MQTT协议笔记  之  头部信息(NO.1)

2.1 第一个字节(byte 1)

  •  Message Type(7-4)

    消息类型,占用4个二进制位,可代表16种消息类型:

    MQTT协议笔记  之  头部信息(NO.1)

    除去0和15位置属于保留待用,共14种消息事件类型

  • DUP flag(3)

     消息重传标志,只占1个二进制位。值可取0或1,默认值为0,表示当前消息是第一次被发送,当值为1时,表示当前消息先前已经被传送过。只适用于客户端或服务器端尝试重发PUBLISH, PUBREL, SUBSCRIBE 或 UNSUBSCRIBE等消息,不能用于检测消息重复发送。

  重发消息时需要注意满足条件:QoS > 0,消息需要回复确认

  此时(DUP flag 为 1 时),在可变头部需要包含消息ID

  • QoS(2-1)

   服务质量(Quality of Service),占用2个二进制位。

   针对 publish 消息:

   MQTT协议笔记  之  头部信息(NO.1)

  • RETAIN(0)

  仅针对PUBLISH消息。不同值,不同含义:

  1:表示发送的消息需要一直持久保存(不受服务器重启影响),不但要发送给当前的订阅者,并且以后新来的订阅了此Topic name的订阅者会马上得到推送。

    备注:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送。

  0:仅仅为当前订阅者推送此消息。

    假如服务器收到一个消息体为空(zero-length payload)且RETAIN = 1的已存在Topic name的PUBLISH消息,服务器将删除掉与之对应的已被持久化的PUBLISH消息。

  • 如何解析

   因为java使用有符号(最高位为符号位)数据表示,byte范围:-128-127。该字节的最高位(左边第一位),可能为1。若直接转换为byte类型,会出现负数,这是一个雷区。DataInputStream提供了int readUnsignedByte()读取方式,请注意。下面演示了,如何从一个字节中,获取到所有定义的信息,同时绕过雷区:

  

public static void main(String[] args) {
    byte publishFixHeader = 50;// 0 0 1 1 0 0 1 0

    doGetBit(publishFixHeader);
    int ori = 224;//1110000,DISCONNECT ,Message Type (14)
    byte flag = (byte) ori; //有符号byte       
    doGetBit(flag);
    doGetBit_v2(ori);
}


public static void doGetBit(byte flags) {
    boolean retain = (flags & 1) > 0;
    int qosLevel = (flags & 0x06) >> 1;
    boolean dupFlag = (flags & 8) > 0;
    int messageType = (flags >> 4) & 0x0f;

    System.out.format(
            "Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",
            messageType, dupFlag, qosLevel, retain);
}

public static void doGetBit_v2(int flags) {
    boolean retain = (flags & 1) > 0;
    int qosLevel = (flags & 0x06) >> 1;
    boolean dupFlag = (flags & 8) > 0;
    int messageType = flags >> 4;

    System.out.format(
            "Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",
            messageType, dupFlag, qosLevel, retain);
}

 

2.2 第二个字节

  • Remaining Length(剩余长度)

   当前消息中剩余的字节数,包含 可变头部 和 净负荷,占1-4个字节。单个字节最大值:01111111,16进制:0x7F,10进制为127。单个字节为什么不能是11111111(0xFF)呢?因为MQTT协议规定,第八位(最高位)是个标志位,若为1,则表示还有后续字节存在,MQTT协议最多允许4个字节表示剩余长度,那么最大长度为:0xFF,0xFF,0xFF,0x7F,二进制表示为:11111111,11111111,11111111,01111111,十进制:268435455 byte=261120KB=256MB=0.25GB 四个字节之间值的范围:

     MQTT协议笔记  之  头部信息(NO.1)

   Remaining Length = len(variable headers) + len(body)

  如何换算成十进制呢 ? 使用java语言表示如下:

public static void main(String[] args) throws IOException {
    // 模拟客户端写入
   ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
   DataOutputStream dataOutputStream = new DataOutputStream(arrayOutputStream);
   dataOutputStream.write(0xff);
   dataOutputStream.write(0xff);
   dataOutputStream.write(0xff);
   dataOutputStream.write(0x7f);

   InputStream arrayInputStream = new ByteArrayInputStream(arrayOutputStream.toByteArray());

    // 模拟服务器/客户端解析
   System. out.println( "result is " + bytes2Length(arrayInputStream));
}

/**
* 转化字节为 int类型长度
* @param in
* @return
* @throws IOException
*/
private static int bytes2Length(InputStream in) throws IOException {
    int multiplier = 1;
    int length = 0;
    int digit = 0;
    do {
        digit = in.read(); //一个字节的有符号或者无符号,转换转换为四个字节有符号 int类型
        length += (digit & 0x7f) * multiplier;
        multiplier *= 128;
   } while ((digit & 0x80) != 0);

    return length;
}
View Code

相关文章: