【发布时间】:2018-07-25 18:19:48
【问题描述】:
// Transform a stream of [Int] where the chunks are received in bursts into a continuous stream of metric values
//
// Example:
//
// Original: <-----[Int32][Int32][Int32]------------------[Int32][Int32][Int32]----------------[Int32][Int32][Int32]----------->
// Transformed: <-----Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32-Int32->
//
注意事项:
- Observable 每 1 秒收到一个包含 250 个点的数组
- 希望对其做平滑处理,把这些点均匀分布到这 1 秒内
- 再逐个发送出去,供 UI 绘图使用
ReactiveSwift 实现:
extension SignalProducerProtocol where Value == [Double], Error == BioDemoError {
    /// Starts the producer and re-emits the values of its bursty `[Double]` chunks one at a time.
    ///
    /// Incoming chunks are appended to an internal buffer, and a repeating timer sends the
    /// oldest buffered value every `periodInMs` milliseconds. A chunk that arrives more than
    /// `minTimeDeltaBetweenBurstsInMs` after the previous chunk is treated as the start of a
    /// new burst: everything still buffered is sent immediately and the buffer restarts
    /// from that chunk.
    ///
    /// Upstream errors are swallowed (the failing producer is replaced by `.never`), and
    /// nothing in this method sends a terminating event on the returned signal.
    ///
    /// - Parameters:
    ///   - periodInMs: Interval, in milliseconds, between two emitted values.
    ///   - minTimeDeltaBetweenBurstsInMs: Gap, in milliseconds, between two consecutive
    ///     chunks above which the later chunk is considered the first of a new burst.
    /// - Returns: The signal of individual values, and a disposable that disposes both the
    ///   timer subscription and the upstream subscription.
    func startAndFlattenBurstIntervalChunks(periodInMs: UInt, minTimeDeltaBetweenBurstsInMs: UInt64 = 500) -> (Signal<Double, NoError>, Disposable) {
        let (individualValueSignal, individualValueObserver) = Signal<Double, NoError>.pipe()
        let compositeDisposable = CompositeDisposable()
        let valueBuffer = MutableProperty<[Double]>([])
        // The initial value is irrelevant: the buffer is empty when the first chunk arrives,
        // so flushing and appending produce the same result.
        var timeOfLastChunk = Date()

        // Stores a freshly received chunk, flushing leftovers first if it opens a new burst.
        func addChunkToBuffer(chunk: [Double]) {
            let currentTime = Date()
            valueBuffer.modify { buffer in
                // `Date` is wall-clock time and can step backwards; a signed interval keeps
                // the comparison well-defined instead of relying on integer subtraction.
                let elapsedMs = currentTime.timeIntervalSince(timeOfLastChunk) * 1000
                // Updated inside the same critical section that reads it.
                timeOfLastChunk = currentTime

                // Only flush the current buffer if this is the first chunk of a burst.
                if elapsedMs > Double(minTimeDeltaBetweenBurstsInMs) {
                    buffer.forEach { individualValueObserver.send(value: $0) }
                    buffer = chunk
                } else {
                    buffer.append(contentsOf: chunk)
                }
            }
        }

        // Emits the oldest buffered value, if any, and removes it from the buffer.
        func sendFirstBufferValue() {
            valueBuffer.modify { buffer in
                guard let firstValue = buffer.first else { return }
                individualValueObserver.send(value: firstValue)
                buffer.removeFirst()
            }
        }

        let periodTime = DispatchTimeInterval.milliseconds(Int(periodInMs))

        // Drain one value per tick.
        compositeDisposable.add(
            timer(interval: periodTime, on: QueueScheduler())
                .map { _ in () }
                .startWithValues(sendFirstBufferValue)
        )

        // Feed incoming chunks into the buffer; upstream failures are ignored.
        compositeDisposable.add(
            self
                .flatMapError { _ in SignalProducer<[Double], NoError>.never }
                .startWithValues(addChunkToBuffer)
        )

        return (individualValueSignal, compositeDisposable)
    }
}
【问题讨论】: