【发布时间】:2018-02-12 19:32:20
【问题描述】:
我有以下代码结构
服务
public Flowable entryFlow()
{
return Flowable.fromIterable(this::getEntries)
}
消费者
void start()
{
disposable = service
.entryFlow()
.observeOn(Schedulers.computation())
.subscribeOn(Schedulers.computation())
.subscribe(
entry -> ...,
this::onError,
this::subscriptionFinished);
}
void stop()
{
disposable.dispose();
}
private void onError(Throwable e)
{
subscriptionFinished();
}
private void subscriptionFinished()
{
//
}
当调用 stop 方法时,我需要一种方法来阻止 flowable 获取和发送数据。
通过执行以下操作,我注意到并不总是调用 doOnCancel lambda。
void start()
{
disposable = service
.entryFlow()
.observeOn(Schedulers.computation())
.subscribeOn(Schedulers.computation())
.doOnCancel(this::snapshotFinished)
.subscribe(
entry -> ...,
this::onError,
this::subscriptionFinished);
}
void stop()
{
disposable.dispose();
}
替代方案是
volatile stopped;
void start()
{
disposable = service
.entryFlow()
.observeOn(Schedulers.computation())
.subscribeOn(Schedulers.computation())
.takeUntil(x -> stopped)
.subscribe(
entry -> ...,
this::onError,
this::subscriptionFinished);
}
void stop()
{
stopped = true;
}
启动和停止的推荐实现是什么,以使 flowable 停止发射并调用 onComplete 或类似方法(doOnCancel 操作?)?
稍后编辑:
让我的用例更短
调用disposable.dispose 来阻止flowable 从iterable 获取数据并发送到源就足够了吗?我只有 1 个订阅者,无论原因如何,当 flowable 结束时都需要调用 onComplete/onError/other-callback。
其他回调是指 doOnCancel/doFinally 等之一。
谢谢
【问题讨论】:
-
如果当时流程已经完成,则不会调用
doOnCancel的唯一原因。为什么您只需要对取消做出反应?您可以考虑doFinally对终止和取消做出反应。 -
doFinally 听起来很有趣,但从 javadoc 中我了解到它将在 onComplete / orError 之外被调用。我真的不需要知道是什么导致流程结束(完成、错误、取消),但我希望结束操作只执行一次。
-
在后面的编辑部分对我的问题添加了更简洁的描述