【问题标题】:How to create an Akka flow with backpressure and Control如何创建具有背压和控制的 Akka 流
【发布时间】:2018-01-17 08:45:00
【问题描述】:

我需要创建一个具有以下接口的函数:

import akka.kafka.scaladsl.Consumer.Control

object ItemConversionFlow {

def build(config: StreamConfig): Flow[Item, OtherItem, Control] = {
    // Implementation goes here
}

我的问题是我不知道如何以适合上述界面的方式定义流程。

当我在做这样的事情时

val flow = Flow[Item]
    .map(item => doConversion(item)
    .filter(_.isDefined)
    .map(_.get)

结果类型是 Flow[Item, OtherItem, NotUsed]。到目前为止,我还没有在 Akka 文档中找到任何内容。此外,akka.stream.scaladsl.Flow 上的功能仅提供“未使用”而不是控制。如果有人能指出我正确的方向,那就太好了。

一些背景:我需要设置几个仅在转换部分区分的管道。这些管道是主流的子流,可能由于某种原因而停止(相应的消息到达某个 kafka 主题)。因此我需要控制部分。我们的想法是创建一个 Graph 模板,我只需在其中插入提到的流作为参数(返回它的工厂)。对于特定情况,我们有一个可行的解决方案。为了概括它,我需要这种流程。

【问题讨论】:

  • 什么是控制,什么时候构造的? (您可以在 mapMaterializedvalue 中构建它)或在自定义 GraphStage 中,或者在使用 GraphDSL 时
  • 我去看看。关于控制:Control = akka.kafka.scaladsl.Consumer.Control @ Viktor Klang

标签: scala akka-stream backpressure


【解决方案1】:

你实际上有背压。然而,想想你真正需要什么背压......你没有使用异步阶段来增加你的吞吐量......例如。背压避免快速生产者过度增长订阅者https://doc.akka.io/docs/akka/2.5/stream/stream-rate.html。在您的示例中不用担心,您的流会根据 doConversion 完成所需的时间向发布者请求新元素。

如果您想获取流的结果,请使用 toMat 或 viaMat。例如,如果您的流发出 Item 并将其转换为 OtherItem:

val str = Source.fromIterator(() => List(Item(Some(1))).toIterator)
  .map(item => doConversion(item))
  .filter(_.isDefined)
  .map(_.get)
  .toMat(Sink.fold(List[OtherItem]())((a, b) => {
      // Examine the result of your stream
      b :: a
    }))(Keep.right)
  .run()

str 将是 Future[List[OtherItem]]。尝试将此推断到您的情况。

或将 toMat 与 KillSwitches 一起使用,“创建 [[FlowShape]] 的新 [[Graph]],它实现为允许外部完成的外部开关 * 那个独特的具体化。不同的实现会产生不同的独立开关。”

  def build(config: StreamConfig): Flow[Item, OtherItem, UniqueKillSwitch] = {
    Flow[Item]
      .map(item => doConversion(item))
      .filter(_.isDefined)
      .map(_.get)
      .viaMat(KillSwitches.single)(Keep.right)
  }


  val stream = 
    Source.fromIterator(() => List(Item(Some(1))).toIterator)
    .viaMat(build(StreamConfig(1)))(Keep.right)
    .toMat(Sink.ignore)(Keep.both).run

  // This stops the stream
  stream._1.shutdown()

  // When it finishes
  stream._2 onComplete(_ => println("Done"))

【讨论】:

  • 这不是我想要的。我真的需要一些东西给我一个满足上述界面的流程。所以我需要一个 Flow[Item, OtherItem, Control] 类型的流。您的解决方案将创建一个 RunnableGraph。我编辑了我的问题,使其更加具体。到目前为止谢谢你
  • 看看有没有帮助
猜你喜欢
  • 1970-01-01
  • 2023-04-04
  • 2020-11-05
  • 2018-05-25
  • 2021-01-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多