【问题标题】:Scala RestartSink FutureScala RestartSink 未来
【发布时间】:2018-06-17 01:52:03
【问题描述】:

我正在尝试重新创建 Scala 的 [RestartSink][1] 功能的类似功能。

我想出了这段代码。但是,由于我们只返回 SinkShape 而不是 Sink,因此我无法指定它应该返回 Future[Done] 而不是 NotUsed,因为它是物化类型。但是,我对如何做到这一点感到困惑。我只能让它返回[MessageActionPair, NotUsed] 而不是所需的[MessageActionPair, Future[Done]]。我仍在学习围绕这个框架的方法,所以我确定我错过了一些小东西。我试过打电话给Source.toMat(RestartWithBackoffSink...),但是也没有得到想要的结果。

private final class RestartWithBackoffSink(
                                               sourcePool:     Seq[SqsEndpoint],
                                               minBackoff:   FiniteDuration,
                                               maxBackoff:   FiniteDuration,
                                               randomFactor: Double) extends GraphStage[SinkShape[MessageActionPair]] { self ⇒

  val in = Inlet[MessageActionPair]("RoundRobinRestartWithBackoffSink.in")

  override def shape = SinkShape(in)
  override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
    "Sink", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false) {
    override protected def logSource = self.getClass

    override protected def startGraph() = {
      val sourceOut = createSubOutlet(in)
      Source.fromGraph(sourceOut.source).runWith(createSink(getEndpoint))(subFusingMaterializer)
    }

    override protected def backoff() = {
      setHandler(in, new InHandler {
        override def onPush() = ()
      })
    }

    private def createSink(endpoint: SqsEndpoint): Sink[MessageActionPair, Future[Done]] = {
      SqsAckSink(endpoint.queue.url)(endpoint.client)
    }

    def getEndpoint: SqsEndpoint = {
      if(isTimedOut) {
        index = (index + 1) % sourcePool.length
        restartCount = 0
      }
      sourcePool(index)
    }

    backoff()
  }
}

这里有语法错误,因为类型不匹配:

def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sourcePool: Seq[SqsEndpoint]): Sink[MessageActionPair, Future[Done]] = {
    Sink.fromGraph(new RestartWithBackoffSink(sourcePool, minBackoff, maxBackoff, randomFactor))
  }

【问题讨论】:

    标签: scala akka reactive-programming akka-stream


    【解决方案1】:

    通过扩展extends GraphStage[SinkShape[MessageActionPair]],您定义了一个没有具体化价值的阶段。或者更好地定义一个实现为NotUsed 的阶段。

    你必须决定你的舞台是否可以变成任何有意义的东西。更多关于阶段的具体化值here

    如果是这样:您必须扩展 GraphStageWithMaterializedValue[SinkShape[MessageActionPair], Future[Done]] 并正确覆盖 createLogicAndMaterializedValue 函数。更多指导可以在docs找到。

    如果没有:您可以按以下方式更改类型

    def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sourcePool: Seq[SqsEndpoint]): Sink[MessageActionPair, NotUsed] = {
        Sink.fromGraph(new RestartWithBackoffSink(sourcePool, minBackoff, maxBackoff, randomFactor))
      }
    

    【讨论】:

    • 扩展和覆盖createLogicAndMaterializedValue 做到了。谢谢
    猜你喜欢
    • 2018-02-14
    • 2013-08-04
    • 1970-01-01
    • 2016-02-06
    • 2017-06-24
    • 1970-01-01
    • 2014-04-15
    • 2012-08-14
    • 1970-01-01
    相关资源
    最近更新 更多