【问题标题】:How to store data mqtt when offline and send them when online离线时如何存储数据mqtt,在线时如何发送
【发布时间】:2016-12-08 19:10:15
【问题描述】:

我遇到一个问题:连接中断期间产生的 MQTT 发布消息,在重新连接之后并没有被发送出去,该如何解决?我参照了 this answer 的做法,但没有效果。

我做了什么

  • 我已经实现了一个服务 mqtt 来发送 gps 位置并在在线时照常工作。
  • 将 Qos 设置为 1。
  • 将 ClientId 设置为固定值。
  • 将发布 Qos 设置为 1。
  • 将干净会话设置为 false

但是重新连接之后,结果仍然是只发布在线期间产生的数据,而离线期间被持久化存储的数据并没有被发布。

这是我的源代码:

package id.trustudio.android.mdm.service;

import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.content.SharedPreferences;
import android.content.pm.ApplicationInfo;
import android.content.pm.PackageManager;
import android.net.TrafficStats;
import android.os.Handler;
import android.os.IBinder;
import android.support.annotation.Nullable;
import android.util.Log;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.io.UnsupportedEncodingException;

import id.trustudio.android.mdm.http.DetectConnection;
import id.trustudio.android.mdm.util.Cons;
import id.trustudio.android.mdm.util.Debug;
import id.trustudio.android.mdm.util.GPSTracker;
import id.trustudio.android.mdm.util.GPSTracker2;

/**
 * Background service that samples the device position and publishes it over
 * MQTT to the topic {@code gps/kodeupi/kodeap/kodeup/<deviceID>}.
 *
 * <p>The first sample is taken one second after {@link #onCreate()} and then
 * every 20 seconds. Each cycle subscribes to the topic and, once the
 * subscription is acknowledged, publishes
 * {@code deviceID|latitude|longitude|timestampMillis} with QoS 1 and the
 * retained flag set.
 *
 * <p>The connection uses {@code cleanSession=false}. A persistent session is
 * only useful when the same client id is presented on every connect, so the id
 * is generated once and then kept in the private shared preferences.
 *
 * <p>This class does not queue samples taken while disconnected; a publish
 * attempted while offline is only logged.
 */
public class MqttService extends Service implements MqttCallback {

    private static final String LOG_TAG = "MqttService";
    private static final String BROKER_URI = "tcp://broker.administrasi.id:1883";
    private static final String TOPIC_PREFIX = "gps/kodeupi/kodeap/kodeup/";
    private static final int QOS_AT_LEAST_ONCE = 1;
    private static final long FIRST_RUN_DELAY_MS = 1000;
    private static final long UPDATE_INTERVAL_MS = 20000;

    /** Set once the service has received a start command; cleared in {@link #onDestroy()}. */
    public static boolean isStarted = false;

    private double latitude  = 0;
    private double longitude = 0;
    private GPSTracker mGPSTracker;
    private GPSTracker2 mGPSTracker2;

    boolean isInternetPresent = false;

    private SharedPreferences mPrivatePref;
    private SharedPreferences.Editor editor;

    // NOTE(review): nothing in this class assigns mDetectConnection, so
    // getLatLng()/getLatLngWifi() would throw a NullPointerException as written.
    // Initialise it in onCreate() once the DetectConnection constructor is confirmed.
    private DetectConnection mDetectConnection;
    String deviceID,Name;
    int totalbyte;
    String packages;
    MemoryPersistence persistence;
    String clientId;
    MqttAndroidClient client;

    /** This service is started, not bound, so no binder is offered. */
    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    /**
     * Loads the device identity from the private preferences, creates the MQTT
     * client, starts the asynchronous connect and schedules the periodic
     * position update.
     */
    @Override
    public void onCreate() {
        super.onCreate();

        mPrivatePref = this.getSharedPreferences(Cons.PRIVATE_PREF, Context.MODE_PRIVATE);
        editor = mPrivatePref.edit();

        deviceID = mPrivatePref.getString(Cons.APP_PACKAGE + "deviceid", "");
        Name = mPrivatePref.getString(Cons.APP_PACKAGE + "user", "");

        clientId = loadOrCreateClientId();
        persistence = new MemoryPersistence();

        client = new MqttAndroidClient(getApplicationContext(), BROKER_URI, clientId, persistence);
        client.setCallback(this);

        try {
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(false);
            client.connect(connOpts, null, new IMqttActionListener() {

                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.d(LOG_TAG, "connected as " + clientId);
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.e(LOG_TAG, "connect failed", exception);
                }
            });
        } catch (Exception e) {
            Log.e(LOG_TAG, "connect could not be started", e);
        }

        mHandler.postDelayed(mUpdateTask, FIRST_RUN_DELAY_MS);
    }

    /**
     * Returns the client id stored in the private preferences, generating and
     * storing a new one on first use so that every later connect reuses it.
     */
    private String loadOrCreateClientId() {
        String key = Cons.APP_PACKAGE + "mqtt_client_id";
        String stored = mPrivatePref.getString(key, "");
        if (stored.length() > 0) {
            return stored;
        }
        String generated = MqttClient.generateClientId();
        editor.putString(key, generated);
        editor.commit();
        return generated;
    }

    /** Marks the service as started and asks the system to restart it if it is killed. */
    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        super.onStartCommand(intent, flags, startId);
        isStarted = true;
        return Service.START_STICKY;
    }

    /** Stops the periodic update so the runnable no longer runs after the service is gone. */
    @Override
    public void onDestroy() {
        mHandler.removeCallbacks(mUpdateTask);
        isStarted = false;
        super.onDestroy();
    }

    private Handler mHandler = new Handler();

    /** Reads the position, publishes it, and re-posts itself after {@link #UPDATE_INTERVAL_MS}. */
    private Runnable mUpdateTask = new Runnable() {
        @Override
        public void run() {
            getLatLng();
            // Fall back to the second tracker when the first gave no usable fix.
            if (latitude == 0.0 || longitude == 0.0) {
                getLatLngWifi();
            }

            Debug.e("MQTT","Connect");
            Debug.e("MQTT CLIENT", clientId);

            final String topic = TOPIC_PREFIX + deviceID;
            try {
                IMqttToken subToken = client.subscribe(topic, QOS_AT_LEAST_ONCE);
                subToken.setActionCallback(new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        publishCurrentLocation(topic);
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        // The subscription could not be performed, e.g. not connected
                        // or not authorized for the topic; this sample is not sent.
                        Log.w(LOG_TAG, "subscribe failed, position not published", exception);
                    }
                });
            } catch (MqttException e) {
                Log.e(LOG_TAG, "subscribe could not be started", e);
            }

            mHandler.postDelayed(this, UPDATE_INTERVAL_MS);
        }
    };

    /**
     * Publishes {@code deviceID|latitude|longitude|timestampMillis} to the given topic.
     * QoS and the retained flag are set before the message is handed to the
     * client; changing them after {@code publish()} cannot be relied on to
     * affect the message that is sent.
     */
    private void publishCurrentLocation(String topic) {
        long currentTime = System.currentTimeMillis();
        String payload = deviceID + "|" + latitude + "|" + longitude + "|" + currentTime;
        try {
            MqttMessage message = new MqttMessage(payload.getBytes("UTF-8"));
            message.setQos(QOS_AT_LEAST_ONCE);
            message.setRetained(true);
            client.publish(topic, message);
            Log.d("TAG", "onSuccess");
        } catch (UnsupportedEncodingException | MqttException e) {
            Log.e(LOG_TAG, "publish failed", e);
        }
    }

    /**
     * Updates {@link #latitude}/{@link #longitude} from a new {@link GPSTracker2}
     * and stores a non-zero fix in the private preferences. When no internet
     * connection is reported the tracker is stopped instead.
     */
    private void getLatLng() {
        mGPSTracker2        = new GPSTracker2(this);
        isInternetPresent   = mDetectConnection.isConnectingToInternet();
        if (!isInternetPresent) {
            Debug.i(Cons.TAG, "no connection");
            mGPSTracker2.stopUsingGPS();
            return;
        }
        if (!mGPSTracker2.canGetLocation()) {
            Debug.i(Cons.TAG, "on gps failed, please check");
            return;
        }

        latitude    = mGPSTracker2.getLatitude();
        longitude   = mGPSTracker2.getLongitude();
        if (latitude != 0.0 && longitude != 0.0) {
            editor.putString(Cons.APP_LATITUDE, latitude+"");
            editor.putString(Cons.APP_LONGITUDE, longitude+"");
            editor.commit();
        }
    }

    /**
     * Same as {@link #getLatLng()} but reads the position from a new
     * {@link GPSTracker}; used as the fallback when the first read gave 0/0.
     */
    private void getLatLngWifi() {
        mGPSTracker         = new GPSTracker(this);
        isInternetPresent   = mDetectConnection.isConnectingToInternet();
        if (!isInternetPresent) {
            Debug.i(Cons.TAG, "wifi " + "no connection");
            mGPSTracker.stopUsingGPS();
            return;
        }
        if (!mGPSTracker.canGetLocation()) {
            Debug.i(Cons.TAG, "wifi " + "on gps failed, please check");
            return;
        }

        latitude    = mGPSTracker.getLatitude();
        longitude   = mGPSTracker.getLongitude();
        if (latitude != 0.0 && longitude != 0.0) {
            editor.putString(Cons.APP_LATITUDE, latitude+"");
            editor.putString(Cons.APP_LONGITUDE, longitude+"");
            editor.commit();
        }
    }

    /** Logs the loss of the broker connection; no reconnect is attempted here. */
    @Override
    public void connectionLost(Throwable cause) {
        Log.w(LOG_TAG, "connection lost", cause);
    }

    /**
     * Intentionally ignores incoming messages. The service subscribes to the
     * same topic it publishes on, so its own messages can arrive here.
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // nothing to do
    }

    /** Intentionally empty: delivery confirmations are not tracked. */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // nothing to do
    }
}

对不起我的英语不好

【问题讨论】:

  • 链接到问题的答案是如何将消息传递给已断开连接的客户端,而不是如何在断开连接的客户端重新连接时发送消息。如果您在客户端断开连接时尝试发布消息,它将引发异常,由您来捕获该异常并存储要在重新连接后重新发送的消息。
  • 另外,您在消息发布后设置 QOS 和保留标志,这将不起作用,因为消息已经消失。
  • 库中是否有任何回调可以做到这一点,或者我必须实现自己的脚本才能做到这一点?
  • 你必须自己实现
  • @hardillb 感谢您的建议,我将尝试自己实现它

标签: android mqtt


【解决方案1】:

如评论中所述。

这是你必须自己编码实现的功能:框架本身并不支持存储因客户端断开连接而尚未发送的消息。MQTT 持久化仅用于确保:如果在 QoS 握手完成之前与代理(broker)的连接中断,QoS 1/2 的消息不会在传输过程中丢失。

如果您尝试在断开连接时发布消息,client.publish(topic, message) 调用将引发异常,您需要捕获此异常,然后安排存储消息内容以供重新建立连接时使用。连接恢复并运行后,您需要遍历存储的详细信息并再次尝试发送。

【讨论】:

    【解决方案2】:

    下面是一个示例:按照 hardillb 的回答,我自己实现了把数据存储到本地数据库、并在连接重新建立后发送全部已存储数据的逻辑。

    这是我的源代码

     private Handler mHandler = new Handler();

     /**
      * Periodic task: reads the position, then subscribes to the device topic
      * as a connectivity probe. When the subscription succeeds, the samples
      * stored in SQLite are sent first, followed by the current sample. When
      * it fails, the current sample is stored for a later cycle. The task
      * re-posts itself every 20 seconds.
      */
     private Runnable mUpdateTask = new Runnable() {
         @Override
         public void run() {
             getLatLng();
             if (latitude == 0.0 || longitude == 0.0) {
                 getLatLngWifi();
             }

             Debug.e("MQTT","Connect");
             Debug.e("MQTT CLIENT", clientId);

             final String topic = "gps/kodeupi/kodeap/kodeup/" + deviceID;
             try {
                 IMqttToken subToken = client.subscribe(topic, 1);
                 subToken.setActionCallback(new IMqttActionListener() {
                     @Override
                     public void onSuccess(IMqttToken asyncActionToken) {
                         flushStoredLocations(topic);

                         long currentTime = System.currentTimeMillis();
                         String payload = deviceID + "|" + latitude + "|" + longitude + "|" + currentTime;
                         if (publishPayload(topic, payload, true)) {
                             Log.d("TAG", "onSuccess");
                         } else {
                             // Keep the sample instead of dropping it when the publish call fails.
                             storeCurrentLocation();
                         }
                     }

                     @Override
                     public void onFailure(IMqttToken asyncActionToken,
                                           Throwable exception) {
                         // The subscription could not be performed (e.g. offline):
                         // add the sample to sqlite so it can be sent later.
                         storeCurrentLocation();
                         Debug.e("MQTT","Failure");
                     }
                 });
             } catch (MqttException e) {
                 // The request could not even be started; treat it like being offline.
                 Log.e("MQTT", "subscribe could not be started", e);
                 storeCurrentLocation();
             }

             mHandler.postDelayed(this, 20000);
         }
     };

     /**
      * Publishes every sample stored in SQLite and clears the table only when
      * all publish calls were accepted by the client. If one call fails the
      * rows are kept, so samples already handed over may be sent again on the
      * next cycle (duplicates are possible, loss is not).
      *
      * NOTE(review): stored samples use the field order
      * deviceID|timestamp|Name|latitude|longitude while the live sample uses
      * deviceID|latitude|longitude|timestamp - confirm which one the consumer expects.
      */
     private void flushStoredLocations(String topic) {
         mList = getLocationAll();
         Debug.e("MQTT","Connected. Size list = "+mList.size());
         if (mList.size() == 0) {
             return;
         }

         boolean allAccepted = true;
         for (int i = 0; i < mList.size(); i++) {
             String payload = deviceID + "|" + mList.get(i).CurrentTimes + "|" + Name
                     + "|" + mList.get(i).latitude + "|" + mList.get(i).longitude;
             if (!publishPayload(topic, payload, false)) {
                 allAccepted = false;
                 break;
             }
         }

         if (allAccepted) {
             DeleteAllLocation();
         }
     }

     /**
      * Hands one UTF-8 payload to the MQTT client with QoS 1. QoS and the
      * retained flag are set before publish() is called.
      *
      * @return true when the client accepted the message, false when
      *         publish() threw
      */
     private boolean publishPayload(String topic, String payload, boolean retained) {
         try {
             MqttMessage message = new MqttMessage(payload.getBytes("UTF-8"));
             message.setQos(1);
             message.setRetained(retained);
             client.publish(topic, message);
             return true;
         } catch (UnsupportedEncodingException | MqttException e) {
             Log.e("MQTT", "publish failed", e);
             return false;
         }
     }

     /** Stores the current position with the current time in SQLite for a later cycle. */
     private void storeCurrentLocation() {
         long currentTime = System.currentTimeMillis();
         addLocation(deviceID, currentTime+"", Name, latitude+"", longitude+"");
     }
    

    这里是我用来存储和删除数据的 sqlite

     /**
      * Inserts one location sample into the {@code tbl_location} table.
      * When the database handle is null the call only logs and returns.
      *
      * @param device_id value for the device_id column
      * @param timestamp value for the timestamp column
      * @param user_id   value for the user_id column
      * @param latitude  value for the latitude column
      * @param longitude value for the longitude column
      */
     public void addLocation( String device_id, String timestamp, String user_id, String latitude, String longitude) {
         if (sqLite != null) {
             ContentValues row = new ContentValues();
             row.put("device_id", device_id);
             row.put("timestamp", timestamp);
             row.put("user_id", user_id);
             row.put("latitude", latitude);
             row.put("longitude", longitude);

             Debug.i(Cons.TAG, "Insert location : title = " + device_id);
             sqLite.insert("tbl_location", null, row);
         } else {
             Debug.i(Cons.TAG, "null database");
         }
     }
    
    /**
     * Reads every row of {@code tbl_location}, ordered by the timestamp column
     * ascending.
     *
     * @return the rows as {@code LocationModel} objects; an empty list when the
     *         database handle is null or the table has no rows
     */
    public ArrayList<LocationModel> getLocationAll() {
        ArrayList<LocationModel> result = new ArrayList<LocationModel>();
        if (sqLite == null) {
            return result;
        }

        String sql  = "SELECT * FROM tbl_location ORDER BY timestamp ASC";
        Cursor c    = sqLite.rawQuery(sql, null);
        if (c == null) {
            return result;
        }

        // try/finally so the cursor is closed for an empty result set and
        // when reading a row throws, not only after a successful read.
        try {
            int deviceIdCol  = c.getColumnIndex("device_id");
            int timestampCol = c.getColumnIndex("timestamp");
            int userIdCol    = c.getColumnIndex("user_id");
            int latitudeCol  = c.getColumnIndex("latitude");
            int longitudeCol = c.getColumnIndex("longitude");

            // A new cursor is positioned before the first row, so moveToNext()
            // visits every row and is simply false for an empty result.
            while (c.moveToNext()) {
                LocationModel row = new LocationModel();

                row.DeviceId      = c.getInt(deviceIdCol);
                row.CurrentTimes  = c.getString(timestampCol);
                row.UserId        = c.getString(userIdCol);
                row.latitude      = c.getString(latitudeCol);
                row.longitude     = c.getString(longitudeCol);

                result.add(row);
            }
        } finally {
            c.close();
        }

        return result;
    }
    
    /**
     * Deletes every row from the {@code tbl_location} table.
     * Does nothing when the database handle is null.
     */
    public void DeleteAllLocation() {
        if (sqLite != null) {
            sqLite.delete("tbl_location", null, null);
        }
    }
    

    【讨论】:

      猜你喜欢
      • 2015-06-02
      • 1970-01-01
      • 1970-01-01
      • 2015-12-02
      • 1970-01-01
      • 2015-05-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多