【问题标题】:Observable that emit items on demandObservable 按需发射项目
【发布时间】:2017-03-29 06:54:41
【问题描述】:

我想创建一个可按需发出项目的 Observable,这意味着我想要一次订阅 Observable 并根据我的请求通知 Observable 我需要新项目。

这是我使用 PublishSubject 所做的:

public class RecognizeSubject {

PublishSubject<Bitmap> mSubject;

private Context mContext;
private FaceDetector mFaceDetecor;

public RecognizeSubject(Context mContext) {
    this.mContext = mContext;
    this.mSubject = PublishSubject.create();
}

public void detect(Bitmap btm){
    mSubject.onNext(btm);
}

public Flowable<SinglePhotoId> execute() {
    return mSubject.toFlowable(BackpressureStrategy.DROP)
            .observeOn(Schedulers.newThread())
            .map(bitmap1 -> recognize(bitmap1))
            .observeOn(AndroidSchedulers.mainThread())
            .doOnSubscribe(disposable -> initialize())
            .doFinally(() -> release());
}


private void initialize() {
    mFaceDetecor = new FaceDetector.Builder(mContext)
            .setTrackingEnabled(false)
            .setLandmarkType(FaceDetector.ALL_LANDMARKS)
            .build();
}

private void release() {
    if (mFaceDetecor != null)
        mFaceDetecor.release();
}

private SinglePhotoId recognize(Bitmap bitmap) {
    //SystemClock.sleep(3000);
   //make hard background work and return SinglePhotoId object
}

}

这是 Activity 类中的一个用法:

private void takeSubjectSnap() {
    if (mSubject == null)
        mSubject = new RecognizeSubject(getBaseContext());

    if (mDisposable == null || mDisposable.isDisposed()) {
        mDisposable = mSubject.execute()
                .subscribe(this::handleDetectionSuccess,
                        this::handleDetectionError,
                        this::handleDetectionCompleted);
    }

    mSnapshotButton.setProgress(true);
    mSubject.detect(myVideoView.getBitmap());
}

所以基本上我订阅了 Flowable 对象并将 Bitmap 对象传递给我的 Subject 类以继续并通过 Flowable 返回结果,该解决方案是正确的还是会产生一些内存泄漏?

有没有更好的解决方案可以通过标准的 onNext() 方法将对象发送到 Observable 以继续并返回结果?

【问题讨论】:

    标签: java android rx-java observable rx-android


    【解决方案1】:

    RxRelays 在这种情况下可以派上用场

    1. 创建一个 PublishRelay 主题。
    2. 订阅它。
    3. 并使用 publishRelaySubject.call(your_object); 传递数据

    https://github.com/JakeWharton/RxRelay

    【讨论】:

      【解决方案2】:

      (我会将此作为评论,因为这不是答案,但它太长了)

      我不确定您在这里的用例是什么,以及您想要实现的具体目标是什么,因为您描述和实现的内容略有不同。

      您所描述的是一种能够处理某些内容并在空闲/能够处理/等时请求新内容的机制

      您实现的是一种基于推送的处理器根据从客户端接收的数据处理项目的机制。

      因此,如果您实现的东西可以按您的意愿工作,那很好,但我建议您进行一些小改动:

      • 我会将execute 方法重命名为其他名称(因为它不执行任何操作)
      • 我会用Disposables.disposed() 初始化disposable 以避免空检查和
      • 我会将 RecognizeSubject 重命名为其他名称,因为目前它正在泄露有关其内部实现的信息。
      • 我会创建mSubject privatefinal
      • 我会摆脱匈牙利符号

      而且我不确定在处理位图时是否适合使用 flowable,您确定需要同时处理这么多位图并处理所有位图(并删除未处理的位图吗?)。

      【讨论】:

        【解决方案3】:

        好的,我已经阅读了您的答案并稍微更改了我的代码,这是一个说明我想要实现的图表:

        所以我的 Subject 类将在后台线程上处理接收到的数据,并通过 onNext() 方法将处理过的项目发送到他们的观察者。我做了一个简单的 Subject 接收 Integer 对象并将其转换为 String 对象,代码如下:

        public class MySubject {
        
        private String TAG = "MySubject";
        
        private PublishSubject<Integer> subject;
        private final Observable<String> observable;
        
        public MySubject() {
           Log.d(TAG, "---> MySubject() called");
           this.subject = PublishSubject.create();
           this.observable = subject
                   .doOnSubscribe(disposable -> init())
                   .doFinally(() -> relese()) //try do after terminate
                   .observeOn(Schedulers.newThread())
                   .map(this::myMap)
                   .observeOn(AndroidSchedulers.mainThread());
        }
        
        
        private void init(){
           Log.d(TAG, "---> init() called");
        }
        
        private void relese(){
           Log.d(TAG, "---> relese() called");
        }
        
        private String myMap(Integer integer){
           Log.d(TAG, "---> myMap() called int: " + integer);
           SystemClock.sleep(3000);
           return " :) " + String.valueOf(integer);
        }
        
        public void decode(Integer integer){
           subject.onNext(integer);
        }
        
        public Observable<String> getObservable(){
           return observable;
        }
        

        }

        这是 Activity 类中的一个用法:

        Disposable disposable = Disposables.disposed();
        MySubject subject = new MySubject();
        
        void onButton1() {
        
           if(disposable.isDisposed()){
               disposable = subject.getObservable()
                       .subscribe(s -> {
                           Log.d(TAG, "---> onNext() called " + s);
                       }, throwable -> {
                           Log.d(TAG, "---> onError() called " + throwable.getMessage());
                       }, () -> {
                           Log.d(TAG, "---> onCompleted() called ");
                       });
           }
        
           Random generator = new Random();
           int i = generator.nextInt(100) + 1;
           subject.decode(i);
        }
        

        每次调用 onButton1() 方法时,我都会将新的随机 int 发布到主题对象,然后在完成后通过 onNext() 方法接收处理过的数据。

        该解决方案是否正确且不会导致任何副作用或内存泄漏? 当然,我在 Activity 的 onStop() 方法中取消订阅主题。 或许有更好的解决方案来处理 Rxjava 中的此类问题?

        任何进一步的答案将不胜感激:)

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2019-04-07
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多