【问题标题】:Asynchronous Iterable over remote data远程数据的异步迭代
【发布时间】:2015-08-24 14:52:58
【问题描述】:

我从远程 API 中提取了一些数据,为此我使用了 Future 风格的接口。数据结构为链表。下面显示了一个相关的示例数据容器。

case class Data(information: Int) {
    def hasNext: Boolean = ??? // Implemented
    def next: Future[Data] = ??? // Implemented
}

现在我有兴趣向数据类添加一些功能,例如mapforeachreduce 等。为此,我想实现某种形式的IterableLike,使其继承这些方法。 下面给出的是 Data 可以扩展的特征,以便它获取此属性。

trait AsyncIterable[+T]
    extends IterableLike[Future[T], AsyncIterable[T]]
{
    def hasNext : Boolean
    def next : Future[T]

    // How to implement?
    override def iterator: Iterator[Future[T]] = ???
    override protected[this] def newBuilder: mutable.Builder[Future[T], AsyncIterable[T]] = ???
    override def seq: TraversableOnce[Future[T]] = ???
}

它应该是一个非阻塞实现,当它被执行时,它开始从远程数据源请求下一个数据。 然后可以做一些很酷的事情,例如

case class Data(information: Int) extends AsyncIterable[Data]
val data = Data(1) // And more, of course
// Asynchronously print all the information.
data.foreach(data => println(data.information))

界面不同也是可以接受的。但是结果应该以某种方式表示对集合的异步迭代。最好采用开发人员熟悉的方式,因为它将成为(开源)库的一部分。

【问题讨论】:

  • 你真的要坚持Iterator界面吗?您可以考虑使用(或实现)类似于Rx Observable 的东西。实际上 async Observable 是 dual 来同步 Iterable。
  • 是的,这很好,它不必是这个特定的接口。唯一的要求是集合上的接口是异步的。然而,它不是一个自己产生信息的数据源,它不应该在没有订阅者请求的情况下开始产生事件。
  • 你可能想看看 scalaz。您也许可以使用ListT[Future, Data] 做一些事情
  • 看看推特的Spool,基本上是Stream的异步版本。
  • @TravisBrown 我已经能够在 Spool 上实现一些看起来不错的东西,但是 Spool 中使用的 Futures 属于 Twitter 类型(即com.twitter.util.Future),它与 Scala Futures 不能很好地结合。我已经采用了 stackoverflow.com/questions/30317473/… 发布的隐式转换,但是有没有计划将其更改为 scala 期货?

标签: scala future scala-collections


【解决方案1】:

在生产中我会使用以下之一:

  1. Akka Streams
  2. Reactive Extensions

对于私人测试,我将实现类似于以下的内容。 (说明如下)

我修改了一点你的Data:

abstract class AsyncIterator[T] extends Iterator[Future[T]] {
  def hasNext: Boolean
  def next(): Future[T]
}

为此我们可以实现这个Iterable

class AsyncIterable[T](sourceIterator: AsyncIterator[T])
  extends IterableLike[Future[T], AsyncIterable[T]]
{
  private def stream(): Stream[Future[T]] =
    if(sourceIterator.hasNext) {sourceIterator.next #:: stream()} else {Stream.empty}
  val asStream = stream()

  override def iterator = asStream.iterator
  override def seq = asStream.seq
  override protected[this] def newBuilder = throw new UnsupportedOperationException()
}

如果使用以下代码查看它的实际效果:

object Example extends App {
  val source = "Hello World!";

  val iterator1 = new DelayedIterator[Char](100L, source.toCharArray)
  new AsyncIterable(iterator1).foreach(_.foreach(print)) //prints 1 char per 100 ms
  pause(2000L)

  val iterator2 = new DelayedIterator[String](100L, source.toCharArray.map(_.toString))
  new AsyncIterable(iterator2).reduceLeft((fl: Future[String], fr) =>
    for(l <- fl; r <- fr) yield {println(s"$l+$r"); l + r}) //prints 1 line per 100 ms
  pause(2000L)

  def pause(duration: Long) = {println("->"); Thread.sleep(duration); println("\n<-")}
}

class DelayedIterator[T](delay: Long, data: Seq[T]) extends AsyncIterator[T] {
  private val dataIterator = data.iterator
  private var nextTime = System.currentTimeMillis() + delay
  override def hasNext = dataIterator.hasNext
  override def next = {
    val thisTime = math.max(System.currentTimeMillis(), nextTime)
    val thisValue = dataIterator.next()
    nextTime = thisTime + delay
    Future {
      val now = System.currentTimeMillis()
      if(thisTime > now) Thread.sleep(thisTime - now) //Your implementation will be better
      thisValue
    }
  }
}

说明

AsyncIterable 使用 Stream 是因为它是惰性计算的,而且很简单。

优点:

  • 简单
  • 多次调用iteratorseq 方法返回与所有项目相同的可迭代对象。

缺点:

  • 可能会导致内存溢出,因为流会保留所有先前获得的值。
  • AsyncIterable的创建过程中急切地获得第一个值

DelayedIterator 是非常简单的 AsyncIterator 实现,不要怪我这里的代码又快又脏。

看到同步hasNext和异步next()我还是觉得奇怪

【讨论】:

  • 感谢您的评论。我认为您假设 Data 是某种形式的迭代器,但 Data 是类似单链表的结构的元素,具有指向下一个异步元素的指针。因此hasNextnext 应该在next 返回的Data 上调用(在Future 完成之后)。我没听错吗?
  • @Caavoow 完全正确,我是这么理解的。现在我看到 Data 是一个链表。我会更新我的答案,但可能需要几天时间。
  • 没问题,我对使用 Streams 的解决方案非常感兴趣。您能否详细说明为什么要使用 Akka Streams 而不是标准库 Stream?我也想到了 Rx,但输入不是连续的输入流,而是基于拉取的。我必须从服务器请求下一个Data。顺便说一句,数据总量是有限的,所以把它全部存储在内存中是没有问题的。
  • 您的 Twitter Spool 解决方案看起来不错,我认为没有理由与之竞争。我会将我的答案作为 AsyncIterator 的示例。
  • 在下面使用scala.collection.immutable.Stream 的问题是它会记住它看到的所有元素。所以整个迭代将存储在内存中。在这种情况下,不妨使用Future[Seq[A]]
【解决方案2】:

我使用 Twitter Spool 实现了一个工作示例。 为了实现spool,我修改了documentation中的示例。

import com.twitter.concurrent.Spool
import com.twitter.util.{Await, Return, Promise}

import scala.concurrent.{ExecutionContext, Future}

trait AsyncIterable[+T <: AsyncIterable[T]] { self : T =>
    def hasNext : Boolean
    def next : Future[T]

    def spool(implicit ec: ExecutionContext) : Spool[T] = {
        def fill(currentPage: Future[T], rest: Promise[Spool[T]]) {
            currentPage foreach { cPage =>
                if(hasNext) {
                    val nextSpool = new Promise[Spool[T]]
                    rest() = Return(cPage *:: nextSpool)
                    fill(next, nextSpool)
                } else {
                    val emptySpool = new Promise[Spool[T]]
                    emptySpool() = Return(Spool.empty[T])
                    rest() = Return(cPage *:: emptySpool)
                }
            }
        }
        val rest = new Promise[Spool[T]]
        if(hasNext) {
            fill(next, rest)
        } else {
            rest() = Return(Spool.empty[T])
        }
        self *:: rest
    }
}

数据和以前一样,现在我们可以使用它了。

// Cool stuff
implicit val ec = scala.concurrent.ExecutionContext.global
val data = Data(1) // And others
// Print all the information asynchronously
val fut = data.spool.foreach(data => println(data.information))
Await.ready(fut)

它将在第二个元素上引发异常,因为未提供 next 的实现。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-08-28
    • 2016-07-10
    • 1970-01-01
    • 2014-02-04
    • 2016-03-17
    • 2021-10-22
    • 1970-01-01
    相关资源
    最近更新 更多