【问题标题】:Akka streams change return type of 3rd party flow/stageAkka 流更改第 3 方流/阶段的返回类型
【发布时间】:2017-11-20 15:28:20
【问题描述】:

我有一个从 sqs 读取、写入另一个系统然后从 sqs 删除的图表。为了从 sqs 中删除,我需要 SqsMessage 对象上的收据句柄

在 Http 流的情况下,流的签名允许我说出从流向下游发出的类型,

Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]

在这种情况下,我可以将 T 设置为 SqsMessage,我仍然拥有我需要的所有数据。

但是,一些连接器,例如 google cloud pub sub 会发出一个完全无用的(对我而言)pub sub id。

在 pub 子流的下游,我需要能够访问我在 pub 子流之前拥有的 sqs 消息 ID。

在不重写 pub sub 连接器的情况下解决此问题的最佳方法是什么

我在概念上想要这样的东西:

Flow[SqsMessage] //i have my data at this point
within(
.map(toPubSubMessage)
.via(pubSub))

... from here i have the same type i had before within however it still behaves like a linear graph with back pressure etc

【问题讨论】:

    标签: scala akka-stream akka-http


    【解决方案1】:

    您可以使用 PassThrough 集成模式。 作为用法示例,请查看 akka-streams-kafka -> Class akka.kafka.scaladsl.Producer -> Mehtod def flow[K, V, PassThrough]

    所以只需使用PassThrough 元素实现您自己的Stage,例如akka.kafka.internal.ProducerStage[K, V, PassThrough]

    package my.package
    
    import java.util.concurrent.atomic.AtomicInteger
    
    import scala.concurrent.Future
    import scala.util.{Failure, Success, Try}
    
    import akka.stream._
    import akka.stream.ActorAttributes.SupervisionStrategy
    import akka.stream.stage._
    
    final case class Message[V, PassThrough](record: V, passThrough: PassThrough)
    
    final case class Result[R, PassThrough](result: R, message: PassThrough)
    
    class PathThroughStage[R, V, PassThrough]
      extends GraphStage[FlowShape[Message[V, PassThrough], Future[Result[R, PassThrough]]]] {
    
      private val in = Inlet[Message[V, PassThrough]]("messages")
      private val out = Outlet[Result[R, PassThrough]]("result")
      override val shape = FlowShape(in, out)
    
      override protected def createLogic(inheritedAttributes: Attributes) = {
        val logic = new GraphStageLogic(shape) with StageLogging {
          lazy val decider = inheritedAttributes.get[SupervisionStrategy]
            .map(_.decider)
            .getOrElse(Supervision.stoppingDecider)
          val awaitingConfirmation = new AtomicInteger(0)
          @volatile var inIsClosed = false
    
          var completionState: Option[Try[Unit]] = None
    
          override protected def logSource: Class[_] = classOf[PathThroughStage[R, V, PassThrough]]
    
          def checkForCompletion() = {
            if (isClosed(in) && awaitingConfirmation.get == 0) {
              completionState match {
                case Some(Success(_)) => completeStage()
                case Some(Failure(ex)) => failStage(ex)
                case None => failStage(new IllegalStateException("Stage completed, but there is no info about status"))
              }
            }
          }
    
          val checkForCompletionCB = getAsyncCallback[Unit] { _ =>
            checkForCompletion()
          }
    
          val failStageCb = getAsyncCallback[Throwable] { ex =>
            failStage(ex)
          }
    
          setHandler(out, new OutHandler {
            override def onPull() = {
              tryPull(in)
            }
          })
    
          setHandler(in, new InHandler {
            override def onPush() = {
              val msg = grab(in)
              val f = Future[Result[R, PassThrough]] {
                try {
                  Result(// TODO YOUR logic
                    msg.record,
                    msg.passThrough)
                } catch {
                  case exception: Exception =>
                    decider(exception) match {
                      case Supervision.Stop =>
                        failStageCb.invoke(exception)
                      case _ =>
                        Result(exception, msg.passThrough)
                    }
                }
    
                if (awaitingConfirmation.decrementAndGet() == 0 && inIsClosed) checkForCompletionCB.invoke(())
              }
              awaitingConfirmation.incrementAndGet()
              push(out, f)
            }
    
            override def onUpstreamFinish() = {
              inIsClosed = true
              completionState = Some(Success(()))
              checkForCompletion()
            }
    
            override def onUpstreamFailure(ex: Throwable) = {
              inIsClosed = true
              completionState = Some(Failure(ex))
              checkForCompletion()
            }
          })
    
          override def postStop() = {
            log.debug("Stage completed")
            super.postStop()
          }
        }
        logic
      }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-02-19
      • 2018-12-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多