【问题标题】:RXJava Sequentially execute observableRXJava 顺序执行 observable
【发布时间】:2020-04-11 22:23:23
【问题描述】:

我有多个返回 Observable<String> 的函数。每个函数在文件系统上执行命令。我需要一个接一个地执行每个函数,并在 observable 中获取函数的输出。最后,我想要一个 Observable<String>,其中包含按函数调用顺序排列的所有函数的输出

每个函数都按预期工作,但我需要正确合并输出。

我尝试过这样的 Observable.concatArray(func1, func2, ... ):

    return Observable.concatArray(
        func1(),
        func2(),
        func3(), 
        func4()
    );

但这只是保留了 observable 事件的顺序。不是函数的顺序。我的意思是如果 func1 发出事件 A 和 A',而 func2 发出 B 和 B',我将有 A->A'->B->B'。但是 func2 会在 func1 之后立即启动。这导致我的问题是 func1 需要在 func2 启动之前完成。

第一个函数通过maven在文件系统上生成目录。所以,一个长期的任务。第二,在这个目录中写入一个文件。但是 concatArray 在第一个之后立即启动第二个。并且命令失败,因为此时目录不存在。

有没有办法避免这样丑陋的事情:

Subject<String> result = PublishSubject.create();
Observable<String> func1Obs = funct1(); 
Observable<String> func2Obs = funct2(); 

func1Obs.subscribe(output -> result.onNext(output));
func1Obs.onDoComplete(() -> {
    func2Obs.subscribe(output -> result.onNext(output);
}
return result;

【问题讨论】:

  • edit您的问题包括您的可观察对象的详细示例以及您希望从中读取的顺序。还要详细解释为什么 concat 在您的情况下不是正确的解决方案。
  • 记住——使用 observables 的全部意义在于你可以并行化任务。一旦你将它们并行化,它们现在可以以任何顺序完成。如果您想要/需要执行某个命令,您可以 a) 返回串行代码,或 b) 使用 .Then() 或 When() 等运算符:reactivex.io/documentation/observable.html
  • 我会尝试第二个选项
  • And/Then/When 仅适用于 RxJava 1。我使用 RxJava 3
  • @Scandinave 在调用subscribe() 之前,调用funct2() 方法是否已经触发了计算(创建文件)?

标签: java rx-java rx-java3


【解决方案1】:

作为 Suggest Progman,错误与 concatArray 无关,这是要使用的方法。问题是,在我的函数列表中,我使用的是这种代码:

public Observable<String> func1() {
    Subject<String> result = PublishSubject.create();
    String output = dosomething()
    result.onNext(output);
}

这里的问题是函数 doSomething() 在你创建 observable 时立即被调用。

如果您需要 onNext、onComplete 等,解决方案是使用 Observable.create() 之一:

public Observable<String> func1() {
    // See how we wrap our instruction inside create method
    return Observable.create( result -> {
        String output = dosomething()
        result.onNext(output);   
    });
}

Observable.defer(),如果您只需要等待订阅:

public Observable<String> func1() {
    // See how we wrap our instruction inside create method
    return Observable.defer( () -> dosomething());
}

之后你可以调用:

return Observable.concatArray(
    func1(),
    func2(),
    func3(), 
    func4()
);

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-04-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多