【问题标题】:Combine framework serialize async operations组合框架序列化异步操作
【发布时间】:2020-05-01 18:41:32
【问题描述】:

如何让构成组合框架的异步管道同步(串行)排列?

假设我有 50 个 URL,我想从其中下载相应的资源,假设我想一次下载一个。我知道如何使用 Operation / OperationQueue 来做到这一点,例如使用在下载完成之前不会声明自己完成的操作子类。我如何使用 Combine 做同样的事情?

目前我所想到的就是保留剩余 URL 的全局列表并弹出一个,为一次下载设置一个管道,进行下载,然后在管道的 sink 中重复.这似乎不太像组合。

我确实尝试过制作 URL 数组并将其映射到发布者数组。我知道我可以“产生”一个发布者,并使用flatMap 使其在管道上发布。但是我仍然同时进行所有下载。没有任何 Combine 方法可以以受控的方式遍历数组 - 或者有吗?

(我也想过用 Future 做点什么,但我变得非常困惑。我不习惯这种思维方式。)

【问题讨论】:

    标签: ios swift combine


    【解决方案1】:

    在所有其他响应式框架中,这真的很容易;您只需使用concat 一步连接和展平结果,然后您可以reduce 将结果放入最终数组。 Apple 使这变得困难,因为Publisher.Concatenate 没有接受发布者数组的重载。 Publisher.Merge 也有类似的奇怪之处。我感觉这与它们返回嵌套的泛型发布者而不是仅仅返回像 rx Observable 这样的单个泛型类型有关。我想你可以在循环中调用Concatenate,然后将连接的结果减少到一个数组中,但我真的希望他们在下一个版本中解决这个问题。肯定需要连接超过 2 个发布者并合并超过 4 个发布者(这两个运算符的重载甚至不一致,这很奇怪)。

    编辑:

    我回过头来发现你确实可以连接任意的发布者数组,它们会按顺序发出。我不知道为什么没有像ConcatenateMany 这样的函数来为您执行此操作,但看起来只要您愿意使用类型擦除的发布者,自己编写一个并不难。这个例子展示了 merge 按时间顺序发出,而 concat 按组合顺序发出:

    import PlaygroundSupport
    import SwiftUI
    import Combine
    
    let p = Just<Int>(1).append(2).append(3).delay(for: .seconds(0.25), scheduler: RunLoop.main).eraseToAnyPublisher()
    let q = Just<Int>(4).append(5).append(6).eraseToAnyPublisher()
    let r = Just<Int>(7).append(8).append(9).delay(for: .seconds(0.5), scheduler: RunLoop.main).eraseToAnyPublisher()
    let concatenated: AnyPublisher<Int, Never> = [q,r].reduce(p) { total, next in
      total.append(next).eraseToAnyPublisher()
    }
    
    var subscriptions = Set<AnyCancellable>()
    
    concatenated
      .sink(receiveValue: { v in
        print("concatenated: \(v)")
      }).store(in: &subscriptions)
    
    Publishers
      .MergeMany([p,q,r])
      .sink(receiveValue: { v in
        print("merge: \(v)")
      }).store(in: &subscriptions)
    

    【讨论】:

    • 是的,你可能猜到我故意选择了 50 这样的大数字。
    • 有一个 MergeMany。我不明白为什么没有 ConcatenateMany。 Rx swift 有 Observable.concat 而 Reactive Swift 有 flatMap(.concat) 所以这很奇怪;也许我错过了一些东西。我会继续看developer.apple.com/documentation/combine/publishers/mergemany
    • concat 会序列化(在其他反应式框架中)吗?
    • 是的。对于序列序列,您只有一种展平方法,即,将一个内部序列的元素一个接一个地放置,就像在 swift 中的 Sequence.flatMap 一样。当你有一个异步序列时,你必须在展平时考虑时间维度。因此,您可以按时间顺序从所有内部序列发出元素(合并),也可以按序列的顺序从每个内部序列发出元素(连续)。见弹珠图:rxmarbles.com/#concat vs rxmarbles.com/#merge
    • 请注意,.append 是一个创建Publisher.Concatenate 的运算符。
    【解决方案2】:

    这是描述可能方法的一页游乐场代码。主要思想是将异步API调用转换为Future发布者链,从而制作串行管道。

    输入:从 1 到 10 的 int 范围,在后台队列中异步转换为字符串

    直接调用异步 API 的演示:

    let group = DispatchGroup()
    inputValues.map {
        group.enter()
        asyncCall(input: $0) { (output, _) in
            print(">> \(output), in \(Thread.current)")
            group.leave()
        }
    }
    group.wait()
    

    输出:

    >> 1, in <NSThread: 0x7fe76264fff0>{number = 4, name = (null)}
    >> 3, in <NSThread: 0x7fe762446b90>{number = 3, name = (null)}
    >> 5, in <NSThread: 0x7fe7624461f0>{number = 5, name = (null)}
    >> 6, in <NSThread: 0x7fe762461ce0>{number = 6, name = (null)}
    >> 10, in <NSThread: 0x7fe76246a7b0>{number = 7, name = (null)}
    >> 4, in <NSThread: 0x7fe764c37d30>{number = 8, name = (null)}
    >> 7, in <NSThread: 0x7fe764c37cb0>{number = 9, name = (null)}
    >> 8, in <NSThread: 0x7fe76246b540>{number = 10, name = (null)}
    >> 9, in <NSThread: 0x7fe7625164b0>{number = 11, name = (null)}
    >> 2, in <NSThread: 0x7fe764c37f50>{number = 12, name = (null)}
    

    组合管道演示:

    输出:

    >> got 1
    >> got 2
    >> got 3
    >> got 4
    >> got 5
    >> got 6
    >> got 7
    >> got 8
    >> got 9
    >> got 10
    >>>> finished with true
    

    代码:

    import Cocoa
    import Combine
    import PlaygroundSupport
    
    // Assuming there is some Asynchronous API with
    // (eg. process Int input value during some time and generates String result)
    func asyncCall(input: Int, completion: @escaping (String, Error?) -> Void) {
        DispatchQueue.global(qos: .background).async {
                sleep(.random(in: 1...5)) // wait for random Async API output
                completion("\(input)", nil)
            }
    }
    
    // There are some input values to be processed serially
    let inputValues = Array(1...10)
    
    // Prepare one pipeline item based on Future, which trasform Async -> Sync
    func makeFuture(input: Int) -> AnyPublisher<Bool, Error> {
        Future<String, Error> { promise in
            asyncCall(input: input) { (value, error) in
                if let error = error {
                    promise(.failure(error))
                } else {
                    promise(.success(value))
                }
            }
        }
        .receive(on: DispatchQueue.main)
        .map {
            print(">> got \($0)") // << sideeffect of pipeline item
            return true
        }
        .eraseToAnyPublisher()
    }
    
    // Create pipeline trasnforming input values into chain of Future publishers
    var subscribers = Set<AnyCancellable>()
    let pipeline =
        inputValues
        .reduce(nil as AnyPublisher<Bool, Error>?) { (chain, value) in
            if let chain = chain {
                return chain.flatMap { _ in
                    makeFuture(input: value)
                }.eraseToAnyPublisher()
            } else {
                return makeFuture(input: value)
            }
        }
    
    // Execute pipeline
    pipeline?
        .sink(receiveCompletion: { _ in
            // << do something on completion if needed
        }) { output in
            print(">>>> finished with \(output)")
        }
        .store(in: &subscribers)
    
    PlaygroundPage.current.needsIndefiniteExecution = true
    

    【讨论】:

      【解决方案3】:

      我只是对此进行了简单的测试,但在第一次通过时,似乎每个请求都在等待前一个请求完成后再开始。

      我发布此解决方案以寻求反馈。如果这不是一个好的解决方案,请批评。

      extension Collection where Element: Publisher {
      
          func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
              // If the collection is empty, we can't just create an arbititary publisher
              // so we return nil to indicate that we had nothing to serialize.
              if isEmpty { return nil }
      
              // We know at this point that it's safe to grab the first publisher.
              let first = self.first!
      
              // If there was only a single publisher then we can just return it.
              if count == 1 { return first.eraseToAnyPublisher() }
      
              // We're going to build up the output starting with the first publisher.
              var output = first.eraseToAnyPublisher()
      
              // We iterate over the rest of the publishers (skipping over the first.)
              for publisher in self.dropFirst() {
                  // We build up the output by appending the next publisher.
                  output = output.append(publisher).eraseToAnyPublisher()
              }
      
              return output
          }
      }
      
      

      此解决方案的更简洁版本(由@matt 提供):

      extension Collection where Element: Publisher {
          func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
              guard let start = self.first else { return nil }
              return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
                  $0.append($1).eraseToAnyPublisher()
              }
          }
      }
      

      【讨论】:

      • 太好了,谢谢。 append 正是我想要的。 — 您的代码可以大大收紧;特别是,在count == 1 的情况下不需要过早返回,因为在这种情况下dropFirst 将是空的,我们不会循环。并且不需要维护output 变量,因为我们可以使用reduce 代替for...in。请参阅我的答案以获得更紧密的渲染。
      【解决方案4】:

      您可以创建自定义订阅者,接收返回的订阅者.Demand.max(1)。在这种情况下,订阅者只有在收到一个值时才会请求下一个值。该示例适用于 Int.publisher,但地图中的一些随机延迟模仿了网络流量 :-)

      import PlaygroundSupport
      import SwiftUI
      import Combine
      
      class MySubscriber: Subscriber {
        typealias Input = String
        typealias Failure = Never
      
        func receive(subscription: Subscription) {
          print("Received subscription", Thread.current.isMainThread)
          subscription.request(.max(1))
        }
      
        func receive(_ input: Input) -> Subscribers.Demand {
          print("Received input: \(input)", Thread.current.isMainThread)
          return .max(1)
        }
      
        func receive(completion: Subscribers.Completion<Never>) {
          DispatchQueue.main.async {
              print("Received completion: \(completion)", Thread.current.isMainThread)
              PlaygroundPage.current.finishExecution()
          }
        }
      }
      
      (110...120)
          .publisher.receive(on: DispatchQueue.global())
          .map {
              print(Thread.current.isMainThread, Thread.current)
              usleep(UInt32.random(in: 10000 ... 1000000))
              return String(format: "%02x", $0)
          }
          .subscribe(on: DispatchQueue.main)
          .subscribe(MySubscriber())
      
      print("Hello")
      
      PlaygroundPage.current.needsIndefiniteExecution = true
      

      游乐场打印 ...

      Hello
      Received subscription true
      false <NSThread: 0x600000064780>{number = 5, name = (null)}
      Received input: 6e false
      false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
      Received input: 6f false
      false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
      Received input: 70 false
      false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
      Received input: 71 false
      false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
      Received input: 72 false
      false <NSThread: 0x600000064780>{number = 5, name = (null)}
      Received input: 73 false
      false <NSThread: 0x600000064780>{number = 5, name = (null)}
      Received input: 74 false
      false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
      Received input: 75 false
      false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
      Received input: 76 false
      false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
      Received input: 77 false
      false <NSThread: 0x600000053400>{number = 3, name = (null)}
      Received input: 78 false
      Received completion: finished true
      

      更新 最后我找到了.flatMap(maxPublishers: ),这迫使我用有点不同的方法来更新这个有趣的话题。请注意,我正在使用全局队列进行调度,不仅仅是一些随机延迟,只是为了确保接收序列化流不是“随机”或“幸运”行为:-)

      import PlaygroundSupport
      import Combine
      import Foundation
      
      PlaygroundPage.current.needsIndefiniteExecution = true
      
      let A = (1 ... 9)
          .publisher
          .flatMap(maxPublishers: .max(1)) { value in
              [value].publisher
                  .flatMap { value in
                      Just(value)
                          .delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: DispatchQueue.global())
              }
      }
      .sink { value in
          print(value, "A")
      }
      
      let B = (1 ... 9)
          .publisher
          .flatMap { value in
              [value].publisher
                  .flatMap { value in
                      Just(value)
                          .delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: RunLoop.main)
              }
      }
      .sink { value in
          print("     ",value, "B")
      }
      

      打印

      1 A
            4 B
            5 B
            7 B
            1 B
            2 B
            8 B
            6 B
      2 A
            3 B
            9 B
      3 A
      4 A
      5 A
      6 A
      7 A
      8 A
      9 A
      

      根据这里写的

      .serialize()?

      由 Clay Ellis 定义的接受的答案可以替换为

      .publisher.flatMap(maxPublishers: .max(1)){$0}

      而“非序列化”版本必须使用

      .publisher.flatMap{$0}

      “现实世界的例子”

      import PlaygroundSupport
      import Foundation
      import Combine
      
      let path = "postman-echo.com/get"
      let urls: [URL] = "... which proves the downloads are happening serially .-)".map(String.init).compactMap { (parameter) in
          var components = URLComponents()
          components.scheme = "https"
          components.path = path
          components.queryItems = [URLQueryItem(name: parameter, value: nil)]
          return components.url
      }
      //["https://postman-echo.com/get?]
      struct Postman: Decodable {
          var args: [String: String]
      }
      
      
      let collection = urls.compactMap { value in
              URLSession.shared.dataTaskPublisher(for: value)
              .tryMap { data, response -> Data in
                  return data
              }
              .decode(type: Postman.self, decoder: JSONDecoder())
              .catch {_ in
                  Just(Postman(args: [:]))
          }
      }
      
      extension Collection where Element: Publisher {
          func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
              guard let start = self.first else { return nil }
              return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
                  return $0.append($1).eraseToAnyPublisher()
              }
          }
      }
      
      var streamA = ""
      let A = collection
          .publisher.flatMap{$0}
      
          .sink(receiveCompletion: { (c) in
              print(streamA, "     ", c, "    .publisher.flatMap{$0}")
          }, receiveValue: { (postman) in
              print(postman.args.keys.joined(), terminator: "", to: &streamA)
          })
      
      
      var streamC = ""
      let C = collection
          .serialize()?
      
          .sink(receiveCompletion: { (c) in
              print(streamC, "     ", c, "    .serialize()?")
          }, receiveValue: { (postman) in
              print(postman.args.keys.joined(), terminator: "", to: &streamC)
          })
      
      var streamD = ""
      let D = collection
          .publisher.flatMap(maxPublishers: .max(1)){$0}
      
          .sink(receiveCompletion: { (c) in
              print(streamD, "     ", c, "    .publisher.flatMap(maxPublishers: .max(1)){$0}")
          }, receiveValue: { (postman) in
              print(postman.args.keys.joined(), terminator: "", to: &streamD)
          })
      
      PlaygroundPage.current.needsIndefiniteExecution = true
      

      打印

      .w.h i.c hporves ht edownloadsa erh appeninsg eriall y.-)       finished     .publisher.flatMap{$0}
      ... which proves the downloads are happening serially .-)       finished     .publisher.flatMap(maxPublishers: .max(1)){$0}
      ... which proves the downloads are happening serially .-)       finished     .serialize()?
      

      在我看来,在其他情况下也非常有用。尝试在下一个 sn-p 中使用 maxPublishers 的默认值并比较结果:-)

      import Combine
      
      let sequencePublisher = Publishers.Sequence<Range<Int>, Never>(sequence: 0..<Int.max)
      let subject = PassthroughSubject<String, Never>()
      
      let handle = subject
          .zip(sequencePublisher.print())
          //.publish
          .flatMap(maxPublishers: .max(1), { (pair)  in
              Just(pair)
          })
          .print()
          .sink { letters, digits in
              print(letters, digits)
          }
      
      "Hello World!".map(String.init).forEach { (s) in
          subject.send(s)
      }
      subject.send(completion: .finished)
      

      【讨论】:

      • @matt sink 没有任何不同,只是在接收返回 Subsribers.Demand.unlimited ... 可能正在使用正确的工具,如串行队列和 Data.init?(contentsOf url: URL ) 是您场景中的最佳选择。如果您需要对两个 Int 求和,您是否将其作为 [lhs: Int, rhs: Int].reduce .... ???我将在 MySerialDownloaderSubscriber 的 receive(_input:) 中使用 Data.init?(contentsOf url: URL)。
      • @matt 请查看更新后的答案。结合是令人兴奋的,但(至少对我来说)很难理解......
      • 是的,我明白了!使用maxPublishers 参数,我们可以添加背压。这与我在我的问题中所说的相符:“我知道我可以“生产”一个发布者并使用 flatMap 使其在管道上发布。但是我仍然同时进行所有下载。好吧,使用 maxPublishers 参数,它们不是同时发生的。
      • @matt 是的,sink 使用 Subscribers.Demand.unlimited 调用发布者自己的订阅者,在我们的用例 .max(1) 中,flatMap 与设置发布者自己的订阅者具有不同的值具有相同的效果。我只是添加了另一个具有不同场景的示例,它非常有用。
      【解决方案5】:

      来自原始问题:

      我确实尝试过制作 URL 数组并将其映射到发布者数组。我知道我可以“产生”一个发布者并使用flatMap 使其在管道上发布。但是我仍然同时进行所有下载。没有任何组合方式可以以受控方式遍历数组——或者有吗?


      这是一个代表真正问题的玩具示例:

      let collection = (1 ... 10).map {
          Just($0).delay(
              for: .seconds(Double.random(in:1...5)),
              scheduler: DispatchQueue.main)
              .eraseToAnyPublisher()
      }
      collection.publisher
          .flatMap() {$0}
          .sink {print($0)}.store(in:&self.storage)
      

      这会在随机时间以随机顺序发出从 1 到 10 的整数。目标是对collection 做一些事情,使其按顺序发出从 1 到 10 的整数。


      现在我们只改变一件事:在行中

      .flatMap {$0}
      

      我们添加maxPublishers参数:

      let collection = (1 ... 10).map {
          Just($0).delay(
              for: .seconds(Double.random(in:1...5)),
              scheduler: DispatchQueue.main)
              .eraseToAnyPublisher()
      }
      collection.publisher
          .flatMap(maxPublishers:.max(1)) {$0}
          .sink {print($0)}.store(in:&self.storage)
      

      Presto,我们现在按顺序发出从 1 到 10 的整数,它们之间有随机间隔。


      让我们将其应用于原始问题。为了演示,我需要一个相当慢的 Internet 连接和一个相当大的资源来下载。首先,我会用普通的.flatMap

      let eph = URLSessionConfiguration.ephemeral
      let session = URLSession(configuration: eph)
      let url = "https://photojournal.jpl.nasa.gov/tiff/PIA23172.tif"
      let collection = [url, url, url]
          .map {URL(string:$0)!}
          .map {session.dataTaskPublisher(for: $0)
              .eraseToAnyPublisher()
      }
      collection.publisher.setFailureType(to: URLError.self)
          .handleEvents(receiveOutput: {_ in print("start")})
          .flatMap() {$0}
          .map {$0.data}
          .sink(receiveCompletion: {comp in
              switch comp {
              case .failure(let err): print("error", err)
              case .finished: print("finished")
              }
          }, receiveValue: {_ in print("done")})
          .store(in:&self.storage)
      

      结果是

      start
      start
      start
      done
      done
      done
      finished
      

      这表明我们正在同时进行三个下载。好的,现在换

          .flatMap() {$0}
      

          .flatMap(maxPublishers:.max(1) {$0}
      

      现在的结果是:

      start
      done
      start
      done
      start
      done
      finished
      

      所以我们现在是串行下载,这是原来要解决的问题。


      追加

      根据 TIMTOWTDI 的原则,我们可以改为使用 append 链接发布者来序列化它们:

      let collection = (1 ... 10).map {
          Just($0).delay(
              for: .seconds(Double.random(in:1...5)),
              scheduler: DispatchQueue.main)
              .eraseToAnyPublisher()
      }
      let pub = collection.dropFirst().reduce(collection.first!) {
          return $0.append($1).eraseToAnyPublisher()
      }
      

      结果是一个发布者序列化原始集合中的延迟发布者。让我们通过订阅它来证明它:

      pub.sink {print($0)}.store(in:&self.storage)
      

      果然,整数现在按顺序到达(之间有随机间隔)。


      我们可以按照 Clay Ellis 的建议,封装从发布者集合中创建的 pub,并在 Collection 上进行扩展:

      extension Collection where Element: Publisher {
          func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
              guard let start = self.first else { return nil }
              return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
                  return $0.append($1).eraseToAnyPublisher()
              }
          }
      }
      

      【讨论】:

        【解决方案6】:

        flatMap(maxPublishers:transform:).max(1) 一起使用,例如

        func imagesPublisher(for urls: [URL]) -> AnyPublisher<UIImage, URLError> {
            Publishers.Sequence(sequence: urls.map { self.imagePublisher(for: $0) })
                .flatMap(maxPublishers: .max(1)) { $0 }
                .eraseToAnyPublisher()
        }
        

        在哪里

        func imagePublisher(for url: URL) -> AnyPublisher<UIImage, URLError> {
            URLSession.shared.dataTaskPublisher(for: url)
                .compactMap { UIImage(data: $0.data) }
                .receive(on: RunLoop.main)
                .eraseToAnyPublisher()
        }
        

        var imageRequests: AnyCancellable?
        
        func fetchImages() {
            imageRequests = imagesPublisher(for: urls).sink { completion in
                switch completion {
                case .finished:
                    print("done")
                case .failure(let error):
                    print("failed", error)
                }
            } receiveValue: { image in
                // do whatever you want with the images as they come in
            }
        }
        

        结果是:

        但我们应该认识到,您按顺序执行这些操作会对性能造成很大影响,就像这样。例如,如果我一次将它提高到 6 个,它的速度是原来的两倍多:

        就个人而言,我建议仅在绝对必须的情况下按顺序下载(在下载一系列图像/文件时,几乎可以肯定不是这种情况)。是的,并发执行请求可能会导致它们没有按特定顺序完成,但我们只是使用与顺序无关的结构(例如字典而不是简单数组),但性能提升非常显着,通常值得这样做。

        但是,如果您希望它们按顺序下载,maxPublishers 参数可以实现。

        【讨论】:

        • 是的,这就是我的答案已经说的:stackoverflow.com/a/59889993/341994 以及我将赏金授予stackoverflow.com/a/59889174/341994的答案
        • 现在也可以看看我的书apeth.com/UnderstandingCombine/operators/…
        • 顺便说一下顺序,我已经很好地利用了您的顺序异步操作来完成不同的任务,感谢您编写它
        • @matt - 大声笑。我承认我没有看到您找到了maxPublishers 选项。如果我注意到是你,我就不会喋喋不休地谈论“不要做连续剧”(因为我知道你完全理解连续剧与并发的利弊)。我实际上只看到“我想一次下载一个文件”,我最近偶然发现maxPublishers 选项用于我正在做的其他事情(即提供modern solution to this question),我想我会分享组合解决方案我想出了。我不是故意的。
        • 是的,我之前说的就是stackoverflow.com/a/48104095/1271826提到的解决方案;我发现这很有帮助。
        【解决方案7】:

        URL 的动态数组呢,比如数据总线?

              var array: [AnyPublisher<Data, URLError>] = []
        
              array.append(Task())
        
              array.publisher
                 .flatMap { $0 }
                 .sink {
        
                 }
                 // it will be finished
              array.append(Task())
              array.append(Task())
              array.append(Task())
        

        【讨论】:

          【解决方案8】:

          另一种方法,如果您想收集所有下载结果,以便知道哪个失败,哪个失败,则编写一个如下所示的自定义发布者:

          extension Publishers {
              struct Serialize<Upstream: Publisher>: Publisher {
                  typealias Output = [Result<Upstream.Output, Upstream.Failure>]
                  typealias Failure = Never
          
                  let upstreams: [Upstream]
          
                  init<C: Collection>(_ upstreams: C) where C.Element == Upstream {
                      self.upstreams = Array(upstreams)
                  }
          
                  init(_ upstreams: Upstream...) {
                      self.upstreams = upstreams
                  }
          
                  func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
                      guard let first = upstreams.first else { return Empty().subscribe(subscriber) }
                      first
                          .map { Result<Upstream.Output, Upstream.Failure>.success($0) }
                          .catch { Just(Result<Upstream.Output, Upstream.Failure>.failure($0)) }
                          .map { [$0] }
                          .append(Serialize(upstreams.dropFirst()))
                          .collect()
                          .map { $0.flatMap { $0 } }
                          .subscribe(subscriber)
                  }
              }
          }
          
          extension Collection where Element: Publisher {  
              func serializedPublishers() -> Publishers.Serialize<Element> {
                  .init(self)
              }
          }
          

          发布者获取第一个下载任务,将其输出/失败转换为 Result 实例,并将其添加到列表其余部分的“递归”调用之前。

          用法:Publishers.Serialize(listOfDownloadTasks),或listOfDownloadTasks.serializedPublishers()

          这个实现的一个小不便是Result 实例需要被包装到一个数组中,只是为了在管道中的三个步骤之后被展平。也许有人可以提出更好的替代方案。

          【讨论】:

            猜你喜欢
            • 2018-06-04
            • 1970-01-01
            • 2013-07-29
            • 2019-08-25
            • 1970-01-01
            • 1970-01-01
            • 2017-05-02
            • 1970-01-01
            相关资源
            最近更新 更多