【问题标题】:RxSwift smoothen Observable valuesRxSwift 平滑 Observable 值
【发布时间】:2018-07-25 18:19:48
【问题描述】:
// Transform a stream of [Int] where the chunks are received in bursts into a continuous stream of metric values
//
//   Example:
//
//      Original:      <-----[Int32][Int32][Int32]------------------[Int32][Int32][Int32]----------------[Int32][Int32][Int32]----------->
//      Transformed:   <-----Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32->
//

注意事项:

  • Observable 接收和每 1 秒 250 个点的数组
  • 希望对此进行平滑处理,以便在那 1 秒内分配点
  • 发送它们以供 UI 用于绘图

RectiveSwift 实现:

扩展 SignalProducerProtocol where Value == [Double], Error == BioDemoError {

func startAndFlattenBurstIntervalChunks(periodInMs: UInt, minTimeDeltaBetweenBurstsInMs: UInt64 = 500) -> (Signal<Double, NoError>, Disposable) {

    let (individualValueSignal, individualValueObserver) = Signal<Double, NoError>.pipe()
    let compositeDisposable = CompositeDisposable()
    let valueBuffer = MutableProperty<[Double]>([])
    var timeOfLastChunk = Date() //first one doesn't matter

    func addChunkToBuffer(chunk: [Double]) {
        let currentTime = Date()
        valueBuffer.modify { buffer in

            // Only flush current buffer if this is the first chunk of a burst
            let shouldFlushCurrentBuffer = currentTime.millisecondsSinceUnixEpoch - timeOfLastChunk.millisecondsSinceUnixEpoch > minTimeDeltaBetweenBurstsInMs

            if(shouldFlushCurrentBuffer) {
                buffer.forEach(individualValueObserver.send)
                buffer = chunk
            } else {
                buffer = buffer + chunk
            }

        }
        timeOfLastChunk = currentTime
    }

    func sendFirstBufferValue() {
        valueBuffer.modify { buffer in
            if let firstValue = buffer.first {
                individualValueObserver.send(value: firstValue)
                buffer = buffer.tail()
            }
        }
    }

    let periodTime = DispatchTimeInterval.milliseconds(Int(periodInMs))

    compositeDisposable.add(
        timer(interval: periodTime, on: QueueScheduler())
            .map { _ in () }
            .startWithValues(sendFirstBufferValue)
    )

    compositeDisposable.add(
        self
            .flatMapError { _ in SignalProducer<[Double], NoError>.never }
            .startWithValues(addChunkToBuffer)
    )

    return (individualValueSignal, compositeDisposable)

}

}

【问题讨论】:

    标签: ios swift rx-swift


    【解决方案1】:

    我会将 .flatMap() 数组添加到单个项目 (.flatMap { Observable.from(iterable: $0)}) 和 .zip() 使用可观察的计时器(如 Observable.interval(1, scheduler: MainScheduler.instance))同时确保背压不会妨碍它们。见http://rxmarbles.com/#zip

    【讨论】:

      【解决方案2】:

      您可以使用 Relay 而不是 MutableProperty 从 ReactiveSwift 编写完全相同的实现。我也可能会调查window()

      【讨论】:

        【解决方案3】:

        您基本上需要将“不规则”数据存储到某种缓冲区,然后以固定的时间间隔读取存储的元素。这是我想出的:

        struct Smoothener {
            let disposeBag = DisposeBag()
            static let elementsPerInterval: Double = 10
            static let intervalSize: Double = 1
            let smoothedResultObservable: Observable<Int>
        
            // Generates bursts once per interval, test data
            // Can be any pattern you want
            let sourceObservable = Observable<Int>.create { observer in
                var number = 0
                let timer = Timer.scheduledTimer(withTimeInterval: Smoothener.intervalSize, repeats: true) { timer in
                    for _ in 0...Int(Smoothener.elementsPerInterval) {
                        observer.onNext(number)
                        number += 1 // For easier testing incrementing number
                    }
                }
        
                return Disposables.create {
                    timer.fire()
                }
            }
        
            // Time counter
            let timerObservable = Observable<Date>.create { (observer) -> Disposable in
                let timer = Timer.scheduledTimer(withTimeInterval: Smoothener.intervalSize / Smoothener.elementsPerInterval, repeats: true) { timer in
                    observer.onNext(timer.fireDate )
                }
        
                return Disposables.create {
                    timer.fire()
                }
            }
        
            init() {
                // Fills the buffer from the generated data
                var currentBuffer: [Int] = []
               _ =  sourceObservable
                    .buffer(timeSpan: Smoothener.intervalSize, count: Int(Smoothener.elementsPerInterval), scheduler: MainScheduler.instance)
                    .subscribe(onNext: { (buffer) in
                        currentBuffer = buffer
                    })
        
                // transform buffers to integers when the timer fires
                smoothedResultObservable = timerObservable
                    // only proceed if the buffer is filled
                    .filter({ _ in currentBuffer.count == Int(Smoothener.elementsPerInterval) })
                    .enumerated()
                    .map({ (index, observable) -> Int in
                        return currentBuffer[index % Int(Smoothener.elementsPerInterval)]
                    })
            }
        }
        

        并打印平滑的结果:

            _ = Smoothener().smoothedResultObservable
                .subscribe(onNext: { (number) in
                    print(number) // numbers get output at a fixed interval
                })
        

        sourceObservable 可能是您传递给Smoothener 的东西,而不是在结构内部生成值。

        【讨论】:

        • 感谢您的回复。由于我使用的是扩展,因此我安装了将其用作我的 sourceObservable 的解决方案。我在.buffer 之后的订阅中遇到cannot assign value of type 'Event&lt;[[Int32]]&gt;' to type 'Int[32]' 。我的sourceObservable 来自扩展extension Observable where E == [Int32]
        • 我只需要 .buffer.... 以确保在经过一定时间或点数已满时刷新 currentBuffer
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多