【发布时间】:2015-04-30 12:02:14
【问题描述】:
我正在尝试在 RxJava(实际上是 RxScala)中实现 ObserveLatestOn 运算符。
当我们有一个快速的生产者和一个慢的订阅者时,这个操作符很有用,但订阅者不关心在消费项目时丢失的任何项目。
大理石图:
--1---2---3----------5------6--7-8-9------|
--1=========>3===>---5=======>6======>9==>|
= 字符代表订阅者正在执行的长期工作,> 字符代表刚刚完成的工作。作为典型的使用示例,想象一些需要显示的数据的生产者,以及作为订阅者的数据的屏幕渲染器。渲染时间比较长,但是我们不需要在屏幕上渲染每一步,最后一个就完美了。
在上面的弹珠图中,生产者发出信号 1. 订阅者开始处理它,需要很长时间。同时,生产者发出 2 和 3,订阅者完成工作。它看到生产者发出的最后一个项目是 3,所以它开始处理它。速度很快,同时没有新项目产生,所以订阅者可以休息。然后,5 到了,故事以同样的方式继续。
我花了几个小时试图实现这个看似简单的运算符,但我仍然不满意。运算符的本质表明它应该是异步的,它应该在与接收它们不同的调度程序上发出其项目。但同时,我当然不希望一个线程被一个worker占用而没有工作要做。
这是我目前想出的:
def observeLatestOn[T](o: Observable[T], scheduler: Scheduler): Observable[T] = {
@volatile var maybeNextItem: Option[Notification[T]] = None
@volatile var isWorkScheduled = false
val itemsQueueLock = new Object()
Observable(subscriber ⇒ {
def signalToSubscriber(materializedItem: Notification[T]): Unit = {
materializedItem match {
case OnNext(item) ⇒ subscriber onNext item
case OnError(error) ⇒ subscriber onError error
case OnCompleted ⇒ subscriber.onCompleted()
}
}
def queueItem(item: Notification[T]): Unit = {
val worker = scheduler.createWorker
val shouldScheduleWork = itemsQueueLock synchronized {
val result = !isWorkScheduled
maybeNextItem = Some(item)
isWorkScheduled = true
result
}
if (shouldScheduleWork) {
worker.scheduleRec {
val maybeNextItemToSignal = itemsQueueLock synchronized {
val result = maybeNextItem
if (result.isEmpty) {
worker.unsubscribe()
isWorkScheduled = false
}
maybeNextItem = None
result
}
maybeNextItemToSignal foreach signalToSubscriber
}
}
}
o.takeWhile(_ ⇒ !subscriber.isUnsubscribed).subscribe(
next ⇒ queueItem(OnNext(next)),
error ⇒ queueItem(OnError(error)),
() ⇒ queueItem(OnCompleted)
)
})
}
它似乎有效,但我不相信没有竞争条件或死锁。另外,我不确定解决方案是否可以变得更简单。我也一直在考虑其他方法,比如
- 巧妙地使用
OperatorDebounceWithSelector - 组合了一个 observable 一次只请求一个项目,
observeOn和onBackpressureBuffer(1)
我也不知道如何为此编写确定性单元测试。 scheduleRec 调度的工作在与TestScheduler 一起使用时不能被中断,我需要使用真正在不同线程上工作的调度程序。我发现很难为多线程代码的竞争条件编写正确的单元测试。
所以,问题仍然存在:我的解决方案是否正确?有没有更简单、更好或更正确的方法呢?以及如何检验它的正确性?
【问题讨论】:
标签: scala reactive-programming rx-java