【问题标题】:Retained MQTT messages being missed遗漏的保留的 MQTT 消息
【发布时间】:2021-10-11 13:53:58
【问题描述】:

我正在尝试创建一个简单的应用程序,该应用程序将使用 MQTT 将一些消息发布到一个主题(我正在使用的库是 M2Mqtt.net),然后我想在消息已经发送后订阅该主题,然后拥有它们都被接收然后丢弃,因为它们已经被接收了。

我正在使用 mosquitto 2.0.12 作为经纪人

这是发布者:

public class MessagePublisher : IMessagePublisher
{
    private readonly MqttClient _client;

    public MessagePublisher()
    {
        _client = new MqttClient("localhost");

        // clean session needs to be set to false so that it retains all the missed messages, not just the last one
        _client.Connect(Guid.NewGuid().ToString(), "username", "password", false, byte.MaxValue);
    }

    public void Publish(string topic, string message, bool retain = false)
    {
        Console.Write($"Sent: {topic}, {message}");

        _client.Publish(topic, Encoding.UTF8.GetBytes(message), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, retain);

        Total.SentAndReceived.Add(message, null);
    }
}

这是监听器:

public class MessageReceiver : IMessageReceiver
{
    private readonly MqttClient _client;

    public MessageReceiver()
    {
        _client = new MqttClient("localhost");
    }

    public void Subscribe(params string[] topics)
    {
        _client.Subscribe(topics, new[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });

        _client.MqttMsgPublishReceived += client_receivedMessage;
    }


    public void Connect()
    {
        // clean session needs to be set to false so that it retains all the missed messages, not just the last one
        _client.Connect(Guid.NewGuid().ToString(), "username", "password", false, byte.MaxValue);
    }

    public void Disconnect()
    {
        _client.Disconnect();
    }

    static void client_receivedMessage(object sender, MqttMsgPublishEventArgs e)
    {
        var message = Encoding.Default.GetString(e.Message);

        Console.WriteLine($"Message Received: {message}");

        if (Total.SentAndReceived.ContainsKey(message))
            Total.SentAndReceived[message] = message;
    }
}

这是主要的应用程序:

public static class Program
{
    public static void Main(string[] args)
    {
        var messageReceiver = new MessageReceiver();

        var publisher = new MessagePublisher();

        for (var i = 1; i <= 10000; i++)
        {
            publisher.Publish("Devices/", i.ToString(), true);
        }

        messageReceiver.Subscribe("Devices/");

        messageReceiver.Connect();

        Thread.Sleep(5000);

        var b = Total.SentAndReceived.Where(x => x.Value == null);

        Console.WriteLine($"{b.Count()} Missed Messages");
    }
}

我遇到的问题是有遗漏的消息。当我运行应用程序时,错过的消息数量总是会发生变化。并不是最后 n 条消息被遗漏,而是前 n 条消息。

我希望如果我要构建一个能够监听已发布消息的服务。如果服务因任何原因停止。一旦服务重新上线,就会收到在该停机时间内发送的消息。

【问题讨论】:

    标签: c# mqtt mosquitto retain


    【解决方案1】:

    我认为您对这里的某些术语有误解。

    首先,MQTT 通常不会对消息进行排队。代理将消息排队的唯一时间是接收客户端已经连接并订阅了 QOS > 0 的主题。如果该客户端在发布者发送消息之前断开连接,代理将排队消息。只有当它们使用相同的客户端 ID 重新连接并将干净会话标志设置为 false 时,它​​们才会被发送到接收客户端。这是消息排队的唯一方式。

    由于您似乎在使用随机生成的客户端 ID (Guid.NewGuid().ToString()),因此这将不起作用。您似乎也在尝试在连接之前订阅,这同样行不通。

    其次,如上所述,保留消息与消息队列无关。如果在发布时设置了保留标志,则保留消息。然后,代理将存储该特定消息并在客户端订阅匹配主题时交付它。此消息将在有关该主题的任何其他消息之前发送。如果发布了另一条带有保留标志的消息,它将替换之前的消息,每个主题只能有 1 条保留消息。

    【讨论】:

    • 这似乎已经修复了一切,谢谢。只是为了澄清。我使用了一个恒定的向导。已连接、订阅并收到初始消息。断开连接。然后发送了 10,000 条消息。重新连接并收到了。对吗?
    • 这可能对任何人都有帮助:mosquitto 配置文件有一个可能需要更改的 max_queued_messages 标志。默认为 1000。0 表示没有限制
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-09-06
    • 1970-01-01
    • 1970-01-01
    • 2022-10-07
    • 2018-03-14
    • 2018-09-13
    • 1970-01-01
    相关资源
    最近更新 更多