【问题标题】:Implementing a debounced buffer with RxSwift, is this correct?使用 RxSwift 实现去抖动缓冲区,这是正确的吗?
【发布时间】:2016-10-08 05:15:52
【问题描述】:

我对 RxSwift 比较陌生,但我期待在我的项目中更多地使用它,我很想听听关于我刚写的操作符的一些反馈。

我缺少的功能是一个去抖动缓冲区:一个行为与 debounce 运算符完全相同的缓冲区,但它不只发出最新的值,它应该发出自上次发出以来收集的所有值。

RxJava 中,这很容易通过使用带有另一个 observable 的缓冲区作为“关闭选择器”来实现:

// From: https://github.com/ReactiveX/RxJava/wiki/Backpressure
//
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);

RxSwift 中虽然不存在这个版本的缓冲区操作符(我认为这个问题是相关的:https://github.com/ReactiveX/RxSwift/issues/590),所以我尝试自己解决这个问题。


我的第一种方法只是构建去抖动缓冲区:

extension ObservableType {
    func debouncedBuffer(_ dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> {
        var valueBuffer: [E] = []

        let observable = self.do(onNext: { (value) in
            valueBuffer.append(value)
        }, onError: { (error) in
            valueBuffer = []
        }, onCompleted: {
            valueBuffer = []
        }, onSubscribe: {
            valueBuffer = []
        }, onDispose: {
            valueBuffer = []
        }).debounce(dueTime, scheduler: scheduler).flatMap { (value) -> Observable<[E]> in
            let emitValues = valueBuffer
            valueBuffer = []
            return Observable<[E]>.just(emitValues)
        }

        return observable
    }
}

我的第二种方法是构建任何关闭条件(如 RxJava 版本)的缓冲区:

extension ObservableType {
    func buffer<R>(_ selector: Observable<R>) -> Observable<[E]> {
        var valueBuffer: [E] = []

        return Observable.create { observer in
            let selectorSubscription = selector.subscribe(onNext: { (value) in
                let emitValues = valueBuffer
                valueBuffer = []
                observer.on(.next(emitValues))
            }, onError: { (error) in
                valueBuffer = []
                observer.on(.error(error))
            }, onCompleted: {
                valueBuffer = []
                observer.on(.completed)
            }, onDisposed: {
                valueBuffer = []
            })

            let subscription = self.subscribe(onNext: { (value) in
                valueBuffer.append(value)
            }, onError: { (error) in
                observer.on(.error(error))
                selectorSubscription.dispose()
            }, onCompleted: {
                observer.on(.completed)
                selectorSubscription.dispose()
            }, onDisposed: {
                observer.on(.completed)
                selectorSubscription.dispose()
            })
            return subscription
        }
    }
}

我已经测试了这两个运算符,它们似乎可以工作,还测试了处理 onError、onDispose 和 onCompleted 事件的不同组合。

但是,如果这至少是一个没有泄漏的可接受的解决方案,并且如果我违反了任何 RX 合同,我仍然希望听到更有经验的人的一些反馈。

我还用一些测试代码创建了一个 pastebin:http://pastebin.com/1iAbUPf8

【问题讨论】:

标签: swift rx-swift reactivex


【解决方案1】:

这是我的buffer(bufferOpenings, bufferClosingSelector)。可能需要进一步审查。

extension ObservableType {

    func buffer<R>(bufferOpenings: Observable<R>, bufferClosingSelector: (R)->Observable<R>) -> Observable<[E]> {
        var valueBuffer: [E]? = nil

        let operatorObservable = Observable<[E]>.create({ observer in
            let subject = PublishSubject<[E]>()

            let closingsSub = bufferOpenings
                .doOnNext({ _ in
                    valueBuffer = []
                })
                .flatMap({ opening in
                    return bufferClosingSelector(opening)
                })
                .subscribeNext({ _ in
                    if let vb = valueBuffer {
                        subject.onNext(vb)
                    }
                    valueBuffer = nil
                }
            )

            let bufferSub = self.subscribe(
                onNext: { value in
                    valueBuffer?.append(value)
                },
                onError: { error in
                    subject.onError(error)
                },
                onCompleted: {
                    subject.onCompleted()
                },
                onDisposed: {
                }
            )

            let subjectSub = subject.subscribe(
                onNext: { (value) in
                    observer.onNext(value)
                },
                onError: { (error) in
                    observer.onError(error)
                },
                onCompleted: {
                    observer.onCompleted()
                },
                onDisposed: {
                }
            )

            let combinedDisposable = CompositeDisposable()

            combinedDisposable.addDisposable(closingsSub)
            combinedDisposable.addDisposable(bufferSub)
            combinedDisposable.addDisposable(subjectSub)

            return combinedDisposable

        })

        return operatorObservable
    }

}

【讨论】:

    猜你喜欢
    • 2019-06-19
    • 2016-05-28
    • 1970-01-01
    • 2021-09-08
    • 2021-07-09
    • 2019-03-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多