【问题标题】:Concatinating two Flows in Akka stream在 Akka 流中连接两个流
【发布时间】:2021-12-08 11:27:14
【问题描述】:

我正在尝试连接两个流,但我无法解释我的实现的输出。

val source = Source(1 to 10)
val sink = Sink.foreach(println)

val flow1 = Flow[Int].map(s => s + 1)
val flow2 = Flow[Int].map(s => s * 10)

val flowGraph = Flow.fromGraph(
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val concat = builder.add(Concat[Int](2))
      val broadcast = builder.add(Broadcast[Int](2))

      broadcast ~> flow1 ~> concat.in(0)
      broadcast ~> flow2 ~> concat.in(1)

      FlowShape(broadcast.in, concat.out)
    }
  )

source.via(flowGraph).runWith(sink)

我希望这段代码有以下输出。

2
3
4
.
.
.
11
10
20
.
.
.
100

相反,我看到只打印了“2”。您能否解释一下我的实现有什么问题以及我应该如何更改程序以获得所需的输出。

【问题讨论】:

  • source.via(flow1).concat(source.via(flow2))。请注意,您 正在 重新运行 source 两次。

标签: scala akka akka-stream


【解决方案1】:

来自 Akka Stream 的 API 文档:

Concat:

当当前流有可用元素时发出;如果当前输入完成,它会尝试下一个输入

Broadcast:

当所有输出停止反压并且有可用的输入元素时发出

这两个运算符不能一起工作,因为它们的工作方式存在冲突——Concat 尝试在切换到另一个输出之前从Broadcast 的一个输出中提取所有元素,而@987654329 @ 不会发出,除非它的所有输出都有需求。

根据您的需要,您可以按照评论者的建议使用concat 进行连接:

source.via(flow1).concat(source.via(flow2)).runWith(sink)

或等效地,使用Source.combine,如下所示:

Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(sink)

【讨论】:

    【解决方案2】:

    使用GraphDSL,这是Source.combine的简化版实现:

    val sg = Source.fromGraph(
      GraphDSL.create(){ implicit builder =>
        import GraphDSL.Implicits._
    
        val concat = builder.add(Concat[Int](2))
    
        source ~> flow1 ~> concat
        source ~> flow2 ~> concat
    
        SourceShape(concat.out)
      }
    )
    
    sg.runWith(sink)
    

    【讨论】:

      猜你喜欢
      • 2017-07-17
      • 1970-01-01
      • 2013-05-02
      • 1970-01-01
      • 2015-04-06
      • 1970-01-01
      • 2017-09-16
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多