【问题标题】:Custom Akka Streams自定义 Akka 流
【发布时间】:2018-11-25 05:46:21
【问题描述】:

我有一组流阶段(源、流和汇),我想向其中添加一些元数据信息。

因此而不是 Sources 产生 A -> (A, StreamMetaData)。我已经设法使用自定义流阶段来做到这一点,其中在抓取(入)元素时,我推送(出,(elem,StreamMetaData))。实际上,它不是“转换”现有源,而是将其传递给流以重新创建新源。

现在我正在尝试实现以下 MetaStream 阶段:

因此,鉴于源正在生成 (A, StreamMetaData) 的元组,我想将 A 传递给现有流以进行一些计算,然后将生成的输出“B”与 StreamMetaData 合并。然后这些将被传递给接受(B,StreamMetaData)的接收器。

你建议我怎么做。我被告知部分图表是最好的选择,并且有助于完成这样的任务。 UniformFanOut 和 UniformFanIn 使用 Unzip((A streamMetaData), A, StreamMetaData) 和 Zip(A,B)

 val fanOut = GraphDSL.create() { implicit b =>
    val unzip = b.add(Unzip[T, StreamMetaData]) 
    UniformFanOutShape(unzip.in, unzip.out0, unzip.out1)
  }

  val fanIn = GraphDSL.create() { implicit b =>
    val zip = b.add(Zip[T ,StreamMetaData]()) 
    UniformFanInShape(zip)
  }

如何连接 fanIn 和 fanOut 以实现与图片中相同的行为?

我有这样的想法;

 def metaFlow[T, B, Mat](flow: Flow[T, B, Mat]): Unit = {
   val wrappedFlow =
     Flow.fromGraph(GraphDSL.create(){ implicit b =>
       import GraphDSL.Implicits._

       val unzip: FanOutShape2[(T, StreamMetaData), T, StreamMetaData] = b.add(Unzip[T, StreamMetaData])
       val existingFlow = b.add(flow)
       val zip: FanInShape2[B,StreamMetaData,(B,StreamMetaData)] = b.add(Zip[B, StreamMetaData])

       unzip.out0 ~> existingFlow ~> zip.in0
       unzip.out1 ~> zip.in1

       FlowShape(unzip.in, zip.out)
     })

 }

提前致谢。

【问题讨论】:

    标签: scala akka akka-stream


    【解决方案1】:

    这个 aprox 创建一个新的 SourceShape 堆叠流程图可以工作,与您的 flowShape 实现有点不同。

      def sourceGraph[A, B](f: A => B, source: Source[(A, StreamMetaData), NotUsed]) = Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
        import GraphDSL.Implicits._
    
        val unzip = builder.add(Unzip[A, StreamMetaData]())
        val zip   = builder.add(Zip[B, StreamMetaData]())
    
        val flow0  = builder.add(Flow[A].map { f(_) })
    
        val flow1 = source ~> unzip.in
                              unzip.out0 ~> flow0 ~> zip.in0
                              unzip.out1          ~> zip.in1
    
    
        SourceShape(zip.out)
      })
    
    
    
    def flowGraph[A, B](f: A => B) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._
    
        val unzip = builder.add(Unzip[A, StreamMetaData]())
        val zip   = builder.add(Zip[B, StreamMetaData]())
    
        val flow0  = builder.add(Flow[A].map { f(_) })
    
        unzip.out0 ~> flow0 ~> zip.in0
        unzip.out1          ~> zip.in1
    
        FlowShape(unzip.in, zip.out)
      })
    

    【讨论】:

    • 虽然这可能有效,但我更愿意将 Flow 阶段保持为流。尽管如此,我想要实现的目标还是比较相似的。
    • 这正是我要找的!在 flowGraph 阶段之后,我需要将结果元组传递给另一个流,该流读取 StreamMetaData 并将其转换为其他值以及其他一些计算。您是否建议使用逻辑创建自定义 GraphStage?
    • 我已经完成了自定义源阶段。这需要你很好地了解流的行为,即当接收器需要更多元素时,否则你可以尝试奇怪的行为。所以,如果 Akka 提供了自定义阶段,我会避免构建它们。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-12-22
    • 2017-11-27
    • 1970-01-01
    • 1970-01-01
    • 2017-01-30
    相关资源
    最近更新 更多