【问题标题】:Command contained invalid message id: 2 after receiving of the first message in Android-Client命令包含无效的消息 id: 2 在 Android-Client 中收到第一条消息后
【发布时间】:2014-07-17 23:51:33
【问题描述】:

我正在使用 MQTT 在 Android 上实现诸如聊天功能之类的功能,并从 MQTT 中获取了可用的示例,并对其进行了一些更改,以便拥有一个侦听器。建立从 Android-Client 到 ActiveMQ-Brocker 的连接。我还有一个带有 @ResourceAdapter("activemq.rar") 和 MDB 的 JbossServer,以便在服务器端实现管理(在运行时创建新主题等)。

我有一个关于 Android-Client 中的 Listener 和 Publisher 的问题。侦听器和订阅已设置。它运作良好,但仅适用于第一条消息(两种方式:从客户端发送到主题和从 WebConsole 发送到主题)。 在接收到第二条消息时,Android 客户端在侦听器 onFailure 中发生错误:java.net.ProtocolException:来自服务器的命令包含无效的消息 id:2。当我尝试从客户端发送第二条消息时,也会发生同样的错误。 我附上了我的代码。 如果你能帮助我解决这个错误,我将不胜感激。

private void callBackConnect() {
    mqtt = new MQTT();
    mqtt.setClientId("android-mqtt-example");

    try {
        // mqtt.setHost("localhost", 1883);
        // mqtt.setHost("tcp://10.0.0.62", 5445);
        mqtt.setHost(sAddress);
        Log.d(TAG, "Address set: " + sAddress);
    } catch (URISyntaxException urise) {
        Log.e(TAG, "URISyntaxException connecting to " + sAddress + " - "
                + urise);
    }

    if (sUserName != null && !sUserName.equals("")) {
        mqtt.setUserName(sUserName);
        Log.d(TAG, "UserName set: [" + sUserName + "]");
    }

    if (sPassword != null && !sPassword.equals("")) {
        mqtt.setPassword(sPassword);
        Log.d(TAG, "Password set: [" + sPassword + "]");
    }

    // futureConnection = mqtt.futureConnection();
    progressDialogListener = ProgressDialog.show(this, "",
            "setListener...", true);
    progressDialogListener.setCanceledOnTouchOutside(true);

    callBackConnection = mqtt.callbackConnection();

    callBackConnection.listener(new Listener() {
        public void onConnected() {
            Log.i(TAG, "callBackconnect()->setListener:onConnected");
            // new AlertDialog.Builder(MQTTActivity.this)
            // .setMessage("Connected (listener)")
            // .setNeutralButton("OK", null).show();
            progressDialogListener.dismiss();
        }

        public void onDisconnected() {
            Log.i(TAG, "callBackconnect()->setListener:onDisconnected");
            // new AlertDialog.Builder(MQTTActivity.this)
            // .setMessage("Disconnected (listener)")
            // .setNeutralButton("OK", null).show();
            progressDialogListener.dismiss();
        }

        public void onFailure(Throwable arg0) {
            Log.i(TAG,
                    "callBackconnect()->setListener:onFailure:"
                            + arg0.toString());
            Log.e(TAG, arg0.toString());
            // new AlertDialog.Builder(MQTTActivity.this)
            // .setMessage("Failure (listener)")
            // .setNeutralButton("OK", null).show();
            progressDialogListener.dismiss();
        }

        public void onPublish(UTF8Buffer topic, Buffer msg, Runnable ack) {
            Log.i(TAG,
                    "callBackconnect()->setListener:onPublish(UTF8Buffer arg0, Buffer arg1, Runnable arg2)");
            // new AlertDialog.Builder(MQTTActivity.this)
            // .setMessage("Publish (listener)")
            // .setNeutralButton("OK", null).show();

            final String msgBody = msg.utf8().toString();
            final String topicBody = topic.utf8().toString();
            // Create runnable for posting
            mHandler.post(new Runnable() {
                public void run() {
                    updateReceiveETInUi(topicBody, msgBody);
                }
            });

            Log.i(TAG,
                    "callBackconnect()->setListener:onPublish(UTF8Buffer arg0, Buffer arg1, Runnable arg2):"
                            + topicBody + " : " + msgBody);

            if (msgBody.startsWith("REPLY: ")) {
                // Don't reply to your own reply
                Log.i(TAG,
                        "callBackconnect()->setListener:onPublish-> msgBody.startsWith REPLY");
            } else {
                try {
                    byte[] reply = "REPLY: Hello Back".getBytes();
                    callBackConnection.publish(sDestination, reply,
                            QoS.AT_MOST_ONCE, true, null);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    });

    // callBackConnection.resume();
    progressDialogConn = ProgressDialog.show(this, "", "Connecting...",
            true, true);
    callBackConnection.connect(new Callback<Void>() {
        public void onFailure(Throwable value) {
            Log.i(TAG,
                    "callBackconnect()->connect->.onFailure : "
                            + value.toString());
            Log.e(TAG,
                    "callBackconnect()->connect->.onFailure : "
                            + value.toString());
            // new AlertDialog.Builder(MQTTActivity.this)
            // .setMessage(value.toString())
            // .setNeutralButton("OK", null).show();
            progressDialogConn.dismiss();
        }
        public void onSuccess(Void v) {
            Log.i(TAG, "callBackconnect()->connect->onSuccess(Void v): "
                    + v.TYPE.toString());
            // new
            // AlertDialog.Builder(MQTTActivity.this).setMessage("Connected (callback)")
            // .setNeutralButton("OK", null).show();
            progressDialogConn.dismiss();
        }
        public void onSuccess(UTF8Buffer topic, Buffer payload, Runnable ack) {
            // You can now process a received message from a topic.
            // Once process execute the ack runnable.
            Log.i(TAG,
                    topic.toString()
                            + "callBackconnect()->connect->.onSuccess(UTF8Buffer topic, Buffer payload, Runnable ack)");
            ack.run();
        }

    });


    if (sDestination.equals("")) {
        Log.i(TAG, "Destination must be provided");
    } else {
        callBackubSubcribe();

    }
}

private void callBackubSubcribe() {

    topics[0] = new Topic(sDestination, QoS.AT_LEAST_ONCE);
    // Topic[] topics = { new Topic(sDestination, QoS.AT_LEAST_ONCE) };
    Log.i(TAG, "topics[0]");

    for (int i = 0; i < topics.length; i++) {
        Log.i(TAG, "callBackubSubcribe topics: " + topics[i].toString());
    }

    progressDialogSub = ProgressDialog.show(this, "", "Subscribing...",
            true);
    progressDialogSub.setCanceledOnTouchOutside(true);

    Log.i(TAG, "callBackubSubcribe()-> TRY to subscribe(topics)");
    callBackConnection.subscribe(topics, new Callback<byte[]>() {
        public void onFailure(Throwable value) {
            Log.i(TAG,
                    "callBackubSubcribe->subscribe:onFailure:"
                            + value.toString());
            // new AlertDialog.Builder(MQTTActivity.this)
            // .setMessage(value.toString())
            // .setNeutralButton("OK", null).show();
            progressDialogSub.dismiss();
        }

        public void onSuccess(Void v) {
            Log.i(TAG, "callBackubSubcribe()->connect->.onSuccess");
            // new AlertDialog.Builder(MQTTActivity.this)
            // .setMessage("Connected (callback)")
            // .setNeutralButton("OK", null).show();
            progressDialogSub.dismiss();
        }

        @Override
        public void onSuccess(byte[] arg0) {
            Log.i(TAG,
                    "callBackubSubcribe()->connect->.onSuccess(byte[] arg0): "
                            + arg0.getClass().toString());
            progressDialogSub.dismiss();
        }
    });
}

干杯 亚历克斯

【问题讨论】:

    标签: android client activemq publish-subscribe mqtt


    【解决方案1】:

    我从事 mqtt 服务已经有一段时间了,但是将您的代码与 mine 进行比较(假设您使用的是 fusesource's mqtt client library),我注意到一些看起来有点奇怪的事情(可能是因为我迷失了身份)。

    1) 您的连接处理程序有一个onSuccess,它假定接收来自主题的消息

       public void onSuccess(UTF8Buffer topic, Buffer payload, Runnable ack) {
    

    我认为你应该只使用你在上面实现的常规 void onSuccess。

    2) 然后,我看到您在 Listener 处的 public void onPublish(UTF8Buffer topic, Buffer msg, Runnable ack) 没有调用 ack ack.run();,实际上您在上面提到的 onSuccess 上调用了 ack.run() 我怀疑这可能是源的问题。

    顺便说一句,我认为最好将 mqtt 作为服务运行,与 Activity 来回交换消息,并在 Activity 代码中执行与 GUI 相关的操作。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-12-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-10-30
      • 2017-09-17
      • 2020-05-12
      • 1970-01-01
      相关资源
      最近更新 更多