【问题标题】:Traversing lists and streams with a function returning a future使用返回未来的函数遍历列表和流
【发布时间】:2013-08-12 09:17:33
【问题描述】:

简介

Scala的Futurenew in 2.10now 2.9.3)是一个应用函子,这意味着如果我们有一个traversable typeF,我们可以取一个F[A]和一个函数A => Future[B]然后转他们变成了Future[F[B]]

此操作在标准库中作为Future.traverse 可用。 Scalaz 7 还提供了一个更通用的traverse,如果我们从scalaz-contrib library 导入Future 的应用函子实例,我们可以在这里使用它。

这两个traverse 方法在流的情况下表现不同。标准库遍历在返回之前消耗流,而Scalaz的returns the future immediately

import scala.concurrent._
import ExecutionContext.Implicits.global

// Hangs.
val standardRes = Future.traverse(Stream.from(1))(future(_))

// Returns immediately.
val scalazRes = Stream.from(1).traverse(future(_))

还有另一个区别,正如Leif Warner 观察到的here。标准库的 traverse 立即启动所有异步操作,而 Scalaz 启动第一个,等待它完成,启动第二个,等待它,等等。

流的不同行为

很容易通过编写一个函数来显示第二个差异,该函数将为流中的第一个值休眠几秒钟:

def howLong(i: Int) = if (i == 1) 10000 else 0

import scalaz._, Scalaz._
import scalaz.contrib.std._

def toFuture(i: Int)(implicit ec: ExecutionContext) = future {
  printf("Starting %d!\n", i)
  Thread.sleep(howLong(i))
  printf("Done %d!\n", i)
  i
}

现在Future.traverse(Stream(1, 2))(toFuture) 将打印以下内容:

Starting 1!
Starting 2!
Done 2!
Done 1!

还有 Scalaz 版本 (Stream(1, 2).traverse(toFuture)):

Starting 1!
Done 1!
Starting 2!
Done 2!

这可能不是我们想要的。

对于列表呢?

奇怪的是,在这方面,两个遍历在列表上的行为是相同的——Scalaz 不会在开始下一个未来之前等待一个未来完成。

另一个未来

Scalaz 还包含自己的 concurrent 包和自己的期货实现。我们可以使用与上面相同的设置:

import scalaz.concurrent.{ Future => FutureZ, _ }

def toFutureZ(i: Int) = FutureZ {
  printf("Starting %d!\n", i)
  Thread.sleep(howLong(i))
  printf("Done %d!\n", i)
  i
}

然后我们得到 Scalaz 在流 for 列表 以及流上的行为:

Starting 1!
Done 1!
Starting 2!
Done 2!

也许不那么令人惊讶的是,遍历无限流仍然会立即返回。

问题

此时我们确实需要一个表格来总结,但必须要做一个列表:

  • 带有标准库遍历的流:在返回前消耗;不要等待每个未来。
  • 带有 Scalaz 遍历的流:立即返回;请等待每个未来完成。
  • 带有流的 Scalaz 期货:立即返回;请等待每个未来完成。

还有:

  • 具有标准库遍历的列表:不要等待。
  • 带有 Scalaz 遍历的列表:不要等待。
  • 带有列表的 Scalaz 期货:请等待每个未来完成。

这有意义吗?列表和流上的此操作是否存在“正确”行为? “最异步”的行为——即在返回之前不消耗集合,并且不等待每个未来完成后再继续下一个——是否有某种原因在这里没有表现出来?

【问题讨论】:

  • 在“最佳”情况下,Future.traverse 在流上必须返回一个在请求时创建的 Futures 流(意味着在输出请求时从输入中延迟读取元素)。虽然肯定有可能,但实施起来比较困难。
  • @soulcheck:在这种情况下,traverse 返回一个Future[Stream[B]]——这部分没有争议。问题是语义应该是什么。
  • 你是对的,没有正确阅读。我也知道你现在的疑虑来自哪里。
  • 你查看过scala的遍历源吗?该死的,那是some ugly scala code。尤其是for :)
  • 相关问题和机器学习链接stackoverflow.com/a/17183164/1296806

标签: scala concurrency future scalaz applicative


【解决方案1】:

我无法全部回答,但我尝试了一些部分:

“最异步”的行为是否有某些原因? 在返回之前消耗集合,并且不要等待每个 在继续下一个之前完成的未来 - 没有表示 在这里?

如果您有依赖计算和有限数量的线程,您可能会遇到死锁。例如,您有两个期货取决于第三个(期货列表中的所有三个)并且只有两个线程,您可能会遇到前两个期货阻塞所有两个线程而第三个永远不会执行的情况。 (当然,如果你的池大小是一个,即zou一个接一个地执行计算,你可以得到类似的情况)

要解决这个问题,你需要一个线程每个未来,没有任何限制。这适用于小型期货清单,但不适用于大型期货。所以如果你并行运行,你会遇到这样一种情况,小例子在所有情况下都会运行,而更大的例子会死锁。 (示例:开发人员测试运行良好,生产出现死锁)。

此操作对列表和流有“正确”行为吗?

我认为期货是不可能的。如果您了解更多的依赖关系,或者当您确定计算不会阻塞时,可能会有更多并发的解决方案。但是执行期货清单看起来我“被设计破坏了”。最好的解决方案似乎是一个,对于死锁的小例子(即一个接一个地执行一个 Future),它已经失败了。

带有列表的 Scalaz 期货:请等待每个未来完成。

我认为 scalaz 在内部使用 for 理解来进行遍历。对于理解,不能保证计算是独立的。所以我猜 Scalaz 在这里对推导做正确的事情:一个接一个地进行计算。在期货的情况下,这将始终有效,因为您的操作系统中有无限的线程。

换句话说:你看到的只是一个关于理解(必须)如何工作的工件。

我希望这有点道理。

【讨论】:

    【解决方案2】:

    如果我正确理解了这个问题,我认为这真的归结为流与列表的语义。

    遍历列表符合我们对文档的期望:

    使用提供的函数A => Future[B]TraversableOnce[A] 转换为Future[TraversableOnce[B]]。这对于执行并行映射很有用。例如,将函数并行应用于列表的所有项目:

    对于流,由开发人员决定他们希望它如何工作,因为它依赖于比编译器更多的流知识(流可以是无限的,但类型系统不知道它)。如果我的流正在从文件中读取行,我想先使用它,因为逐行链接期货实际上不会并行化事情。在这种情况下,我想要并行方法。

    另一方面,如果我的流是一个无限列表,生成连续整数并寻找大于某个大数的第一个素数,则不可能在一次扫描中首先消耗流(链式Future 方法将是必需的,我们可能希望从流中运行批次)。

    与其试图找出一种规范的方法来处理这个问题,我想知道是否缺少有助于使不同情况更明确的类型。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多