【问题标题】:.NET Client - Waiting for an MQTT response before proceeding to the next request.NET 客户端 - 在继续下一个请求之前等待 MQTT 响应
【发布时间】:2016-07-26 09:39:13
【问题描述】:

我有一个循环内的 MQTT 调用,并且在每次迭代中,它应该返回来自订阅者的响应,以便我可以在发布后使​​用转发的值。但问题是我不知道该怎么做。

我希望你有一个想法,或者如果我没有正确实施它,请你指导我完成这个。谢谢。

这是我的代码:

// MyClientMgr
class MyClientMgr{

  public long CurrentOutput { get; set; }

  public void GetCurrentOutput(MyObjectParameters parameters, MqttClient client)
  {
      MyMessageObject msg = new MyMessageObject
      {
        Action = MyEnum.GetOutput,
        Data = JsonConvert.SerializeObject(parameters)
      }
      mq_GetCurrentOutput(msg, client);
  }

  private void mq_GetCurrentOutput(MyMessageObject msg, MqttClient client)
  {
      string msgStr = JsonConvert.SerializeObject(msg);
      client.Publish("getOutput", Encoding.UTF8.GetBytes(msgStr),   
MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
      client.MqttMsgPublishReceived += (sender, e) =>{
           MyObjectOutput output = JsonConvert.DeserializeObject<MyObjectOutput>(Encoding.UTF8.GetString(e.Message));
           CurrentOutput = output;
      };
  }  

}

// MyServerMgr
class MyServerMgr
{
   public void InitSubscriptions()
   {
      mq_GetOutput();
   }

   private void mq_GetOutput()
   {
       MqttClient clientSubscribe = new MqttClient(host);
       string clientId = Guid.NewGuid().ToString();
       clientSubscribe.Connect(clientId);
       clientSubscribe.Subscribe(new string[] { "getOutput" }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });

       MqttClient clientPublish = new MqttClient(host);
       string clientIdPub = Guid.NewGuid().ToString();
       clientPublish.Connect(clientIdPub);
       clientSubscribe.MqttMsgPublishReceived += (sender, e) => {
            MyMessageObj msg = JsonConvert.DeserializeObject<MyMessageObj>(Encoding.UTF8.GetString(e.Message));

            var output = msg.Output;
            clientPublish.Publish("getOutput", Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(output)), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
       }
   }
}

// MyCallerClass
class MyCallerClass
{
   var host = "test.mqtt.org";
   var myServer = new MyServerMgr(host);
   var myClient = new MyClientMgr();

   myServer.InitSubscriptions();
   MqttClient client = new MqttClient(host);
   for(int i = 0; i < 10; i++)
   {
      long output = 0;

      MyObjectParameters parameters = {};
      myClient.GetCurrentOutput(parameters, client) // here I call the  method from my client manager 
      // to publish the getting of the output and assigned 
      // below for use,  but the problem is the value doesn't 
      // being passed to the output variable because it is not 
      // yet returned by the server.

      // Is there a way I could wait the process to 
      // get the response before assigning the output?

      output = myClient.CurrentOutput; // output here will always be null  
     // because the response is not yet forwarded by the server 

   }
}

我的调用者类中有一个循环来调用 mqtt 发布以获取输出,但我不知道如何在分配之前获取输出,我想先等待响应,然后再进行下一个。

我已经尝试过像这样在里面做一个while循环:

while(output == 0)
{
   output = myClient.CurrentOutput;
}

是的,我可以在这里得到输出,但它会大大减慢这个过程。有时它会失败。

请帮助我。谢谢。

【问题讨论】:

    标签: c# .net mqtt


    【解决方案1】:

    您似乎正在尝试通过异步协议 (MQTT) 进行同步通信。

    我的意思是你想发送一条消息然后等待响应,这不是 MQTT 的工作方式,因为在协议级别没有回复消息的概念。

    我对 C# 不是很熟悉,所以我只对可能的解决方案做一个抽象的描述。

    我的建议是使用发布线程,等待/脉冲(查看Monitor 类)在每次发布后拥有此块,并在收到响应时让消息处理程序调用脉冲。

    如果响应不包含识别原始请求的等待,您还需要一个状态机变量来记录正在进行的请求。

    如果另一端由于某些原因没有响应,您可能需要考虑暂停等待。

    【讨论】:

    • 在 C# 中,使用 TaskTaskCompletionSource 会更加地道/现代。
    【解决方案2】:

    您可以使用具有 WaitOne() 和 Set() 方法的 AutoResetEvent 类。在发布后使​​用 WaitOne() 将等待消息发布,并且在 client_MqttMsgPublishReceived 事件下使用 Set() 将在订阅者收到他订阅的消息时释放等待。

    【讨论】:

    • client_MqttMsgPublishReceived 并不表示订阅者已收到消息,仅表示代理已收到消息。 MQTT 中没有端到端交付通知
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-01-17
    • 1970-01-01
    • 1970-01-01
    • 2020-01-18
    • 2014-07-12
    相关资源
    最近更新 更多