【发布时间】:2017-08-30 19:08:36
【问题描述】:
假设我们有一个假数据源,它将批量返回它保存的数据
class DataSource(size: Int) {
private var s = 0
implicit val g = scala.concurrent.ExecutionContext.global
def getData(): Future[List[Int]] = {
s = s + 1
Future {
Thread.sleep(Random.nextInt(s * 100))
if (s <= size) {
List.fill(100)(s)
} else {
List()
}
}
}
object Test extends App {
val source = new DataSource(100)
implicit val g = scala.concurrent.ExecutionContext.global
def process(v: List[Int]): Unit = {
println(v)
}
def next(f: (List[Int]) => Unit): Unit = {
val fut = source.getData()
fut.onComplete {
case Success(v) => {
f(v)
v match {
case h :: t => next(f)
}
}
}
}
next(process)
Thread.sleep(1000000000)
}
我有我的,这里的问题是有些部分更不纯净。理想情况下,我想将每个批次的 Future 包装成一个大的未来,并且当最后一批返回 0 大小列表时,包装器未来是否成功?我的情况有点来自this 的帖子,next() 有同步调用,而我的也是异步的。
或者有没有可能做我想做的事?下一批只有在上一个最终解决后才会获取下一批是否获取下一批取决于返回的大小?
浏览此类数据源的最佳方法是什么?是否有任何现有的 Scala 框架提供我正在寻找的功能? play 的 Iteratee, Enumerator, Enumeratee 是正确的工具吗?如果是这样,任何人都可以提供一个示例来说明如何使用这些设施来实现我正在寻找的东西吗?
编辑---- 在 chunjef 的帮助下,我刚刚尝试过。它确实对我有用。不过,我根据他的回答做了一些小改动。
Source.fromIterator(()=>Iterator.continually(source.getData())).mapAsync(1) (f=>f.filter(_.size > 0))
.via(Flow[List[Int]].takeWhile(_.nonEmpty))
.runForeach(println)
但是,有人可以比较 Akka Stream 和 Play Iteratee 吗?是否值得我也试试 Iteratee?
代码片段 1:
Source.fromIterator(() => Iterator.continually(ds.getData)) // line 1
.mapAsync(1)(identity) // line 2
.takeWhile(_.nonEmpty) // line 3
.runForeach(println) // line 4
代码片段 2:假设 getData 依赖于另一个流的其他输出,我想将它与以下流连接起来。但是,它会产生太多文件打开错误。不确定是什么导致了这个错误,如果我理解正确的话,mapAsync 的吞吐量被限制为 1。
Flow[Int].mapConcat[Future[List[Int]]](c => {
Iterator.continually(ds.getData(c)).to[collection.immutable.Iterable]
}).mapAsync(1)(identity).takeWhile(_.nonEmpty).runForeach(println)
【问题讨论】:
-
该代码有效,对吧?
-
有点,是的。我正在寻找更好的版本。我粘贴的代码我不认为太 scala 风格。 :)
-
使用
Iteratee或 Akka Stream,而不是重新发明轮子 -
@cchantep 这就是我想知道的,怎么做?官方文档不好理解。
标签: scala