一、DotNetty背景介绍
某天发现 dotnet 是个好东西,就找了个项目来练练手。于是有了本文的 Mqtt 客户端 (github: MqttFx )
DotNetty是微软的Azure团队,使用C#实现的Netty的版本发布。不但使用了C#和.Net平台的技术特点,并且保留了Netty原来绝大部分的编程接口。让我们在使用时,完全可以依照Netty官方的教程来学习和使用DotNetty应用程序。
DotNetty同时也是开源的,它的源代码托管在Github上: https://github.com/azure/dotnetty
Netty 的官方文档 : http://netty.io/wiki/all-documents.html
二、Packet
DotNetty.Codecs.Mqtt, 本项目没有使用。直接写了一个。
FixedHeader: 固定报头
/// <summary>
/// 固定报头
/// </summary>
public class FixedHeader
{
/// <summary>
/// 报文类型
/// </summary>
public PacketType PacketType { get; set; }
/// <summary>
/// 重发标志
/// </summary>
public bool Dup { get; set; }
/// <summary>
/// 服务质量等级
/// </summary>
public MqttQos Qos { get; set; }
/// <summary>
/// 保留标志
/// </summary>
public bool Retain { get; set; }
/// <summary>
/// 剩余长度
/// </summary>
public int RemaingLength { internal get; set; }
public FixedHeader(PacketType packetType)
{
PacketType = packetType;
}
public FixedHeader(byte signature, int remainingLength)
{
PacketType = (PacketType)((signature & 0xf0) >> 4);
Dup = ((signature & 0x08) >> 3) > 0;
Qos = (MqttQos)((signature & 0x06) >> 1);
Retain = (signature & 0x01) > 0;
RemaingLength = remainingLength;
}
public void WriteTo(IByteBuffer buffer)
{
var flags = (byte)PacketType << 4;
flags |= Dup.ToByte() << 3;
flags |= (byte)Qos << 1;
flags |= Retain.ToByte();
buffer.WriteByte((byte)flags);
buffer.WriteBytes(EncodeLength(RemaingLength));
}
static byte[] EncodeLength(int length)
{
var result = new List<byte>();
do
{
var digit = (byte)(length % 0x80);
length /= 0x80;
if (length > 0)
digit |= 0x80;
result.Add(digit);
} while (length > 0);
return result.ToArray();
}
}
Packet: 消息基类
/// <summary>
/// 消息基类
/// </summary>
public abstract class Packet
{
#region FixedHeader
/// <summary>
/// 固定报头
/// </summary>
public FixedHeader FixedHeader { protected get; set; }
/// <summary>
/// 报文类型
/// </summary>
public PacketType PacketType => FixedHeader.PacketType;
/// <summary>
/// 重发标志
/// </summary>
public bool Dup => FixedHeader.Dup;
/// <summary>
/// 服务质量等级
/// </summary>
public MqttQos Qos => FixedHeader.Qos;
/// <summary>
/// 保留标志
/// </summary>
public bool Retain => FixedHeader.Retain;
/// <summary>
/// 剩余长度
/// </summary>
public int RemaingLength => FixedHeader.RemaingLength;
#endregion
public Packet(PacketType packetType) => FixedHeader = new FixedHeader(packetType);
public virtual void Encode(IByteBuffer buffer) { }
public virtual void Decode(IByteBuffer buffer) { }
}
PacketWithId: 消息基类(带ID)
/// <summary> /// 消息基类(带ID) /// </summary> public abstract class PacketWithId : Packet { public PacketWithId(PacketType packetType) : base(packetType) { } /// <summary> /// 报文标识符 /// </summary> public ushort PacketId { get; set; } /// <summary> /// EncodePacketIdVariableHeader /// </summary> /// <param name="buffer"></param> public override void Encode(IByteBuffer buffer) { var buf = Unpooled.Buffer(); try { EncodePacketId(buf); FixedHeader.RemaingLength = buf.ReadableBytes; FixedHeader.WriteTo(buffer); buffer.WriteBytes(buf); buf = null; } finally { buf?.Release(); } } /// <summary> /// DecodePacketIdVariableHeader /// </summary> /// <param name="buffer"></param> public override void Decode(IByteBuffer buffer) { int remainingLength = RemaingLength; DecodePacketId(buffer, ref remainingLength); FixedHeader.RemaingLength = remainingLength; } protected void EncodePacketId(IByteBuffer buffer) { if (Qos > MqttQos.AtMostOnce) { buffer.WriteUnsignedShort(PacketId); } } protected void DecodePacketId(IByteBuffer buffer, ref int remainingLength) { if (Qos > MqttQos.AtMostOnce) { PacketId = buffer.ReadUnsignedShort(ref remainingLength); if (PacketId == 0) throw new DecoderException("[MQTT-2.3.1-1]"); } } }
ConnectPacket: 发起连接包
/// <summary> /// 发起连接 /// </summary> internal sealed class ConnectPacket : Packet { public ConnectPacket() : base(PacketType.CONNECT) { } #region Variable header /// <summary> /// 协议名 /// </summary> public string ProtocolName { get; } = "MQTT"; /// <summary> /// 协议级别 /// </summary> public byte ProtocolLevel { get; } = 0x04; /// <summary> /// 保持连接 /// </summary> public short KeepAlive { get; set; } #region Connect Flags /// <summary> /// 用户名标志 /// </summary> public bool UsernameFlag { get; set; } /// <summary> /// 密码标志 /// </summary> public bool PasswordFlag { get; set; } /// <summary> /// 遗嘱保留 /// </summary> public bool WillRetain { get; set; } /// <summary> /// 遗嘱QoS /// </summary> public MqttQos WillQos { get; set; } /// <summary> /// 遗嘱标志 /// </summary> public bool WillFlag { get; set; } /// <summary> /// 清理会话 /// </summary> public bool CleanSession { get; set; } #endregion #endregion #region Payload /// <summary> /// 客户端标识符 Client Identifier /// </summary> public string ClientId { get; set; } /// <summary> /// 遗嘱主题 Will Topic /// </summary> public string WillTopic { get; set; } /// <summary> /// 遗嘱消息 Will Message /// </summary> public byte[] WillMessage { get; set; } /// <summary> /// 用户名 User Name /// </summary> public string UserName { get; set; } /// <summary> /// 密码 Password /// </summary> public string Password { get; set; } #endregion public override void Encode(IByteBuffer buffer) { var buf = Unpooled.Buffer(); try { //variable header buf.WriteString(ProtocolName); //byte 1 - 8 buf.WriteByte(ProtocolLevel); //byte 9 //connect flags; //byte 10 var flags = UsernameFlag.ToByte() << 7; flags |= PasswordFlag.ToByte() << 6; flags |= WillRetain.ToByte() << 5; flags |= ((byte)WillQos) << 3; flags |= WillFlag.ToByte() << 2; flags |= CleanSession.ToByte() << 1; buf.WriteByte((byte)flags); //keep alive buf.WriteShort(KeepAlive); //byte 11 - 12 //payload buf.WriteString(ClientId); if (WillFlag) { buf.WriteString(WillTopic); buf.WriteBytes(WillMessage); } if (UsernameFlag && PasswordFlag) { buf.WriteString(UserName); buf.WriteString(Password); } FixedHeader.RemaingLength = buf.ReadableBytes; FixedHeader.WriteTo(buffer); buffer.WriteBytes(buf); } finally { buf?.Release(); buf = null; } } }
连接回执: ConnAckPacket
/// <summary> /// 连接回执 /// </summary> internal sealed class ConnAckPacket : Packet { public ConnAckPacket() : base (PacketType.CONNACK) { } /// <summary> /// 当前会话 /// </summary> public bool SessionPresent { get; set; } /// <summary> /// 连接返回码 /// </summary> public ConnectReturnCode ConnectReturnCode { get; set; } public override void Decode(IByteBuffer buffer) { SessionPresent = (buffer.ReadByte() & 0x01) == 1; ConnectReturnCode = (ConnectReturnCode)buffer.ReadByte(); } }