【问题标题】:Strategy for multiple subscribers and flow of data using RxJava 2使用 RxJava 2 的多订阅者和数据流策略
【发布时间】:2017-11-06 19:34:11
【问题描述】:

我无法决定如何使用RxJava2 正确执行此任务。

问题如下。我正在使用AuidoRecord 录制音频。

目前我已经实现了这样的自定义Flowable

private class StreamAudioRecordRunnable extends Flowable<short[]> implements Runnable {
    private int mShortBufferSize;
    private List<Subscriber<? super short[]>> mSubscribers = new ArrayList<>();
    private short[] mAudioShortBuffer;

    private void removeAllNullableSubscribers() {
        mSubscribers.removeAll(Collections.singleton(null));
    }

    private void notifyAllSubscribers(short[] audioBuffer) {
        removeAllNullableSubscribers();
        for (Subscriber<? super short[]> subscriber : mSubscribers) {
            subscriber.onNext(audioBuffer);
        }
    }

    @Override
    protected void subscribeActual(Subscriber<? super short[]> newSubscriber) {
        mSubscribers.add(newSubscriber);
    }

    private void notifyAllSubscribersAboutError(Throwable error) {
        for (Subscriber<? super short[]> subscriber : mSubscribers) {
            subscriber.onError(error);
        }
    }

    @Override
    public void run() {
            // Init stuff
            while (mIsRecording.get()) {
                int ret;
                ret = mAudioRecord.read(mAudioShortBuffer, 0, mShortBufferSize);                              
                notifyAllSubscribers(mAudioShortBuffer);
             }
        mAudioRecord.release();
    }
}

如您所见,我正在手动将订阅者添加到列表中。然后,当我获得新缓冲区时,所有订阅者都会收到通知。

我猜这不是最高效的方法。

我需要什么

  1. 至于这个在服务中运行的 flowable。即使没有订阅者,它也应该一直运行到服务处于活动状态。
  2. 订阅者不是固定的,他们可以订阅然后取消订阅,但 Flowable/Observable 应该仍在运行。
  3. 由于 Flowable 发出的数据是流,因此不应通知订阅者已发出的项目,他们应该只获取当前的流数据。开火即忘。
  4. 即使所有订阅者都消失了,Flowable 也应该运行。

请提出正确的实施策略。 如果有任何帮助,我将不胜感激。

【问题讨论】:

  • 您使用 Runnable 有什么原因吗?
  • @JohnWowUs,不,这只是我的小错误,我会尽快改掉它,只要 RxJava 允许指定执行线程
  • @JohnWowUs 你对实现本身有什么看法?
  • 好吧,我不会为一件事使用子类。我有一个ConnectableFlowable 作为会员,然后订阅者可以通过hide 作为Flowable 获得。
  • @JohnWowUs,根据文档,ConnectableFlowable 在连接所有订阅者之前不会开始发送项目。这不符合我的要求,我有一个独立的音频数据流,应该独立于订阅者发出。

标签: java android performance rx-java rx-java2


【解决方案1】:

类似

public class StreamAudioRecordRunnable  {
    private int mShortBufferSize;
    private short[] mAudioShortBuffer;
    private ConnectedFlowable<short[]> audioFlowable();

    public StreamAudioRecordRunnable() {
        audioFlowable = Flowable.create(new ObservableOnSubscribe<short[]>() {
            @Override
            public void subscribe(FlowableEmitter<short[]> emitter) throws Exception {
                try {
                    while (mIsRecording.get()) {
                        int ret;
                        ret = mAudioRecord.read(mAudioShortBuffer, 0, mShortBufferSize);                              
                        emitter.onNext(mAudioShortBuffer);
                    }
                    emitter.onComplete();
                    mAudioRecord.release();
                } catch (Exception e) {
                    emitter.onError(e);
                    mAudioRecord.release();
                }
            }
        }).subscribeOn(Schedulers.io()).publish();  
    }


    public Flowable<short[]> getFlowable() {
            return audioFlowable.hide();
    }

    @Override
    public void start() {
        audioObservable.connect();      
    }
}

会是我的偏好。

【讨论】:

  • 感谢您的回答,您能解释一下使用这种方法的专业人士吗?
  • 尽可能使用组合而不是继承。您不必管理订阅者。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-02-08
  • 1970-01-01
  • 2017-07-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多