【发布时间】:2014-07-17 23:51:33
【问题描述】:
我正在 Android 上使用 MQTT 实现聊天之类的功能。我以 MQTT 客户端自带的示例为基础,并做了一些修改,以便添加一个侦听器(Listener)。从 Android 客户端到 ActiveMQ Broker 的连接已经可以建立。此外,我还有一个带有 @ResourceAdapter("activemq.rar") 和 MDB 的 JBoss 服务器,用于在服务器端实现管理功能(例如在运行时创建新主题等)。
我的问题与 Android 客户端中的侦听器(Listener)和发布者(Publisher)有关。侦听器和订阅都已设置好,并且运行正常,但仅对第一条消息有效(两种方式都是如此:从客户端发送到主题,以及从 WebConsole 发送到主题)。在接收第二条消息时,Android 客户端的侦听器 onFailure 中会报错:java.net.ProtocolException:来自服务器的命令包含无效的消息 id:2。当我尝试从客户端发送第二条消息时,也会出现同样的错误。我的代码附在下面。如果你能帮我解决这个错误,我将不胜感激。
/**
 * Creates the MQTT client from the configured address and credentials, installs the
 * connection listener, starts an asynchronous connect and then triggers the
 * subscription to {@code sDestination}.
 *
 * <p>Reads the fields {@code sAddress}, {@code sUserName}, {@code sPassword} and
 * {@code sDestination}; writes {@code mqtt}, {@code callBackConnection},
 * {@code progressDialogListener} and {@code progressDialogConn}.
 *
 * <p>Every message delivered to the listener is acknowledged by running the supplied
 * {@code ack} Runnable once it has been processed. The mqtt-client documentation
 * requires this for received messages; without it a QoS 1 delivery is never confirmed
 * to the broker.
 */
private void callBackConnect() {
    mqtt = new MQTT();
    mqtt.setClientId("android-mqtt-example");
    try {
        mqtt.setHost(sAddress);
        Log.d(TAG, "Address set: " + sAddress);
    } catch (URISyntaxException urise) {
        // NOTE(review): execution continues with whatever host the MQTT object
        // defaults to -- confirm whether aborting here would be preferable.
        Log.e(TAG, "URISyntaxException connecting to " + sAddress + " - "
                + urise);
    }
    if (sUserName != null && !sUserName.equals("")) {
        mqtt.setUserName(sUserName);
        Log.d(TAG, "UserName set: [" + sUserName + "]");
    }
    if (sPassword != null && !sPassword.equals("")) {
        mqtt.setPassword(sPassword);
        // Deliberately do not log the credential itself.
        Log.d(TAG, "Password set");
    }

    progressDialogListener = ProgressDialog.show(this, "",
            "setListener...", true);
    progressDialogListener.setCanceledOnTouchOutside(true);

    callBackConnection = mqtt.callbackConnection();
    callBackConnection.listener(new Listener() {
        public void onConnected() {
            Log.i(TAG, "callBackconnect()->setListener:onConnected");
            progressDialogListener.dismiss();
        }

        public void onDisconnected() {
            Log.i(TAG, "callBackconnect()->setListener:onDisconnected");
            progressDialogListener.dismiss();
        }

        public void onFailure(Throwable arg0) {
            // Pass the throwable so the stack trace ends up in the log as well.
            Log.e(TAG,
                    "callBackconnect()->setListener:onFailure:"
                            + arg0.toString(), arg0);
            progressDialogListener.dismiss();
        }

        public void onPublish(UTF8Buffer topic, Buffer msg, Runnable ack) {
            Log.i(TAG,
                    "callBackconnect()->setListener:onPublish(UTF8Buffer arg0, Buffer arg1, Runnable arg2)");
            try {
                final String msgBody = msg.utf8().toString();
                final String topicBody = topic.utf8().toString();
                // UI widgets may only be touched from the handler's thread.
                mHandler.post(new Runnable() {
                    public void run() {
                        updateReceiveETInUi(topicBody, msgBody);
                    }
                });
                Log.i(TAG,
                        "callBackconnect()->setListener:onPublish(UTF8Buffer arg0, Buffer arg1, Runnable arg2):"
                                + topicBody + " : " + msgBody);
                if (msgBody.startsWith("REPLY: ")) {
                    // Don't reply to your own reply
                    Log.i(TAG,
                            "callBackconnect()->setListener:onPublish-> msgBody.startsWith REPLY");
                } else {
                    try {
                        byte[] reply = "REPLY: Hello Back".getBytes();
                        callBackConnection.publish(sDestination, reply,
                                QoS.AT_MOST_ONCE, true, null);
                    } catch (Exception e) {
                        Log.e(TAG,
                                "callBackconnect()->setListener:onPublish-> reply failed",
                                e);
                    }
                }
            } finally {
                // Acknowledge the delivery once processing is finished; the
                // original code never ran this Runnable.
                ack.run();
            }
        }
    });

    progressDialogConn = ProgressDialog.show(this, "", "Connecting...",
            true, true);
    callBackConnection.connect(new Callback<Void>() {
        public void onFailure(Throwable value) {
            Log.e(TAG,
                    "callBackconnect()->connect->.onFailure : "
                            + value.toString(), value);
            progressDialogConn.dismiss();
        }

        public void onSuccess(Void v) {
            // A Void argument carries no information, so only the event is logged.
            Log.i(TAG, "callBackconnect()->connect->onSuccess(Void v)");
            progressDialogConn.dismiss();
        }
    });

    if (sDestination == null || sDestination.equals("")) {
        Log.i(TAG, "Destination must be provided");
    } else {
        callBackubSubcribe();
    }
}
/**
 * Subscribes the callback connection to {@code sDestination} with QoS
 * {@code AT_LEAST_ONCE} and shows a progress dialog until the broker answers.
 *
 * <p>Stores the topic in {@code topics[0]} and writes {@code progressDialogSub}.
 *
 * <p>The subscribe request is handed to the connection's dispatch queue instead of
 * being issued on the calling thread. The mqtt-client documentation requires that a
 * callback connection be accessed from its dispatch queue when used outside one of
 * its own callbacks.
 */
private void callBackubSubcribe() {
    topics[0] = new Topic(sDestination, QoS.AT_LEAST_ONCE);
    Log.i(TAG, "topics[0]");
    for (Topic topic : topics) {
        Log.i(TAG, "callBackubSubcribe topics: " + topic.toString());
    }

    progressDialogSub = ProgressDialog.show(this, "", "Subscribing...",
            true);
    progressDialogSub.setCanceledOnTouchOutside(true);

    Log.i(TAG, "callBackubSubcribe()-> TRY to subscribe(topics)");
    callBackConnection.getDispatchQueue().execute(new Runnable() {
        @Override
        public void run() {
            callBackConnection.subscribe(topics, new Callback<byte[]>() {
                @Override
                public void onFailure(Throwable value) {
                    Log.e(TAG,
                            "callBackubSubcribe->subscribe:onFailure:"
                                    + value.toString(), value);
                    progressDialogSub.dismiss();
                }

                @Override
                public void onSuccess(byte[] arg0) {
                    Log.i(TAG,
                            "callBackubSubcribe()->connect->.onSuccess(byte[] arg0): "
                                    + arg0.getClass().toString());
                    progressDialogSub.dismiss();
                }
            });
        }
    });
}
干杯 亚历克斯
【问题讨论】:
标签: android client activemq publish-subscribe mqtt