【问题标题】:how to get items from a Sink in the order they were provided in Source?如何按照 Source 中提供的顺序从 Sink 获取项目?
【发布时间】:2017-02-21 20:16:55
【问题描述】:

我有这样的来源:

val queueP : Promise[SourceQueueWithComplete[List[String]]] = Promise()
val source = Source.queue(Constants.CHUNK_SIZE, OverflowStrategy.backpressure).mapMaterializedValue {
      q : SourceQueueWithComplete[List[String]] => {
        queueP.success(q)
        q
      }
    }.watchTermination() {
      case (_,f) => f.recoverWith {
        case t : Exception => {
          queueP.tryFailure(new Exception)
          Future.failed(t)
        }
      }
    }

我将物品提供给:

queueP.future.map(f => f.offer(someList))

但是当我沉没在另一端时:

val sink = Sink.foreach[List[String]](someList => {
      ...
    })

    val flow = rowsSource.to(sink)
    flow.run

我收到的物品是乱序的,这首先违背了排队的目的。有没有办法强制物品按照提供给队列的顺序排列?

【问题讨论】:

  • 您确定rowsSourcesource 相同吗?你能添加更多代码来更清楚地说明这个问题吗?
  • @StefanoBonetti 我敢肯定 - 我想我的问题是,他们应该按顺序排列吗?我的理解是,根据我找到的信息,这些项目可以以任何顺序出现(只要期货完成)。但必须有一种方法可以强制执行原始顺序。
  • 从您的代码看来,您只向 Source.queue 提供了 一个 元素,即List[String]。我的结论是,每当接收器收到它时,它仍然会以相同的顺序包含元素。或者您是否为该具体化队列提供了多个元素?
  • @StefanoBonetti 断章取义,报价在循环中。我提供了很多清单。

标签: scala akka akka-stream


【解决方案1】:

因为您 offer 您的元素是您的 mapMaterializedValue 调用的一部分,所以您实际上需要在每次要提交元素时实现(即运行)您的 Source.queue

作为副作用,您的元素会乱序,因为每个流实现都是异步发生的。

解决问题的更健康的方法是运行单个图表,获取单个队列,并向其提交多个元素。请参见下面的代码示例:

val queue: SourceQueueWithComplete[List[String]] = 
  Source.queue[List[String]](Constants.CHUNK_SIZE, OverflowStrategy.backpressure)
    .to(Sink.foreach { list ⇒ /* do stuff */ })
    .run()

queue.offer(List("a", "b"))  
queue.offer(List("c", "d"))  

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-10-08
    • 1970-01-01
    相关资源
    最近更新 更多