【问题标题】:RxJava Live Reactive Queue (with on off switch)RxJava Live Reactive Queue(带开关)
【发布时间】:2017-07-24 15:30:43
【问题描述】:

在 kotlin 中开发一个 android 应用程序。

我需要设置一个系统,以便能够从实时队列中进行工作(并在流中观察工作的结果)。

但我还必须能够根据networkIsAvailable (Observable<Boolean>) 等几个外部因素(也以流的形式出现)来切换“队列处理”。

我不能使用Observable.fromIterable(),因为这会立即创建可迭代对象,并且此队列会调整并且项目可能会被删除。

我需要某种循环来完成该项目,检查以确保我们应该继续前进,然后弹出队列的第一个项目并执行此操作。

我不确定如何在订阅中执行类似这样的循环?

队列也可能会变空,并且当切换开关重新打开时,事情应该会重新开始。

也许我应该将该决定(关于是否处理队列中的下一个项目)推送到Subject<Boolean>?,然后订阅该主题以再次启动该过程?

例子:

打开----处理队列顶部,处理队列顶部(之前被轮询出队列)---关闭-不再处理

重新打开 -- 处理队列顶部,队列为空 -- 停止

添加项目到队列——进程——停止队列为空

开启处理 -- 将项目添加到队列 -- 在重新开启之前不会处理

开启 -- 处理顶部项目

【问题讨论】:

  • 您能提供更多信息吗?我不明白你的意图,因为英语不是我的强项。
  • 对不起。你想知道什么?非技术要求: 有一个要处理的队列 能够监听正在处理的队列项的结果并对其进行处理 能够异步打开和关闭处理
  • 很遗憾,我帮不了你。 :)
  • 我只是添加了一些更具体的例子。
  • @EpicPandaForce 你会在下面添加valve 作为答案,以便我将其标记为正确并给你因果报应吗?

标签: java kotlin rx-android reactive


【解决方案1】:

您可以为此使用来自RxJava2Extensionsvalve() 运算符。

public PublishProcessor<Boolean> flowControl = PublishProcessor.create();

public void start() {
    Flowable./*...*/
            .compose(FlowableTransformers.valve(flowControl))
            .subscribe(/*...*/);

在这种情况下,flowControl.onNext(true) 将启动流,flowControl.onNext(false) 将停止并对其进行排队。

(任何其他 Rx2 类型必须转换为 Flowable 才能使用。)

【讨论】:

    【解决方案2】:

    我最近一直在处理这个问题(在管理 NIO 选择键时倾听压力)并集中在一个使用通用运算符的相对简单的解决方案上:

    Queue<T> queue;
    
    // This controls whether the queue is being polled or not.
    // It's probably a good idea to make sure this emits the last value upon
    // subscription using a BehaviorProcessor or a ReplayFlowable etc.
    Flowable<Boolean> condition;
    
    // This is a "dumb" generator which continuously polls the queue and emits items
    Flowable<T> generator = Flowable.generate(emitter -> {
        T item = queue.poll();
        if (item != null) {
            emitter.onNext(item);
        }
    });
    
    // We wire our generator and condition together to achieve the expected result.
    Flowable<T> drainWhileCondition = condition.distinctUntilChanged()
        .concatMap(c -> c
            ? generator.takeUntil(condition.filter(Boolean.FALSE::equals))
            : Flowable.empty());
    

    你也可以为此制作一个转换器方法:

    static <T> FlowableTransformer<T, T> valve(Flowable<Boolean> condition) {
        return upstream -> condition.distinctUntilChanged()
                .concatMap(c -> c
                        ? upstream.takeUntil(condition.filter(Boolean.FALSE::equals))
                        : Flowable.empty());
    }
    

    然后像这样使用它:

    Flowable<T> generator;
    Flowable<Boolean> condition;
    
    Flowable<T> valvedGenerator = generator.compose(valve(condition));
    

    【讨论】:

      猜你喜欢
      • 2020-06-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多