【问题标题】:mqtt async wait for messagse then respond to http post requestmqtt异步等待messagse然后响应http post请求
【发布时间】:2018-04-23 16:50:24
【问题描述】:

我是 node.js 的新手,我正在尝试制作一个 webhook,我在其中收到一个 HTTP 发布请求,并希望通过 mqtt 发送请求并等待 mqtt 消息参考 MQTT 消息,然后将这些消息发送一个响应通过 HTTP

  var array = [];
  const client = mqtt.connect(MQTTServer)
  var count =0;
  client.on('message', (topic, message) => {

  array[count] = message 
  count ++ 

  }

app.post('/tesr', function (request, response) {

    client.publish ('outTopic' , 'request ');
    client.subscribe('inTopic')


    //wait for multiple mqtt message in  MQTT callback 


    //after all messages received or timeout  return  here
     client.unsubscribe('inTopic')
    count = 0
    response.status(200).json(array);

  }

所以尝试了while()seInterval(),但没有找到任何解决方案

【问题讨论】:

  • 您需要为下一条inTopic 消息和await 创建一个promise。您可以循环执行。
  • 如何让client.on()回调函数在消息到达时触发,然后在收到所有x条消息时触发,否则超时
  • @saniljhaveri 您是否设法为多个请求执行此操作?

标签: javascript node.js http async-await mqtt


【解决方案1】:

您无需在路由处理程序中调用response.send(array),您可以在外部调用。

var array = [];
var resp;
var n = 10; //number of messages to wait for
var timeOutValue = 5000; //wait 5 seconds
var timer;

const client = mqtt.connect(MQTTServer)
var count =0;
client.on('message', (topic, message) => {
  array.push(message); 
  count ++ 
  if (count == n) {
     resp.send(array);
     client.unsubscribe('inTopic');
     resp = undefined;
     counter = 0;
     array = [];
     clearTimeout(timer)
  }
}

app.post('/test', function (request, response) {

resp = response;
client.publish ('outTopic' , 'request ');
client.subscribe('inTopic');

  timer = setTimeout(function(){
    if (resp) {
        resp.send(array);
        resp = undefined;
        client.unsubscribe('inTopic');
        counter = 0;
        array = []
    }
  }, timeOutValue);

}

它并不漂亮(而且一次调用一个......)但它应该可以工作。

【讨论】:

  • 删除接受标志的任何特殊原因?
  • 我很高兴您在代码为setTimeout() 时询问使用您的解决方案,它不会异步收集任何回调 MQTT 消息,因此它在响应中不返回任何内容,并带有“忽略已完成的异常”,我能够通过使用 async-await 来实现它,但我必须将代码部署在运行 node.js 版本 6.11.5 的 firebase 函数上,它不支持 async-await 所以想找出一种方法来做到这一点promise() 但这里 HTTP 会话已完成,而无需像以前一样等待 promise 准备好
  • 如果您想在超时时返回收到的消息的简短列表,只需编辑超时函数中的代码以返回它们而不是错误
  • 我已将答案编辑为在超时时不引发和错误,而是发送接收到的消息数。
  • 一次只能处理一个HTTP请求(因为只有一个全局resp对象)
【解决方案2】:

您可以定义一个全局变量(例如 global.results={} 并在您的 client.on('message', callback) 中使用即将到来的消息更新值。

  client.on('message', (topic, mqttMessage) => {
    // mqttMessage is Buffer
    global.results.receiveResult = mqttMessage.toString();
    client.end();
  });

现在,在您的 API 中,您可以使用 settimeout 根据 global.results 的值返回适当的响应:

  setTimeout(() => {
    console.log('printing results', global.results);
    if (global.results.publishResult !== 'Published Successfully') {
      return res
        .status(500)
        .send({ data: global.results, message: 'Publish Failed' });
    }
    if (!global.results.receiveResult) {
      return res
        .status(500)
        .send({ data: global.results, message: 'Published but Confirmation Not received' });
    }
    return res
      .status(200)
      .send({ data: global.results, message: 'Response from mqtt' });
  }, timeout);

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-11-13
    • 2018-06-03
    • 2015-07-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-09-30
    • 1970-01-01
    相关资源
    最近更新 更多