【发布时间】:2016-10-08 05:15:52
【问题描述】:
我对 RxSwift 比较陌生,但我期待在我的项目中更多地使用它,我很想听听关于我刚写的操作符的一些反馈。
我缺少的功能是一个去抖动缓冲区:一个行为与 debounce 运算符完全相同的缓冲区,但它不只发出最新的值,它应该发出自上次发出以来收集的所有值。
在 RxJava 中,这很容易通过使用带有另一个 observable 的缓冲区作为“关闭选择器”来实现:
// From: https://github.com/ReactiveX/RxJava/wiki/Backpressure
//
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
在 RxSwift 中虽然不存在这个版本的缓冲区操作符(我认为这个问题是相关的:https://github.com/ReactiveX/RxSwift/issues/590),所以我尝试自己解决这个问题。
我的第一种方法只是构建去抖动缓冲区:
extension ObservableType {
func debouncedBuffer(_ dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> {
var valueBuffer: [E] = []
let observable = self.do(onNext: { (value) in
valueBuffer.append(value)
}, onError: { (error) in
valueBuffer = []
}, onCompleted: {
valueBuffer = []
}, onSubscribe: {
valueBuffer = []
}, onDispose: {
valueBuffer = []
}).debounce(dueTime, scheduler: scheduler).flatMap { (value) -> Observable<[E]> in
let emitValues = valueBuffer
valueBuffer = []
return Observable<[E]>.just(emitValues)
}
return observable
}
}
我的第二种方法是构建任何关闭条件(如 RxJava 版本)的缓冲区:
extension ObservableType {
func buffer<R>(_ selector: Observable<R>) -> Observable<[E]> {
var valueBuffer: [E] = []
return Observable.create { observer in
let selectorSubscription = selector.subscribe(onNext: { (value) in
let emitValues = valueBuffer
valueBuffer = []
observer.on(.next(emitValues))
}, onError: { (error) in
valueBuffer = []
observer.on(.error(error))
}, onCompleted: {
valueBuffer = []
observer.on(.completed)
}, onDisposed: {
valueBuffer = []
})
let subscription = self.subscribe(onNext: { (value) in
valueBuffer.append(value)
}, onError: { (error) in
observer.on(.error(error))
selectorSubscription.dispose()
}, onCompleted: {
observer.on(.completed)
selectorSubscription.dispose()
}, onDisposed: {
observer.on(.completed)
selectorSubscription.dispose()
})
return subscription
}
}
}
我已经测试了这两个运算符,它们似乎可以工作,还测试了处理 onError、onDispose 和 onCompleted 事件的不同组合。
但是,如果这至少是一个没有泄漏的可接受的解决方案,并且如果我违反了任何 RX 合同,我仍然希望听到更有经验的人的一些反馈。
我还用一些测试代码创建了一个 pastebin:http://pastebin.com/1iAbUPf8
【问题讨论】:
-
我建议您在 RxSwiftExt 上提出 PR,并查看 RxSwift Slack channel。
-
谢谢,slack 频道是个好主意,我会考虑创建 PR。