【问题标题】:How to implement observeLatestOn in RxJava (RxScala)?如何在 RxJava (RxScala) 中实现 observeLatestOn?
【发布时间】:2015-04-30 12:02:14
【问题描述】:

我正在尝试在 RxJava(实际上是 RxScala)中实现 ObserveLatestOn 运算符。

当我们有一个快速的生产者和一个慢的订阅者时,这个操作符很有用,但订阅者不关心在消费项目时丢失的任何项目。

大理石图:

--1---2---3----------5------6--7-8-9------|
--1=========>3===>---5=======>6======>9==>|

= 字符代表订阅者正在执行的长期工作,> 字符代表刚刚完成的工作。作为典型的使用示例,想象一些需要显示的数据的生产者,以及作为订阅者的数据的屏幕渲染器。渲染时间比较长,但是我们不需要在屏幕上渲染每一步,最后一个就完美了。

在上面的弹珠图中,生产者发出信号 1. 订阅者开始处理它,需要很长时间。同时,生产者发出 2 和 3,订阅者完成工作。它看到生产者发出的最后一个项目是 3,所以它开始处理它。速度很快,同时没有新项目产生,所以订阅者可以休息。然后,5 到了,故事以同样的方式继续。

我花了几个小时试图实现这个看似简单的运算符,但我仍然不满意。运算符的本质表明它应该是异步的,它应该在与接收它们不同的调度程序上发出其项目。但同时,我当然不希望一个线程被一个worker占用而没有工作要做。

这是我目前想出的:

def observeLatestOn[T](o: Observable[T], scheduler: Scheduler): Observable[T] = {
  @volatile var maybeNextItem: Option[Notification[T]] = None
  @volatile var isWorkScheduled = false
  val itemsQueueLock = new Object()

  Observable(subscriber ⇒ {
    def signalToSubscriber(materializedItem: Notification[T]): Unit = {
      materializedItem match {
        case OnNext(item) ⇒ subscriber onNext item
        case OnError(error) ⇒ subscriber onError error
        case OnCompleted ⇒ subscriber.onCompleted()
      }
    }

    def queueItem(item: Notification[T]): Unit = {
      val worker = scheduler.createWorker

      val shouldScheduleWork = itemsQueueLock synchronized {
        val result = !isWorkScheduled
        maybeNextItem = Some(item)
        isWorkScheduled = true
        result
      }

      if (shouldScheduleWork) {
        worker.scheduleRec {
          val maybeNextItemToSignal = itemsQueueLock synchronized {
            val result = maybeNextItem
            if (result.isEmpty) {
              worker.unsubscribe()
              isWorkScheduled = false
            }
            maybeNextItem = None
            result
          }

          maybeNextItemToSignal foreach signalToSubscriber
        }
      }
    }

    o.takeWhile(_ ⇒ !subscriber.isUnsubscribed).subscribe(
      next ⇒ queueItem(OnNext(next)),
      error ⇒ queueItem(OnError(error)),
      () ⇒ queueItem(OnCompleted)
    )
  })
}

它似乎有效,但我不相信没有竞争条件或死锁。另外,我不确定解决方案是否可以变得更简单。我也一直在考虑其他方法,比如

  • 巧妙地使用OperatorDebounceWithSelector
  • 组合了一个 observable 一次只请求一个项目,observeOnonBackpressureBuffer(1)

我也不知道如何为此编写确定性单元测试。 scheduleRec 调度的工作在与TestScheduler 一起使用时不能被中断,我需要使用真正在不同线程上工作的调度程序。我发现很难为多线程代码的竞争条件编写正确的单元测试。

所以,问题仍然存在:我的解决方案是否正确?有没有更简单、更好或更正确的方法呢?以及如何检验它的正确性?

【问题讨论】:

    标签: scala reactive-programming rx-java


    【解决方案1】:

    我推荐使用lift 来实现这个操作符。这是我的解决方案:

    package object ObservableEx {
    
      implicit class ObserveLatestOn[T](val o: Observable[T]) {
    
        def observeLatestOn(scheduler: Scheduler): Observable[T] = {
          o.lift { (child: Subscriber[T]) =>
            val worker = scheduler.createWorker
            child.add(worker)
    
            val parent = new Subscriber[T] {
    
              private val lock = new AnyRef
    
              // protected by "lock"
              private var latest: Notification[T] = null
              // protected by "lock"
              // Means no task runs in the worker
              private var idle = true
    
              private var done = false
    
              override def onStart(): Unit = {
                request(Long.MaxValue)
              }
    
              override def onNext(v: T): Unit = {
                if (!done) {
                  emit(OnNext(v))
                }
              }
    
              override def onCompleted(): Unit = {
                if (!done) {
                  done = true
                  emit(OnCompleted)
                }
              }
    
              override def onError(e: Throwable): Unit = {
                if (!done) {
                  done = true
                  emit(OnError(e))
                }
              }
    
              def emit(v: Notification[T]): Unit = {
                var shouldSchedule = false
                lock.synchronized {
                  latest = v
                  if (idle) {
                    // worker is idle so we should schedule a task
                    shouldSchedule = true
                    // We will schedule a task, so the worker will be busy
                    idle = false
                  }
                }
                if (shouldSchedule) {
                  worker.schedule {
                    var n: Notification[T] = null
                    var exit = false
                    while (!exit) {
                      lock.synchronized {
                        if (latest == null) {
                          // No new item arrives and we are leaving the worker, so set "idle"
                          idle = true
                          exit = true
                        } else {
                          n = latest
                          latest = null
                        }
                      }
                      if (!exit) {
                        n.accept(child)
                      }
                    }
                  }
                }
              }
            }
    
            child.add(parent)
    
            parent
          }
        }
      }
    
    }
    

    还有一个单元测试

    import ObservableEx.ObserveLatestOn
    
    @Test
    def testObserveLatestOn(): Unit = {
      val scheduler = TestScheduler()
      val xs = mutable.ArrayBuffer[Long]()
      var completed = false
      Observable.interval(100 milliseconds, scheduler).take(10).observeLatestOn(scheduler).subscribe(v => {
        scheduler.advanceTimeBy(200 milliseconds)
        xs += v
      },
        e => e.printStackTrace(),
        () => completed = true
      )
      scheduler.advanceTimeBy(100 milliseconds)
      assert(completed === true)
      assert(xs === List(0, 2, 4, 6, 8))
    }
    

    【讨论】:

    • 对我来说看起来不错。所有标志都是本地的或在同步部分中使用,因此它可能应该是无竞争条件的。很高兴了解liftNotification.accept。我会用你的版本,谢谢。
    【解决方案2】:

    我有一个PR,其中运算符onBackpressureLatest() 应该具有预期的行为,但您需要并发并且可以照常使用observeOn

    【讨论】:

    • 是的,我也在考虑类似的事情。我想您必须将它与observeOn 和一些requests 一次只有一项的运算符结合起来才能实现我正在寻找的行为?无论如何,至少在onBackpressureLatest 在 RxScala 版本中之前,我可能会使用 zsxswing 的解决方案。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多