【问题标题】:Limiting concurrent access to a service class with RxSwift使用 RxSwift 限制对服务类的并发访问
【发布时间】:2017-10-03 07:30:07
【问题描述】:

给定这样的服务类:

class Service {
    let networkService = NetworkService()

    func handleJobA(input: String) -> Observable<ResultA> {
        return networkService
            .computeA(input)
            .map { $0.a }
    }
}

当我像这样从调用方使用它时:

let service = Service()

Observable
    .from(["Hello", "World"])
    .flatMap {
        service.handleJobA($0)
    }
    .subscribe()

然后这将同时向service 发送多个请求。我希望流等到每个请求完成。这是可以使用merge 运算符实现的。

Observable
    .from(["Hello", "World"])
    .flatMap {
        Observable.just(
            service.handleJobA($0)
        )
    }
    .merge(maxConcurrent: 1)
    .subscribe()

到目前为止,一切都很好 - 该服务不会同时执行多个 handleJobA 任务。

但是,并发是一个服务细节,调用者不应该关心它。事实上,服务在稍后阶段可能会决定允许不同的并发值。

其次,当我添加一个新方法handleJobB时,它不能与作业A同时处于活动状态,反之亦然。

所以我的问题是:

  1. 如何将 maxConcurrency 限制为可观察的 handleJobA 作为实现细节?
  2. 哪种 RxSwift 模式允许对任何服务方法进行限制?

【问题讨论】:

    标签: swift reactive-programming rx-swift


    【解决方案1】:

    您需要一个专用于该服务的串行调度程序。这是一个可以粘贴到游乐场的示例:

    /// playground
    
    import RxSwift
    
    class Service {
    
        func handleJobA(input: String) -> Observable<String> {
    
            return Observable.create { observer in
                print("start job a")
                sleep(3)
                observer.onNext(input)
                print("complete job a")
                observer.onCompleted()
                return Disposables.create()
            }.subscribeOn(scheduler)
        }
    
        func handleJobB(input: String) -> Observable<String> {
            return Observable.create { observer in
                print("start job b")
                sleep(3)
                observer.onNext(input)
                print("complete job b")
                observer.onCompleted()
                return Disposables.create()
                return Disposables.create()
            }.subscribeOn(scheduler)
        }
    
        let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service")
    }
    
    
    let service = Service()
    
    _ = Observable.from(["hello","world","swift"])
        .flatMap { service.handleJobA(input: $0) }
        .subscribe(onNext:{
            print("result " + $0)
        })
    
    _ = Observable.from(["hello","world","swift"])
        .flatMap { service.handleJobB(input: $0) }
        .subscribe(onNext:{
            print("result " + $0)
        })
    
    import PlaygroundSupport
    
    PlaygroundPage.current.needsIndefiniteExecution = true
    

    【讨论】:

    • 只有当内部 Observable 发出 observer.onCompleted() 时,SerialDispatchQueueScheduler 才会继续?我使用 Alamofire 作为网络层,不幸的是它在主线程上响应并导致所有麻烦。
    • 顺便说一句,如果它不能处理多个传入请求,那么它是一个非常糟糕的服务器。当您拥有多个用户时会发生什么?
    • Daniel,它是一个资源非常有限的嵌入式设备。多个资源会导致卡顿,同时只有一个设备可以连接。
    • A\ccepted 由于 Alamofire-Synchronous 推荐。
    【解决方案2】:

    也许你想要concat 运算符,我在下面写了一些测试代码,看看你是否想要:

     func sleepAndPrint(label:String) -> Observable<String> {
    
    
            return Observable.create { obser -> Disposable in
                DispatchQueue.global().async {
                    sleep(3)
                    print("\(label) come")
                    obser.onNext(label)
                    obser.onCompleted()
                }
    
                return Disposables.create()
            }
        }
    
    
    Observable.from(["hello","world","swift"])
        // we need observable of observable sequences so just use map
        // Observable<Observable<String>> in this case
        .map{
            sleepAndPrint(label: $0)
        }
        // Concatenates all inner observable sequences, as long as the previous observable sequence terminated successfully.
        .concat()
    
        .subscribe(onNext:{
            print("subscribe: " + $0)
        })
        .addDisposableTo(disposeBag)
    
        prints : 
          hello come
          subscribe: hello
          world come
          subscribe: world
          swift come
          subscribe: swift
    

    【讨论】:

    • 谢谢。虽然该方案适用于限制并发,但仍然需要调用者处理并发。我将更新我的问题以反映我在寻找什么。
    • @SebastianRoth 在你的问题中似乎必须处理并发,你想在课堂上做吗Service
    • 正确,是的。问题是该服务调用了一个 Web 服务(通过RxAlamofire),该服务将数据请求和数据响应拆分为 2 个 observables。这就是我寻找这个解决方案的原因。
    • @SebastianRoth 让Service 成为单身人士,然后让handleJobA 成为addHandleJobAInStack 之类的东西,然后一个接一个地使用contact,但是面对@ 时,这确实是一项艰巨的工作987654329@Manually cancel....
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-07-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多