一、DotNetty背景介绍

    某天发现 dotnet  是个好东西,就找了个项目来练练手。于是有了本文的 Mqtt 客户端   (github:  MqttFx )

DotNetty是微软的Azure团队,使用C#实现的Netty的版本发布。不但使用了C#和.Net平台的技术特点,并且保留了Netty原来绝大部分的编程接口。让我们在使用时,完全可以依照Netty官方的教程来学习和使用DotNetty应用程序。 

DotNetty同时也是开源的,它的源代码托管在Github上: https://github.com/azure/dotnetty

Netty 的官方文档 : http://netty.io/wiki/all-documents.html

 

二、Packet

DotNetty.Codecs.Mqtt, 本项目没有使用。直接写了一个。

FixedHeader:  固定报头

    /// <summary>
    /// 固定报头
    /// </summary>
    public class FixedHeader
    {
        /// <summary>
        /// 报文类型
        /// </summary>
        public PacketType PacketType { get; set; }
        /// <summary>
        /// 重发标志
        /// </summary>
        public bool Dup { get; set; }
        /// <summary>
        /// 服务质量等级
        /// </summary>
        public MqttQos Qos { get; set; }
        /// <summary>
        /// 保留标志
        /// </summary>
        public bool Retain { get; set; }
        /// <summary>
        /// 剩余长度
        /// </summary>
        public int RemaingLength { internal get; set; }

        public FixedHeader(PacketType packetType)
        {
            PacketType = packetType;
        }

        public FixedHeader(byte signature, int remainingLength)
        {
            PacketType = (PacketType)((signature & 0xf0) >> 4);
            Dup = ((signature & 0x08) >> 3) > 0;
            Qos = (MqttQos)((signature & 0x06) >> 1);
            Retain = (signature & 0x01) > 0;
            RemaingLength = remainingLength;
        }

        public void WriteTo(IByteBuffer buffer)
        {
            var flags = (byte)PacketType << 4;
            flags |= Dup.ToByte() << 3;
            flags |= (byte)Qos << 1;
            flags |= Retain.ToByte();

            buffer.WriteByte((byte)flags);
            buffer.WriteBytes(EncodeLength(RemaingLength));
        }

        static byte[] EncodeLength(int length)
        {
            var result = new List<byte>();
            do
            {
                var digit = (byte)(length % 0x80);
                length /= 0x80;
                if (length > 0)
                    digit |= 0x80;
                result.Add(digit);
            } while (length > 0);

            return result.ToArray();
        }
    }

  

 

Packet:  消息基类

    /// <summary>
    /// 消息基类
    /// </summary>
    public abstract class Packet
    {
        #region FixedHeader

        /// <summary>
        /// 固定报头
        /// </summary>
        public FixedHeader FixedHeader { protected get; set; }
        /// <summary>
        /// 报文类型
        /// </summary>
        public PacketType PacketType => FixedHeader.PacketType;
        /// <summary>
        /// 重发标志
        /// </summary>
        public bool Dup => FixedHeader.Dup;
        /// <summary>
        /// 服务质量等级
        /// </summary>
        public MqttQos Qos => FixedHeader.Qos;
        /// <summary>
        /// 保留标志
        /// </summary>
        public bool Retain => FixedHeader.Retain;
        /// <summary>
        /// 剩余长度
        /// </summary>
        public int RemaingLength => FixedHeader.RemaingLength;

        #endregion

        public Packet(PacketType packetType) => FixedHeader = new FixedHeader(packetType);

        public virtual void Encode(IByteBuffer buffer) { }

        public virtual void Decode(IByteBuffer buffer) { }
    }

  

PacketWithId: 消息基类(带ID)

    /// <summary>
    /// 消息基类(带ID)
    /// </summary>
    public abstract class PacketWithId : Packet
    {
        public PacketWithId(PacketType packetType) : base(packetType)
        {
        }

        /// <summary>
        /// 报文标识符
        /// </summary>
        public ushort PacketId { get; set; }

        /// <summary>
        /// EncodePacketIdVariableHeader
        /// </summary>
        /// <param name="buffer"></param>
        public override void Encode(IByteBuffer buffer)
        {
            var buf = Unpooled.Buffer();
            try
            {
                EncodePacketId(buf);

                FixedHeader.RemaingLength = buf.ReadableBytes;
                FixedHeader.WriteTo(buffer);
                buffer.WriteBytes(buf);
                buf = null;
            }
            finally
            {
                buf?.Release();
            }
        }

        /// <summary>
        /// DecodePacketIdVariableHeader
        /// </summary>
        /// <param name="buffer"></param>
        public override void Decode(IByteBuffer buffer)
        {
            int remainingLength = RemaingLength;
            DecodePacketId(buffer, ref remainingLength);
            FixedHeader.RemaingLength = remainingLength;
        }

        protected void EncodePacketId(IByteBuffer buffer)
        {
            if (Qos > MqttQos.AtMostOnce)
            {
                buffer.WriteUnsignedShort(PacketId);
            }
        }

        protected void DecodePacketId(IByteBuffer buffer, ref int remainingLength)
        {
            if (Qos > MqttQos.AtMostOnce)
            {
                PacketId = buffer.ReadUnsignedShort(ref remainingLength);
                if (PacketId == 0)
                    throw new DecoderException("[MQTT-2.3.1-1]");
            }
        }
    }

 

ConnectPacket: 发起连接包

    /// <summary>
    /// 发起连接
    /// </summary>
    internal sealed class ConnectPacket : Packet
    {
        public ConnectPacket()
            : base(PacketType.CONNECT)
        {
        }

        #region Variable header

        /// <summary>
        /// 协议名
        /// </summary>
        public string ProtocolName { get; } = "MQTT";
        /// <summary>
        /// 协议级别
        /// </summary>
        public byte ProtocolLevel { get; } = 0x04;
        /// <summary>
        /// 保持连接 
        /// </summary>
        public short KeepAlive { get; set; }

        #region Connect Flags
        /// <summary>
        /// 用户名标志
        /// </summary>
        public bool UsernameFlag { get; set; }
        /// <summary>
        /// 密码标志
        /// </summary>
        public bool PasswordFlag { get; set; }
        /// <summary>
        /// 遗嘱保留
        /// </summary>
        public bool WillRetain { get; set; }
        /// <summary>
        /// 遗嘱QoS
        /// </summary>
        public MqttQos WillQos { get; set; }
        /// <summary>
        /// 遗嘱标志
        /// </summary>
        public bool WillFlag { get; set; }
        /// <summary>
        /// 清理会话
        /// </summary>
        public bool CleanSession { get; set; }
        #endregion

        #endregion

        #region Payload

        /// <summary>
        /// 客户端标识符 Client Identifier
        /// </summary>
        public string ClientId { get; set; }
        /// <summary>
        /// 遗嘱主题 Will Topic
        /// </summary>
        public string WillTopic { get; set; }
        /// <summary>
        /// 遗嘱消息 Will Message
        /// </summary>
        public byte[] WillMessage { get; set; }
        /// <summary>
        /// 用户名 User Name
        /// </summary>
        public string UserName { get; set; }
        /// <summary>
        /// 密码 Password
        /// </summary>
        public string Password { get; set; }

        #endregion

        public override void Encode(IByteBuffer buffer)
        {
            var buf = Unpooled.Buffer();
            try
            {
                //variable header
                buf.WriteString(ProtocolName);        //byte 1 - 8
                buf.WriteByte(ProtocolLevel);         //byte 9

                //connect flags;                      //byte 10
                var flags = UsernameFlag.ToByte() << 7;
                flags |= PasswordFlag.ToByte() << 6;
                flags |= WillRetain.ToByte() << 5;
                flags |= ((byte)WillQos) << 3;
                flags |= WillFlag.ToByte() << 2;
                flags |= CleanSession.ToByte() << 1;
                buf.WriteByte((byte)flags);

                //keep alive
                buf.WriteShort(KeepAlive);            //byte 11 - 12

                //payload
                buf.WriteString(ClientId);
                if (WillFlag)
                {
                    buf.WriteString(WillTopic);
                    buf.WriteBytes(WillMessage);
                }
                if (UsernameFlag && PasswordFlag)
                {
                    buf.WriteString(UserName);
                    buf.WriteString(Password);
                }

                FixedHeader.RemaingLength = buf.ReadableBytes;
                FixedHeader.WriteTo(buffer);
                buffer.WriteBytes(buf);
            }
            finally
            {
                buf?.Release();
                buf = null;
            }
        }
    }

 

连接回执: ConnAckPacket

    /// <summary>
    /// 连接回执
    /// </summary>
    internal sealed class ConnAckPacket : Packet
    {
        public ConnAckPacket() : base (PacketType.CONNACK)
        {
        }

        /// <summary>
        /// 当前会话
        /// </summary>
        public bool SessionPresent { get; set; }
        /// <summary>
        /// 连接返回码
        /// </summary>
        public ConnectReturnCode ConnectReturnCode { get; set; }

        public override void Decode(IByteBuffer buffer)
        {
            SessionPresent = (buffer.ReadByte() & 0x01) == 1;
            ConnectReturnCode = (ConnectReturnCode)buffer.ReadByte();
        }
    }
View Code

相关文章: