【问题标题】:How to publish data to using MQTT如何使用 MQTT 发布数据
【发布时间】:2018-01-11 14:41:19
【问题描述】:

我使用this docker 镜像来安装 Mosquitto MQTT。 现在它正在运行并在终端中显示以下消息:

1515680808: mosquitto version 1.4.14 (build date Mon, 10 Jul 2017 23:48:43 +0100) starting
1515680808: Config loaded from /mqtt/config/mosquitto.conf.
1515680808: Opening websockets listen socket on port 9001.
1515680808: Opening ipv4 listen socket on port 1883.
1515680808: Opening ipv6 listen socket on port 1883.

然后我创建了一个简单的 Maven 项目:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-mqtt_2.11</artifactId>
    <version>1.6.3</version>
</dependency>

我尝试使用下面显示的代码将一些数据发布到主题。我指向localhost:1883 作为MqttBrokerUrl 和一个主题test。但是,我收到此错误:

线程“main”中的异常 java.lang.NullPointerException at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:457) 在 org.eclipse.paho.client.mqttv3.MqttAsyncClient.(MqttAsyncClient.java:273) 在 org.eclipse.paho.client.mqttv3.MqttAsyncClient.(MqttAsyncClient.java:167) 在 org.eclipse.paho.client.mqttv3.MqttClient.(MqttClient.java:224) 在 org.test.MQTTPublisher$.main(MQTTPublisher.scala:37) 在 org.test.MQTTPublisher.main(MQTTPublisher.scala)

代码:

package org.test

import org.apache.log4j.{Level, Logger}
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt._
import org.apache.spark.SparkConf

object MQTTPublisher {

  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: MQTTPublisher <MqttBrokerUrl> <topic>")
      System.exit(1)
    }

    // Set logging level if log4j not configured (override by adding log4j.properties to classpath)
    if (!Logger.getRootLogger.getAllAppenders.hasMoreElements) {
      Logger.getRootLogger.setLevel(Level.WARN)
    }

    val Seq(brokerUrl, topic) = args.toSeq

    var client: MqttClient = null

    try {
      val persistence = new MemoryPersistence()
      client = new MqttClient("localhost:1883", MqttClient.generateClientId(), persistence)

      client.connect()

      val msgtopic = client.getTopic(topic)
      val msgContent = "test test test"
      val message = new MqttMessage(msgContent.getBytes("utf-8"))

      while (true) {
        try {
          msgtopic.publish(message)
          println(s"Published data. topic: ${msgtopic.getName()}; Message: $message")
        } catch {
          case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
            Thread.sleep(10)
            println("Queue is full, wait for to consume data from the message queue")
        }
      }
    } catch {
      case e: MqttException => println("Exception Caught: " + e)
    } finally {
      if (client != null) {
        client.disconnect()
      }
    }
  }
}

【问题讨论】:

    标签: scala maven apache-spark mqtt mosquitto


    【解决方案1】:

    MqttClient() 构造函数接受一个 URI。

    您提供的只是主机名和端口号 (localhost:1883),它缺少一个应该是 tcp:// 的协议部分(这是库所期望的并返回 null。这确实应该抛出一个更好的错误。)

    你需要把线改成

    client = new MqttClient("tcp://localhost:1883", MqttClient.generateClientId(), persistence);
    

    【讨论】:

    • 非常感谢!
    • 我们如何提及 ipv6 URI?
    • 就像使用任何其他 URI tcp://[::1]:1883
    【解决方案2】:

    我认为您提供了错误的 Url,即您没有指定它必须连接的协议,这是我的直觉。

    尝试将网址更改为:

    tcp://localhost:1883

    我认为它会工作!休息对我来说似乎很好。

    有关工作示例,请参阅:https://github.com/shiv4nsh/scala-mqtt-client-rasberrypi-starter-kit/blob/master/src/main/scala/com/knoldus/MQTTPublisher.scala

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-05-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-03-04
      相关资源
      最近更新 更多