【问题标题】:Swift Combine: How to create a single publisher from a list of publishers?Swift Combine:如何从发布者列表中创建单个发布者?
【发布时间】:2019-11-08 22:43:26
【问题描述】:

使用 Apple 的新组合框架,我想从列表中的每个元素发出多个请求。然后我想从减少所有响应中得到一个单一的结果。基本上我想从发布者列表转到拥有响应列表的单个发布者。

我已尝试制作发布商列表,但我不知道如何将该列表缩减为单个发布商。而且我尝试制作一个包含列表的发布者,但我无法平面映射发布者列表。

请看“createIngredients”函数

func createIngredient(ingredient: Ingredient) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
    return apollo.performPub(mutation: CreateIngredientMutation(name: ingredient.name, optionalProduct: ingredient.productId, quantity: ingredient.quantity, unit: ingredient.unit))
            .eraseToAnyPublisher()
}

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
    // first attempt
    let results = ingredients
            .map(createIngredient)
    // results = [AnyPublisher<CreateIngredientMutation.Data, Error>]

    // second attempt
    return Publishers.Just(ingredients)
            .eraseToAnyPublisher()
            .flatMap { (list: [Ingredient]) -> Publisher<[CreateIngredientMutation.Data], Error> in
                return list.map(createIngredient) // [AnyPublisher<CreateIngredientMutation.Data, Error>]
            }
}

我不确定如何获取发布者数组并将其转换为包含数组的发布者。

“[AnyPublisher]”类型的结果值不符合闭包结果类型“Publisher”

【问题讨论】:

  • 如果我尝试像 apollo.fetch(query: AllProductsQuery())).eraseToAnyPublisher() 这样将 eraseToAnyPublisher() 与阿波罗一起使用,我会收到错误 Value of type 'Cancellable' has no member 'eraseToAnyPublisher' - 你是怎么做到的而没有看到错误?
  • @daidai 我使用了 apollo 的扩展来实现这一点。这个问题实际上是关于合并多个发布者。

标签: swift combine


【解决方案1】:

基本上,在您的特定情况下,您正在查看类似这样的内容:

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
    Publishers.MergeMany(ingredients.map(createIngredient(ingredient:)))
        .collect()
        .eraseToAnyPublisher()
}

这会“收集”上游发布者生成的所有元素,并在它们全部完成后生成一个包含所有结果的数组并最终自行完成。

请记住,如果上游发布者之一失败 - 或产生多个结果 - 元素的数量可能与订阅者的数量不匹配,因此您可能需要额外的运营商来缓解这种情况,具体取决于您的情况。

更通用的答案,您可以使用EntwineTest framework 对其进行测试:

import XCTest
import Combine
import EntwineTest

final class MyTests: XCTestCase {
    
    func testCreateArrayFromArrayOfPublishers() {

        typealias SimplePublisher = Just<Int>

        // we'll create our 'list of publishers' here. Each publisher emits a single
        // Int and then completes successfully – using the `Just` publisher.
        let publishers: [SimplePublisher] = [
            SimplePublisher(1),
            SimplePublisher(2),
            SimplePublisher(3),
        ]

        // we'll turn our array of publishers into a single merged publisher
        let publisherOfPublishers = Publishers.MergeMany(publishers)

        // Then we `collect` all the individual publisher elements results into
        // a single array
        let finalPublisher = publisherOfPublishers.collect()

        // Let's test what we expect to happen, will happen.
        // We'll create a scheduler to run our test on
        let testScheduler = TestScheduler()

        // Then we'll start a test. Our test will subscribe to our publisher
        // at a virtual time of 200, and cancel the subscription at 900
        let testableSubscriber = testScheduler.start { finalPublisher }

        // we're expecting that, immediately upon subscription, our results will
        // arrive. This is because we're using `just` type publishers which
        // dispatch their contents as soon as they're subscribed to
        XCTAssertEqual(testableSubscriber.recordedOutput, [
            (200, .subscription),            // we're expecting to subscribe at 200
            (200, .input([1, 2, 3])),        // then receive an array of results immediately
            (200, .completion(.finished)),   // the `collect` operator finishes immediately after completion
        ])
    }
}

【讨论】:

  • 值得注意的是,这不会保留底层数组的顺序。最终数组的元素将按照每个发布者完成的顺序进行排序。
  • @rpowell 关于如何保留订单有什么想法吗?
  • 我找不到一个好的解决方案。我目前正在对我的所有值进行排序。
【解决方案2】:

我认为Publishers.MergeMany 在这里可能会有所帮助。在您的示例中,您可以像这样使用它:

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
    let publishers = ingredients.map(createIngredient(ingredient:))
    return Publishers.MergeMany(publishers).eraseToAnyPublisher()
}

这将为您提供一个向您发送Output 的单个值的发布者。

但是,如果您特别希望在所有发布者完成后将Output 一次性放入数组中,则可以将collect()MergeMany 一起使用:

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
    let publishers = ingredients.map(createIngredient(ingredient:))
    return Publishers.MergeMany(publishers).collect().eraseToAnyPublisher()
}

如果您愿意,上述任何一个示例都可以简化为一行,即:

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
    Publishers.MergeMany(ingredients.map(createIngredient(ingredient:))).eraseToAnyPublisher()
}

您还可以在 Sequence 上定义自己的自定义 merge() 扩展方法,并使用它来稍微简化代码:

extension Sequence where Element: Publisher {
    func merge() -> Publishers.MergeMany<Element> {
        Publishers.MergeMany(self)
    }
}

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
    ingredients.map(createIngredient).merge().eraseToAnyPublisher()
}

【讨论】:

    【解决方案3】:

    为了补充 Tricky 的答案,这是一个保留数组中元素顺序的解决方案。 它通过整个链传递每个元素的索引,并按索引对收集的数组进行排序。

    由于排序,复杂度应该是 O(n log n)。

    import Combine
    
    extension Publishers {
    
        private struct EnumeratedElement<T> {
            let index: Int
            let element: T
    
            init(index: Int, element: T) {
                self.index = index
                self.element = element
            }
    
            init(_ enumeratedSequence: EnumeratedSequence<[T]>.Iterator.Element) {
                index = enumeratedSequence.offset
                element = enumeratedSequence.element
            }
        }
    
        static func mergeMappedRetainingOrder<InputType, OutputType>(
            _ inputArray: [InputType],
            mapTransform: (InputType) -> AnyPublisher<OutputType, Error>
        ) -> AnyPublisher<[OutputType], Error> {
    
            let enumeratedInputArray = inputArray.enumerated().map(EnumeratedElement.init)
    
            let enumeratedMapTransform: (EnumeratedElement<InputType>) -> AnyPublisher<EnumeratedElement<OutputType>, Error> = { enumeratedInput in
                mapTransform(enumeratedInput.element)
                    .map { EnumeratedElement(index: enumeratedInput.index, element: $0)}
                    .eraseToAnyPublisher()
            }
    
            let sortEnumeratedOutputArrayByIndex: ([EnumeratedElement<OutputType>]) -> [EnumeratedElement<OutputType>] = { enumeratedOutputArray in
                enumeratedOutputArray.sorted { $0.index < $1.index }
            }
    
            let transformToNonEnumeratedArray: ([EnumeratedElement<OutputType>]) -> [OutputType] = {
                $0.map { $0.element }
            }
    
            return Publishers.MergeMany(enumeratedInputArray.map(enumeratedMapTransform))
                .collect()
                .map(sortEnumeratedOutputArrayByIndex)
                .map(transformToNonEnumeratedArray)
                .eraseToAnyPublisher()
        }
    }
    

    解决方案的单元测试:

    import XCTest
    import Combine
    
    final class PublishersExtensionsTests: XCTestCase {
    
        // MARK: - Private properties
    
        private var cancellables = Set<AnyCancellable>()
    
        // MARK: - Tests
    
        func test_mergeMappedRetainingOrder() {
            let expectation = expectation(description: "mergeMappedRetainingOrder publisher")
    
            let numbers = (1...100).map { _ in Int.random(in: 1...3) }
    
            let mapTransform: (Int) -> AnyPublisher<Int, Error> = {
                let delayTimeInterval = RunLoop.SchedulerTimeType.Stride(Double($0))
                return Just($0)
                    .delay(for: delayTimeInterval, scheduler: RunLoop.main)
                    .setFailureType(to: Error.self)
                    .eraseToAnyPublisher()
            }
    
            let resultNumbersPublisher = Publishers.mergeMappedRetainingOrder(numbers, mapTransform: mapTransform)
    
            resultNumbersPublisher.sink(receiveCompletion: { _ in }, receiveValue: { resultNumbers in
                XCTAssertTrue(numbers == resultNumbers)
                expectation.fulfill()
             }).store(in: &cancellables)
    
            waitForExpectations(timeout: 5)
        }
    }
    

    【讨论】:

      【解决方案4】:

      你可以在一行中完成:

      .flatMap(Publishers.Sequence.init(sequence:))
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2020-08-05
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-11-06
        • 2021-05-25
        • 1970-01-01
        相关资源
        最近更新 更多