【发布时间】:2021-10-11 13:53:58
【问题描述】:
我正在尝试创建一个简单的应用程序,该应用程序将使用 MQTT 将一些消息发布到一个主题(我正在使用的库是 M2Mqtt.net),然后我想在消息已经发送后订阅该主题,然后拥有它们都被接收然后丢弃,因为它们已经被接收了。
我正在使用 mosquitto 2.0.12 作为经纪人
这是发布者:
public class MessagePublisher : IMessagePublisher
{
private readonly MqttClient _client;
public MessagePublisher()
{
_client = new MqttClient("localhost");
// clean session needs to be set to false so that it retains all the missed messages, not just the last one
_client.Connect(Guid.NewGuid().ToString(), "username", "password", false, byte.MaxValue);
}
public void Publish(string topic, string message, bool retain = false)
{
Console.Write($"Sent: {topic}, {message}");
_client.Publish(topic, Encoding.UTF8.GetBytes(message), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, retain);
Total.SentAndReceived.Add(message, null);
}
}
这是监听器:
public class MessageReceiver : IMessageReceiver
{
private readonly MqttClient _client;
public MessageReceiver()
{
_client = new MqttClient("localhost");
}
public void Subscribe(params string[] topics)
{
_client.Subscribe(topics, new[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
_client.MqttMsgPublishReceived += client_receivedMessage;
}
public void Connect()
{
// clean session needs to be set to false so that it retains all the missed messages, not just the last one
_client.Connect(Guid.NewGuid().ToString(), "username", "password", false, byte.MaxValue);
}
public void Disconnect()
{
_client.Disconnect();
}
static void client_receivedMessage(object sender, MqttMsgPublishEventArgs e)
{
var message = Encoding.Default.GetString(e.Message);
Console.WriteLine($"Message Received: {message}");
if (Total.SentAndReceived.ContainsKey(message))
Total.SentAndReceived[message] = message;
}
}
这是主要的应用程序:
public static class Program
{
public static void Main(string[] args)
{
var messageReceiver = new MessageReceiver();
var publisher = new MessagePublisher();
for (var i = 1; i <= 10000; i++)
{
publisher.Publish("Devices/", i.ToString(), true);
}
messageReceiver.Subscribe("Devices/");
messageReceiver.Connect();
Thread.Sleep(5000);
var b = Total.SentAndReceived.Where(x => x.Value == null);
Console.WriteLine($"{b.Count()} Missed Messages");
}
}
我遇到的问题是有遗漏的消息。当我运行应用程序时,错过的消息数量总是会发生变化。并不是最后 n 条消息被遗漏,而是前 n 条消息。
我希望如果我要构建一个能够监听已发布消息的服务。如果服务因任何原因停止。一旦服务重新上线,就会收到在该停机时间内发送的消息。
【问题讨论】: