【问题标题】:Dynamically terminate Observable based on another observable | RxSwift基于另一个 observable 动态终止 Observable | RxSwift
【发布时间】:2021-06-16 18:23:54
【问题描述】:

我有一组 Observable,比如 [Observable <WriteTaskResult>]

我想执行所有写任务保持它们的顺序,如果其中任何一个失败,那么我想执行Observable<ResetTaskResult>

以下函数将返回 BatchTasksResult 类型的 observable 用于跟踪任务进度。

示例代码:

enum BatchTasksResult{
    case elapsedTime(Double)
    case failedFatal
    case rolledback
    case success
}

func writeBlocks(tasks: [WriteTask]) -> Observable<BatchTasksResult>{
    return Observable.create {(observable) -> Disposable in
       let allTasks: [Observable<WriteTaskResult>] = self.writeSomewhere(tasks)
       Observable.concat(allTasks)
         .subscribe { writeTaskResult in
               observable.onNext(.elapsedTime(writeTaskResult.totalTime))
            } 
            onError: { (err) in
               // Perform Observable<ResetTaskResult>
               // if ResetTask was successful then observable.onNext(.rolledback)
               // if ResetTask failed then observable.onNext(.failedFatal)
            }
            onCompleted: {
               observable.onNext(.success)
            }
            .disposed(by: disposeBag)
       return Disposables.create()
    }
}

如何使用来自 allTask​​s 的 observable 的 onError 的 Observable 触发回滚逻辑?

简单的解决方案似乎嵌套了 observable,但我猜这不是一个好习惯?我尝试了 FlatMap,但它并不能真正解决 “如果任何单个任务失败,则回滚并重置”还有其他解决方案吗?

【问题讨论】:

    标签: ios swift xcode reactive-programming rx-swift


    【解决方案1】:

    无需使用create 函数添加额外的间接级别。每个 Observable 运算符都已经创建了一个新对象。

    当你使用Observable.create时,不要丢弃在外部处理袋中并返回Disposables.create()。只需退回您刚刚创建的一次性用品。

    这是做你想做的事情的适当方法:

    func writeBlocks(tasks: [WriteTask], resetTask: Single<ResetTaskResult>) -> Observable<BatchTasksResult> {
        // create the array of write tasks and concat them. You seem to have that down.
        let result = Observable.concat(tasks.map(writeSomewhere(task:)).map { $0.asObservable() })
            .share() // the share is needed because you are using the value twice below.
        return Observable.merge(
            // push out the elapsed time for each task.
            result.map { BatchTasksResult.elapsedTime($0.totalTime) },
            // when the last one is done, push out the success event.
            result.takeLast(1).map { _ in BatchTasksResult.success }
        )
        .catch { _ in
            resetTask // the resetTask will get subscribed to if needed.
                .map { _ in BatchTasksResult.rolledback } // if successful emit a rollback
                .catch { _ in Single.just(BatchTasksResult.failedFatal) } // otherwise emit the failure.
                .asObservable()
        }
    }
    
    func writeSomewhere(task: WriteTask) -> Single<WriteTaskResult> {
        // create a Single that performs the write and emits a result.
    }
    

    【讨论】:

    • 这对我有帮助。谢谢。但我有一个疑问,.catch/.catchError 仍然允许执行进一步的序列,对吗?如果要避免他们应该使用 .do(onError:) 对吗?
    • 一旦concat 发出错误,它将停止。不会再发生任务写入。即使后来发现错误,也会发生这种情况。
    • 等一下,让我在不同的评论中解释一下。
    • 那我不明白你的疑问。 resetTask (大概)是单曲。所以一旦它发出(或错误)它就会停止。那时没有“进一步的序列”可以执行......
    • 没有错误。每次写入完成时,合并的第一行都会发出一个值。一旦所有写入完成,合并的第二行就会发出一个值。任何订阅它的东西都会为每次写入获得一个事件,并在所有写入完成时获得一个事件。我以为这就是你想要的。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-08-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多