【问题标题】:How to (RxJava) properly set conditions on which handler will process data, emitted from Observable?如何(RxJava)正确设置处理程序将处理从 Observable 发出的数据的条件?
【发布时间】:2017-03-12 13:03:56
【问题描述】:

所以,我的任务是在设备的位置信息发生变化时将其推送到远程服务器 Json API 服务。如果远程服务器不可用,我的 DatabaseManager 必须将它们保存到本地数据库。

这是我的改造 API:

    public interface GpsService { 
        @POST("/v1/savelocationbatch") 
        SaveResponse saveLocationBatch(@Body LocationBatch locationBatch); 
    }

    Retrofit retrofit = new Retrofit.Builder()
        .baseUrl(myBaseUrl)
        .addConverterFactory(GsonConverterFactory.create())
        .build();

    GpsService service = retrofit.create(GpsService.class);

还有一个 POJO 类:

    public class LocationBatch{

        @SerializedName("LocationPointList")
        ArrayList<LocationPoint> locationPointList;
        @SerializedName("ClientId")
        String clientId;
        @SerializedName("ClientSecret")
        String clientSecret;

        //setter & getter
    }

我的 LocationPoint 模型:

    @Table(name="LocationPoints", id = "_id")
    public class LocationPoint extends Model {

        @SerializedName("Latitude")
        @Column(name="latitude")
        public Double latitude;

        @SerializedName("Longitude")
        @Column(name="longitude")
        public Double longitude;

        @SerializedName("Altitude")
        @Column(name="altitude")
        public Double altitude;

        //... setters, getters etc
}

我最后的所有位置都存储在 CurrentLocationHolder 单例中(用于批量发送/保存到 DB/从 Observable 发射)。它的 setLocation() 方法更新 currentLocation 变量,然后将其放入 locationBuffer,然后检查缓冲区的大小,如果缓冲区的大小超过我的 MAX_BUFFER_SIZE 变量,它会触发 locationBufferChanged.onNext(使用 locationBuffer 的副本作为参数),然后清除位置缓冲区...

    public class CurrentLocationHolder {

        private List<LocationPoint> locationBuffer = 
                            Collections.synchronizedList(new ArrayList<>());
        private LocationPoint currentLocation;
        private final PublishSubject<List<LocationPoint>> locationBufferFull = 
                            PublishSubject.create();

        public Observable<List<LocationPoint>> 
                            observeLocationBufferFull(boolean emitCurrentValue) {
                return emitCurrentValue ?
                    locationBufferFull.startWith(locationBuffer) :
                    locationBufferFull;
            }

        public void setLocation(LocationPoint point) {
            this.currentLocation = point;
            locationBuffer.add(point);
            if (locationBuffer.size() >= MAX_BUFFER_SIZE) {
                locationBufferChanged.onNext(new ArrayList<>(this.locationBuffer));
            }
            locationBuffer.clear();
        }
    }

这是我的数据库管理器:

    public class DatabaseManager {    
        private Subscription locationBufferSubscription;
        private static DatabaseManager instance;    
        public static void InitInstance() {
            if (instance == null) 
                instance = new DatabaseManager();
            }
        }

        public void saveToDb(ArrayList<LocationPoint> locArray){
            ActiveAndroid.beginTransaction();
            try {
                for (int i = 0; i < locArray.size(); i++) {
                    locArray.get(i).save();
                }
                ActiveAndroid.setTransactionSuccessful();
            } 
            finally {
                ActiveAndroid.endTransaction();
            }
        }
    }

我的应用程序的主要目标:

通过 Retrofit 将所有监听到的 LocationPoints 写入 HTTP 服务器。如果远程服务器由于某种原因突然关闭(或 Internet 连接丢失),我的应用程序应该将新的 locationPoints 无缝写入本地数据库。当服务器(或互联网)启动时,一些机制应该为 Retrofit 的调用提供保存的本地数据。

所以,我的问题是:

  1. 如何创建一个 Rx-Observable 对象,它会正常向 Retrofit 服务发出 List,但是当服务器(或互联网)出现故障时,它应该向 DatabaseManager.saveToDb() 方法提供未保存的 LocationPoints?
  2. 如何捕捉互联网连接或服务器“启动”状态?创建一些 Observable 是否是个好主意,它将 ping 我的远程服务器,因此应该向它的订阅者发出一些布尔值?实现此行为的最佳方式是什么?
  3. 当 Internet 连接(服务器)变为“正常”时,是否有一种简单的方法可以使用本地保存的数据(来自本地 DB)将 Retrofit 调用加入队列?
  4. 如何不丢失服务器端的任何 LocationPoints? (最后我的客户端应用程序必须全部发送!
  5. 我做错了吗?我是 Android、Java 和
    的新手 特别是对于 RxJava...

【问题讨论】:

    标签: android retrofit rx-java rx-android


    【解决方案1】:

    有趣的任务!首先:您不需要创建数据库来存储如此微小的信息。 Android 有存储任何Serializable 数据的好地方。 因此,对于保存本地数据箱模型,如:

    public class MyLocation implements Serializable {
    
       @Nonnull
       private final String id;
       private final Location location;
       private final boolean isSynced;
    
       // constructor...
       // getters...
    }
    

    单例类:

    public class UserPreferences {
        private static final String LOCATIONS = "locations";
        @Nonnull
        private final SharedPreferences preferences;
        @Nonnull
        private final Gson gson;
        private final PublishSubject<Object> locationRefresh = PublishSubject.create();
    
    public void addLocation(MyLocation location) {
        final String json = preferences.getString(LOCATIONS, null);
    
        final Type type = new TypeToken<List<MyLocation>>() {
        }.getType();
    
        final List<MyLocation> list;
        if (!Strings.isNullOrEmpty(json)) {
            list = gson.fromJson(json, type);
        } else {
            list = new ArrayList<MyLocation>();
        }
        list.add(lication);
    
        final String newJson = gson.toJson(set);
        preferences.edit().putString(LOCATIONS, newJson).commit();
        locationRefresh.onNext(null);
    }
    
    private List<String> getLocations() {
        final String json = preferences.getString(LOCATIONS, null);
    
        final Type type = new TypeToken<List<MyLocation>>() {
        }.getType();
    
        final List<MyLocation> list = new ArrayList<MyLocation>();
        if (!Strings.isNullOrEmpty(json)) {
            list.addAll(gson.<List<MyLocation>>fromJson(json, type));
        }
    
        return list;
    }
    
    @Nonnull
    public Observable<List<MyLocation>> getLocationsObservable() {
        return Observable
                .defer(new Func0<Observable<List<MyLocation>>>() {
                    @Override
                    public Observable<List<MyLocation>> call() {
                        return Observable.just(getLocations())
                                .filter(Functions1.isNotNull());
                    }
                })
                .compose(MoreOperators.<List<MyLocation>>refresh(locationRefresh));
    }
    
    // also You need to create getLocationsObservable() and getLocations() methods but only for not synced Locations.
    
    }
    

    变化:

    public interface GpsService { 
        @POST("/v1/savelocationbatch") 
        Observable<SaveResponse> saveLocationBatch(@Body LocationBatch locationBatch); 
    }
    

    现在最有趣的......让它一切正常。 RxJava 有 extention。它有很多“很酷的工具”(顺便说一句,MoreOperators 在 UserPref 类中从那里),它也有处理改造错误的东西。

    所以假设当 Observable saveLocationObservable 发出一些东西时会发生位置保存。在这种情况下,您的代码如下所示:

    final Observable<ResponseOrError<SaveResponse>> responseOrErrorObservable = saveLocationObservable
            .flatMap(new Func1<MyLocation, Observable<ResponseOrError<SaveResponse>>>() {
                @Override
                public Observable<ResponseOrError<SaveResponse>> call(MyLocation myLocation) {
                    final LocationBatch locationBatch = LocationBatch.fromMyLocation(myLocation);  // some method to convert local location to requesr one
                    return saveLocationBatch(locationBatch)
                            .observeOn(uiScheduler)
                            .subscribeOn(networkScheduler)
                            .compose(ResponseOrError.<SaveResponse>toResponseOrErrorObservable());
                }
    
            })
            .replay(1)
            .refCount();
    final Observable<Throwable> error = responseOrErrorObservable
         .compose(ResponseOrError.<SaveResponse>onlyError())
         .withLatestFrom(saveLocationObservable, Functions2.<MyLocation>secondParam())
         .subscribe(new Action1<MyLocation>() {
                @Override
                public void call(MyLocation myLocation) {
                       // save location to UserPref with flag isSynced=flase
                }
            });
    final Observable<UserInfoResponse> success = responseOrErrorObservable
         .compose(ResponseOrError.<SaveResponse>onlySuccess())
         .subscribe(new Action1<SaveResponse>() {
                @Override
                public void call(SaveResponse response) {
                    // save location to UserPref with flag isSynced=true
                }
            });
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-11-07
      • 1970-01-01
      • 1970-01-01
      • 2012-03-26
      相关资源
      最近更新 更多