【问题标题】:MQTT PAHO - MessageId for confirmation of successful message deliveryMQTT PAHO - 用于确认消息传递成功的 MessageId
【发布时间】:2019-02-26 11:00:14
【问题描述】:

我使用 Java 中的 org.eclipse.paho.client.mqttv3 版本 1.2.0 开发了一个应用程序。识别通过 iMqttDeliveryToken 的 messageID 发送到 mqtt 代理的消息。

第 1 步 - 发布消息:

ObjectMapper objectMapper = new ObjectMapper();
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(objectMapper.writeValueAsString(myObject).getBytes()); 
mqttMessage.setQos(1);
IMqttDeliveryToken iMqttDeliveryToken = this.client.publish("/myTopic", mqttMessage);

第 2 步 - 将消息保存到数据库中:

从 IMqttDeliveryToken 我得到了 messageID。我用它来保存和识别数据库中的消息。

第 3 步 - 等待调用 deliveryComplete 回调:

这为我提供了相同的 IMqttDeliveryToken,让我再次获得 messageId。

@Override
   public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
      // delete the database entry via messageId from database
}

问题在于 Step3 可以比 Step2 更快。所以在我的条目被保存到数据库之前调用回调。我需要在发送消息之前知道 messageId 以保存它,然后才能调用回调。我无法自己生成 messageId 并将其设置为:

mqttMessage.setId(555);

MQTT 生成一个自己的 messageId。所以我的问题:

  1. 是否可以设置自己的messageId?
  2. 能否在发布前获取mqtt客户端生成的messageId?

【问题讨论】:

    标签: mqtt paho


    【解决方案1】:

    不要使用 Paho 库生成的消息的 MQTT id - 因为它

    1. 交付太晚,无法满足您的需求
    2. 如果您发送大量消息,可能会重复发送。

    publishing:

    Long databaseId = 42;
    ObjectMapper objectMapper = new ObjectMapper();
    MqttMessage mqttMessage = new MqttMessage();
    mqttMessage.setPayload(objectMapper.writeValueAsString(myObject).getBytes()); 
    mqttMessage.setQos(1);
    this.client.publish("/myTopic", mqttMessage, databaseId, mPublishCallback);
    

    稍后您可以在发布回调方法中检索 id:

    private final IMqttActionListener mPublishCallback = new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken publishToken) {
            Long databaseId = (Long) publishToken.getUserContext();
        }
    
        @Override
        public void onFailure(IMqttToken publishToken, Throwable ex) {
            Long databaseId = (Long) publishToken.getUserContext();
        }
    };
    

    另外,您使用的是同步客户端吗?我更喜欢使用IMqttAsyncClient

    【讨论】:

    • 这种方法效果很好!我也在使用 IMqttAsyncClient。我真的需要一个新的 IMqttActionListener 实例吗?我刚刚发现必须将 IMqttActionListener 设置为参数的发布函数。我问的原因是,如果消息成功发送到 mqtt 代理,也会调用我从客户端设置的回调传递完成。
    • mPublishCallback 的方法将帮助您了解发布特定消息是失败(需要重试)还是成功
    • 我使用 QoS1 管理它并正确处理异常。所以我不需要发送一个ID来识别消息。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-15
    • 2021-02-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多