【发布时间】:2019-06-14 20:41:43
【问题描述】:
在我的一个 Scala 应用程序中处理字符串时遇到了一个非常奇怪的错误,我无法解释!我所拥有的是一个看起来像这样的观察者:
class MqttObserver[String] extends Observer[String] {
private val logger = Logger.of[MqttObserver[String]]
override def onNext(elem: String): Future[Ack] = {
logger.info(s"Got a message from Mqtt broker $elem")
Continue
}
override def onError(ex: Throwable): Unit = {
logger.error(s"Stream error happened ${ex.getMessage}")
}
override def onComplete(): Unit = {
logger.info(s"Stream ended")
}
}
然后我从
调用 onNext(elem: String)这是错误:
[error] found : java.lang.String
[error] required: String(in class MqttObservable)
[error] mqttObserver.onNext(message.getPayload.toString)
这就是我打电话的方式!
val callback: MqttCallback = new MqttCallback {
override def connectionLost(cause: Throwable): Unit = {
logger.info(cause)
}
override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
}
override def messageArrived(topic: Predef.String, message: MqttMessage): Unit = {
mqttObserver.onNext(message.getPayload.toString) // FAILS HERE!!!!
logger.info("Using Default Console Callback --> Receiving Data, Topic : %s, Message : %s".format(topic, message))
}
}
【问题讨论】:
-
那么
mqttObserver是什么?你是如何实例化它的?我不想只重新打开它,这样它就可以因为缺少 minimal reproducible example 而重新关闭。