【发布时间】:2017-02-21 20:16:55
【问题描述】:
我有这样的来源:
val queueP : Promise[SourceQueueWithComplete[List[String]]] = Promise()
val source = Source.queue(Constants.CHUNK_SIZE, OverflowStrategy.backpressure).mapMaterializedValue {
q : SourceQueueWithComplete[List[String]] => {
queueP.success(q)
q
}
}.watchTermination() {
case (_,f) => f.recoverWith {
case t : Exception => {
queueP.tryFailure(new Exception)
Future.failed(t)
}
}
}
我将物品提供给:
queueP.future.map(f => f.offer(someList))
但是当我沉没在另一端时:
val sink = Sink.foreach[List[String]](someList => {
...
})
val flow = rowsSource.to(sink)
flow.run
我收到的物品是乱序的,这首先违背了排队的目的。有没有办法强制物品按照提供给队列的顺序排列?
【问题讨论】:
-
您确定
rowsSource与source相同吗?你能添加更多代码来更清楚地说明这个问题吗? -
@StefanoBonetti 我敢肯定 - 我想我的问题是,他们应该按顺序排列吗?我的理解是,根据我找到的信息,这些项目可以以任何顺序出现(只要期货完成)。但必须有一种方法可以强制执行原始顺序。
-
从您的代码看来,您只向 Source.queue 提供了 一个 元素,即
List[String]。我的结论是,每当接收器收到它时,它仍然会以相同的顺序包含元素。或者您是否为该具体化队列提供了多个元素? -
@StefanoBonetti 断章取义,报价在循环中。我提供了很多清单。
标签: scala akka akka-stream