【问题标题】:How to zip more than 4 publishers如何压缩超过 4 个发布者
【发布时间】:2020-02-21 20:27:06
【问题描述】:

我正在使用 Swift Combine 来处理我的 API 请求。现在我面临一种情况,我想要将超过 4 个并行请求压缩在一起。在我使用 Zip4() 运算符将 4 个请求压缩在一起之前。我可以想象您分多个步骤进行压缩,但我不知道如何为其编写 receiveValue。

这是我当前代码的简化,包含 4 个并行请求:

    Publishers.Zip4(request1, request2, request3, request4)
        .sink(receiveCompletion: { completion in
            // completion code if all 4 requests completed
        }, receiveValue: { request1Response, request2Response, request3Response, request4Response in
            // do something with request1Response
            // do something with request2Response
            // do something with request3Response
            // do something with request4Response
        }
    )
        .store(in: &state.subscriptions)

【问题讨论】:

    标签: swift combine


    【解决方案1】:

    阻止您压缩任意数量的发布者的是一个非常不幸的事实,即 Apple 选择将 zip 运算符的输出设为 元组。元组非常不灵活,并且它们的能力有限。你不能有一个元组,比如说,十个元素;你甚至不能将元素附加到元组,因为这会导致你得到不同的类型。因此,我们需要一个新的运算符,它与zip 完成相同的工作,但会发出一些更强大、更灵活的结果,例如数组。

    我们可以制作一个!幸运的是,zip 运算符本身有一个 transform 参数,可以让我们指定我们想要的输出类型。

    好的,所以,为了说明,我将 10 个出版商压缩在一起。首先,我将创建一个包含 10 个发布者的数组;他们将仅仅是 Just 出版商,但这足以说明这一点,并且为了证明我没有作弊,我将任意延迟他们每个人:

    let justs = (1...10).map {
        Just($0)
            .delay(for: .seconds(Int.random(in:1...3)), scheduler: DispatchQueue.main)
            .eraseToAnyPublisher() }
    

    好的,现在我有一个发布者数组,我会循环压缩它们:

    let result = justs.dropFirst().reduce(into: AnyPublisher(justs[0].map{[$0]})) { 
        res, just in
        res = res.zip(just) {
            i1, i2 -> [Int] in
            return i1 + [i2]
        }.eraseToAnyPublisher()
    }
    

    注意zip 运算符后面的闭包!这确保我的输出将是 Array<Int> 而不是元组。与元组不同,我可以创建任意大小的数组,只需每次通过循环添加元素。

    好的,result 现在是一个 Zip 发布者,它将十个发布者压缩在一起。为了证明这一点,我只需将订阅者附加到它并打印输出:

    result.sink {print($0)}.store(in: &self.storage)
    

    我们运行代码。有一个令人心碎的停顿——没错,因为这些 Just 发布者中的每一个都有不同的随机延迟,而 zip 的规则是他们所有都需要在我们得到任何输出之前发布。他们迟早都会这样做,并且输出会出现在控制台中:

    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    

    完全正确的答案!我已经证明我确实将十个出版商压缩在一起,以产生由他们每个人的单一贡献组成的输出。

    将任意数量的数据任务发布者(或您正在使用的任何东西)压缩在一起也不例外。

    (对于一个相关问题,我学习如何序列化任意数量的数据任务发布者,请参阅Combine framework serialize async operations。)

    【讨论】:

    • 感谢您提供如此详细的解释。这似乎是一个很好的解决方案,特别是如果您不知道在编译时要压缩的发布者数量。对于我的特殊情况,我只有 6 个要压缩的发布者,所以我选择了 Predrag Samardzic 的解决方案。
    • 当然,我只是想展示一下一般情况下如何做到这一点。
    • 非常好,唯一的麻烦是所有发布者必须是同一类型,如果是 zip 运算符,这可能是真正的限制。
    • Publishers.Zip3(Just(0), Just("alfa"), Just(true)) 是一个有效的声明。
    • @user3441734 嗯,我明白了。我该如何模仿呢?
    【解决方案2】:

    基于Matt's answer:

    extension Publishers {
        struct ZipMany<Element, F: Error>: Publisher {
            typealias Output = [Element]
            typealias Failure = F
    
            private let upstreams: [AnyPublisher<Element, F>]
    
            init(_ upstreams: [AnyPublisher<Element, F>]) {
                self.upstreams = upstreams
            }
    
            func receive<S: Subscriber>(subscriber: S) where Self.Failure == S.Failure, Self.Output == S.Input {
                let initial = Just<[Element]>([])
                    .setFailureType(to: F.self)
                    .eraseToAnyPublisher()
    
                let zipped = upstreams.reduce(into: initial) { result, upstream in
                    result = result.zip(upstream) { elements, element in
                        elements + [element]
                    }
                    .eraseToAnyPublisher()
                }
    
                zipped.subscribe(subscriber)
            }
        }
    }
    

    单元测试可以使用以下内容作为输入:

    let upstreams: [AnyPublisher<String, Never>] = [
        Just("first")
            .receive(on: DispatchQueue.main)
            .eraseToAnyPublisher(),
        Just("second").eraseToAnyPublisher()
    ]
    

    .receive(on:) 将该事件的发射放在主队列的末尾,以便它在 "second" 之后发射。

    【讨论】:

      【解决方案3】:

      认为你可以这样做:

      let zipped1 = Publishers.Zip4(request1, request2, request3, request4)    
      let zipped2 = Publishers.Zip4(request5, request6, request7, request8)
      
      Publishers.Zip(zipped1, zipped2)
          .sink(receiveCompletion: { completion in
              // completion code if all 8 requests completed
          }, receiveValue: { response1, response2 in
              // do something with response1.0
              // do something with response1.1
              // do something with response1.2, response1.3, response2.0, response2.1, response2.2, response2.3
          }
      )
          .store(in: &state.subscriptions)
      

      【讨论】:

      • 谢谢,这对我来说效果很好,因为我有一定​​数量的请求要发出。
      【解决方案4】:

      使用 transform

      为我工作
      let pub1: Just<Int> = Just(1)
      let pub2: Just<String> = Just("string")
      let pub3: Just<Double> = Just(1)
      let pub4: Just<Float> = Just(1)
      
      let pub = pub1
          .zip(pub2)
          .zip(pub3, { return ($0.0, $0.1, $1) })
          .zip(pub4, { return ($0.0, $0.1, $0.2, $1) })
      
      var cancel: Set<AnyCancellable> = .init()
      
      pub.sink {
          print($0.0) // is Int
          print($0.1) // is String
          print($0.2) // is Double
          print($0.3) // is Float
      }.store(in: &cancel)
      

      或使用 Publishers.Zip4

      的示例
      let pub1: Just<Int> = Just(1)
      let pub2: Just<String> = Just("string")
      let pub3: Just<Double> = Just(1)
      let pub4: Just<Float> = Just(1)
      
      let pub5: Just<Int> = Just(2)
      let pub6: Just<String> = Just("string2")
      let pub7: Just<Double> = Just(2)
      let pub8: Just<Float> = Just(2)
      
      let zip1 = Publishers.Zip4(pub1, pub2, pub3, pub4)
      let zip2 = Publishers.Zip4(pub5, pub6, pub7, pub8)
      
      let pub = zip1.zip(zip2, { return ($0.0 ,$0.1, $0.2, $0.3, $1.0, $1.1, $1.2, $1.3) })
      
      var cancel: Set<AnyCancellable> = .init()
      
      pub.sink {
          print($0.0) // is Int
          print($0.1) // is String
          print($0.2) // is Double
          print($0.3) // is Float
          print($0.4) // is Int
          print($0.5) // is String
          print($0.6) // is Double
          print($0.7) // is Float
      }.store(in: &cancel)
      

      【讨论】:

        【解决方案5】:

        (1) Predrag 与 (2) Matt 的回答

        (1) 我记不太清了,如何使用结果(闭包中的命名不是一些“好记”的符号

        (2) Matt的解决方案仅限于相同的Output type,zip没有这个限制

        我建议一个不同的变体

        let handler =
            publisher1
                .zip(publisher2)
                .zip(publisher3)
                .zip(publisher4)
                .zip(publisher5)
                .zip(publisher6)
        
                .sink(receiveCompletion: { (c) in
                    print(c)
                }) { (value) in
                    print(
                        value.1,            // 1
                        value.0.1,          // 2
                        value.0.0.1,        // 3
                        value.0.0.0.1,      // 4
                        value.0.0.0.0.1,    // 5
                        value.0.0.0.0.0     // 6
                    )
        }
        

        这仍远未达到最佳状态,但(至少对我而言)更易于使用且压缩发布者的数量几乎不受限制

        来自快速语法

        GRAMMAR OF A TUPLE TYPE
        
        tuple-type → ( ) | ( tuple-type-element , tuple-type-element-list )
        tuple-type-element-list → tuple-type-element | tuple-type-element , tuple-type-element-list
        tuple-type-element → element-name type-annotation | type
        element-name → identifier
        

        看来,这可以通过编译器来解决,也许我们必须要求社区包含一些复合类型的扁平化,以便为我们所用。

        【讨论】:

          【解决方案6】:

          如果您想继续使用Zip 语义,您可以为此编写一个自定义发布者。基本上,新的Zip5 将是Zip4 和第五个发布者之间的Zip

          extension Publishers {
              struct Zip5<A: Publisher, B: Publisher, C: Publisher, D: Publisher, E: Publisher>: Publisher
              where A.Failure == B.Failure, A.Failure == C.Failure, A.Failure == D.Failure, A.Failure == E.Failure {
                  typealias Output = (A.Output, B.Output, C.Output, D.Output, E.Output)
                  typealias Failure = A.Failure
                  
                  private let a: A
                  private let b: B
                  private let c: C
                  private let d: D
                  private let e: E
                  
                  init(_ a: A, _ b: B, _ c: C, _ d: D, _ e: E) {
                      self.a = a
                      self.b = b
                      self.c = c
                      self.d = d
                      self.e = e
                  }
                  
                  func receive<S>(subscriber: S) where S : Subscriber, Output == S.Input, Failure == S.Failure {
                      Zip(Zip4(a, b, c, d), e)
                          .map { ($0.0, $0.1, $0.2, $0.3, $1) }
                          .receive(subscriber: subscriber)
                  }
              }
          }
          
          extension Publisher {
              func zip<O1: Publisher, O2: Publisher, O3: Publisher, O4: Publisher>(_ o1: O1, _ o2: O2, _ o3: O3, _ o4: O4) -> Publishers.Zip5<Self, O1, O2, O3, O4> {
                  .init(self, o1, o2, o3, o4)
              }
          }
          

          以类似的方式,Zip6Zip7 可以写成:

          • Zip6 =&gt; Zip(Zip4(a, b, c, d, e), Zip(e, f))
          • Zip7 =&gt; Zip(Zip4(a, b, c, d, e), Zip3(e, f, g))

          ,以此类推。

          缺点是这需要编写大量代码,如果您最终需要这种zip 操作,也许这是重新审视您的应用程序设计的好机会,也许您不需要毕竟不需要这么多的拉链。

          为支持上述内容,请查看zip6() 声明的样子:

          func zip<O1: Publisher, O2: Publisher, O3: Publisher, O4: Publisher, O5: Publisher>(_ o1: O1, _ o2: O2, _ o3: O3, _ o4: O4, _ o5: O5) -> Publishers.Zip6<Self, O1, O2, O3, O4, O5> {
              .init(self, o1, o2, o3, o4, o5)
          }
          

          拥有如此多的泛型参数以及对所有泛型参数的约束,使其更难使用和理解。

          【讨论】:

            【解决方案7】:

            我以为我需要这样的东西,但根据您的用例,您也可以使用 collect() 等待所有上游发布者完成,然后再做其他事情。

            【讨论】:

              猜你喜欢
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2013-05-31
              相关资源
              最近更新 更多