【问题标题】:Scala: how to "preload" content of a lazy Iterator?Scala:如何“预加载”惰性迭代器的内容?
【发布时间】:2016-11-30 14:50:37
【问题描述】:

假设我有一个惰性迭代器[Item]。仅当我们迭代迭代器时,才会延迟创建 items 对象。这些物品的制作成本很高。

我想将此迭代器序列化为 JSON 数组。它可以工作(使用 Jackson scala 模块),但在我看来效率不够。

据我了解,目前的工作方式如下:

  • 计算下一项
  • 序列化项目
  • 计算下一项
  • 序列化项目
  • 计算下一项
  • 序列化项目

我希望项目的计算和项目的序列化并行进行。

我想要一个迭代器,它会在读取下一项时开始计算定义数量的下一项。

例如,我希望在执行 iterator.next() 时,在幕后计算接下来的 50 个项目,而不会阻塞迭代线程(它应该只等待下一个可用元素)。

我见过“BufferedIterator”,但它并不是我所需要的,因为我真的不想明确地查询“head”,而且我需要超过 1 个项目来预加载

知道如何实现这一点吗?

我也可以用 Stream 替换 Iterator 的解决方案,但由于内存使用率较低,我更喜欢 Iterator

【问题讨论】:

  • 你试过GroupedIterator吗?
  • @laughedelic 我已经在使用inputIterator.grouped(chunkSize).map(computeItemsChunk).flatten,所以我的项目是按块计算的,但我不确定你的建议是什么
  • 您说“在执行 iterator.next() 时,在幕后,计算接下来的 50 个项目”。这就是GroupedIterator 发生的情况:每次调用.next 时,都会计算一个新块。这不是你想要的吗?
  • @laughedelic 我已经重写了这句话。计算接下来的 50 个项目非常昂贵,我不希望在此期间阻塞迭代线程。计算应该发生在另一个线程中,而迭代线程应该只等待下一个项目而不是整个项目块。你有什么实现建议吗?

标签: scala collections iterator


【解决方案1】:

如果我对您的问题的理解正确,这里有一个示例说明您可以做什么。您可以将每个项目计算包装在Future 中,这样您就可以在不阻塞的情况下迭代输入流,并在每个块准备好后对其进行处理/序列化。我将在 REPL 中执行此操作,并在评估每件作品时打印出来,这样您就可以看到每件事情发生的时间:

@ import concurrent._, ExecutionContext.Implicits.global
import concurrent._, ExecutionContext.Implicits.global

@ def futureItem(i: Int): Future[Int] = Future { 
  Thread.sleep(1000)
  println(s"item: ${i}")
  i 
}
defined function futureItem

@ val inputIterator = (1 to 9).toIterator.map(futureItem)
inputIterator: Iterator[Future[Int]] = non-empty iterator

因此计算每个项目至少需要 1 秒。现在我们要分块处理项目,这也需要一些时间:

@ def computeItemsChunk(items: Seq[Int]): Int = { 
  Thread.sleep(1000)
  val s = items.sum
  println(s"chunk ${items}: ${s}")
  s 
}
defined function computeItemsChunk

现在我们对输入流进行分组,应用Future.sequence 并计算块:

@ case object foo {
  val chunksIterator = inputIterator.grouped(3).map { futureItems => 
    Future.sequence(futureItems).map(computeItemsChunk) 
  }
}
defined object foo

(我在一个对象中定义它,因为否则,分组(或其他东西)将强制评估第一个块)。现在让我们看看它是如何评估的:

@ Await.result(Future.sequence(foo.chunksIterator), Duration.Inf)
item: 2
item: 3
item: 4
item: 1
item: 7
item: 6
chunk List(1, 2, 3): 6
item: 5
item: 8
chunk List(4, 5, 6): 15
item: 9
chunk List(7, 8, 9): 24
res5: Iterator[Int] = non-empty iterator 

您可以看到,一旦项目可用并且迭代器前进而不等待每个块评估,就会计算块。

【讨论】:

  • 您的代码几乎是我已经使用的(除了我在最后将Iterator[Iterator[Item]] 变平。但我过去遇到过这样的实现问题,因为每个项目需要一个线程来处理并且做太多并行工作并溢出系统。所以我并行计算每个项目块,一次一个块,这样最多有chunkSize并行计算。一次做太多工作实际上会延迟计算第一项,以及迭代器的响应时间,以及我的 API。
  • 请注意,我不需要将项目折叠成总和之类的结果(例如,computeItemsChunk 应该采用 Seq[Id] 并返回 Seq[Item])。如果所有计算都开始了,那么在我看来,我真的不再需要 Iterator 或 Stream 的惰性属性,而是有一个 Future[List[Item]] 作为结果可能就足够了
  • 所以一般的想法是我不想一次启动所有计算,并且只有在迭代器消费者进行时才应该触发计算(即不要一次加载所有内容,而是只加载迭代器的 n 下一项,而不是所有项)
  • 好吧,如果您不需要处理项目组(并且您不需要.grouped),您可以简化此操作。但是“太多并行工作”的问题可以通过调整执行上下文来解决。你用的是哪一个?
  • 我仍然需要grouped,因为以块的形式处理我的项目比并行处理它们(批处理)要快。我目前正在使用全球 EC,我想保留它。我必须能够同时生成多个迭代器,并且我想限制迭代器级别(不是全局)的并行处理,但不必为每个生成的迭代器创建一个新的 EC/线程池。产生迭代器并将其传递给消费者的线程永远不应阻塞并等待任何处理,并且在繁忙的 EC 中包装 Future 中的任务会阻塞调用者线程。
猜你喜欢
  • 2012-09-20
  • 1970-01-01
  • 2013-01-02
  • 2015-11-12
  • 2011-01-16
  • 2019-06-20
  • 1970-01-01
  • 2016-04-18
  • 2011-05-29
相关资源
最近更新 更多