您可以使用 PassThrough 集成模式。
作为用法示例,请查看 akka-streams-kafka -> Class akka.kafka.scaladsl.Producer -> Method def flow[K, V, PassThrough]。
因此,只需参照 akka.kafka.internal.ProducerStage[K, V, PassThrough],实现您自己的、带有 PassThrough 元素的 Stage,例如:
package my.package
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
import akka.stream._
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.stage._
/**
 * Input element: a payload paired with an arbitrary value that accompanies it.
 *
 * @param record      the payload
 * @param passThrough the value carried alongside the payload
 * @tparam V           type of the payload
 * @tparam PassThrough type of the accompanying value
 */
final case class Message[V, PassThrough](
  record: V,
  passThrough: PassThrough
)
/**
 * Output element: a produced value paired with a pass-through value.
 *
 * @param result  the produced value
 * @param message the pass-through value; despite the field name its type is `PassThrough`
 * @tparam R           type of the produced value
 * @tparam PassThrough type of the pass-through value
 */
final case class Result[R, PassThrough](
  result: R,
  message: PassThrough
)
/**
 * Flow stage that processes every incoming [[Message]] asynchronously and immediately emits a
 * `Future` of the matching [[Result]], carrying the message's pass-through value along with it.
 *
 * Per-record work is supplied by overriding [[process]]. When `process` throws a non-fatal
 * exception the emitted `Future` fails with that exception; if the supervision decider taken from
 * the stage attributes answers `Supervision.Stop` (the default when no strategy is configured),
 * the whole stage is failed as well.
 *
 * When upstream finishes or fails, the stage does not terminate until every started `Future` has
 * run to its end; it then completes, or fails with the upstream error.
 *
 * @tparam R           type of the value produced for each record
 * @tparam V           type of the incoming record
 * @tparam PassThrough type of the value carried from each message to its result
 */
class PathThroughStage[R, V, PassThrough]
  extends GraphStage[FlowShape[Message[V, PassThrough], Future[Result[R, PassThrough]]]] {

  private val in = Inlet[Message[V, PassThrough]]("messages")
  // The outlet must carry futures so that it matches the FlowShape declared above.
  private val out = Outlet[Future[Result[R, PassThrough]]]("result")
  override val shape = FlowShape(in, out)

  /**
   * Per-record logic, executed inside a `Future` on the materializer's execution context (i.e.
   * not on the stage's own thread). Override it to turn a record into its result.
   *
   * @param record the payload of the incoming message
   * @return the value placed into the emitted [[Result]]
   * @throws NotImplementedError by default, until a subclass provides the logic
   */
  protected def process(record: V): R =
    throw new NotImplementedError(
      "PathThroughStage.process must be overridden with the record-processing logic")

  override protected def createLogic(inheritedAttributes: Attributes) = {
    val logic = new GraphStageLogic(shape) with StageLogging {
      lazy val decider = inheritedAttributes.get[SupervisionStrategy]
        .map(_.decider)
        .getOrElse(Supervision.stoppingDecider)

      // Number of futures that have been started but have not yet finished.
      val awaitingConfirmation = new AtomicInteger(0)
      // Written by the upstream handlers, read from inside the futures -- hence @volatile.
      @volatile var inIsClosed = false
      // How upstream terminated: Success on finish, Failure on error, None while still open.
      var completionState: Option[Try[Unit]] = None

      override protected def logSource: Class[_] = classOf[PathThroughStage[R, V, PassThrough]]

      /** Terminates the stage once upstream is closed and no future is in flight any more. */
      def checkForCompletion() = {
        if (isClosed(in) && awaitingConfirmation.get == 0) {
          completionState match {
            case Some(Success(_)) => completeStage()
            case Some(Failure(ex)) => failStage(ex)
            case None => failStage(new IllegalStateException("Stage completed, but there is no info about status"))
          }
        }
      }

      // Async callbacks let code running inside a Future trigger stage operations.
      val checkForCompletionCB = getAsyncCallback[Unit] { _ =>
        checkForCompletion()
      }
      val failStageCb = getAsyncCallback[Throwable] { ex =>
        failStage(ex)
      }

      setHandler(out, new OutHandler {
        override def onPull() = {
          tryPull(in)
        }
      })

      setHandler(in, new InHandler {
        override def onPush() = {
          val msg = grab(in)
          // Count the element BEFORE starting the Future, so that the decrement inside the
          // Future can never be observed ahead of the matching increment.
          awaitingConfirmation.incrementAndGet()
          val f = Future[Result[R, PassThrough]] {
            try {
              Result(PathThroughStage.this.process(msg.record), msg.passThrough)
            } catch {
              case scala.util.control.NonFatal(exception) =>
                decider(exception) match {
                  case Supervision.Stop => failStageCb.invoke(exception)
                  case _ => // any other directive: only this element's Future fails
                }
                // Rethrow so the Future handed downstream fails with the original error.
                throw exception
            } finally {
              // Always release the in-flight slot, whatever the outcome, so the stage can still
              // terminate after upstream has closed.
              if (awaitingConfirmation.decrementAndGet() == 0 && inIsClosed) checkForCompletionCB.invoke(())
            }
          }(materializer.executionContext)
          push(out, f)
        }

        override def onUpstreamFinish() = {
          inIsClosed = true
          completionState = Some(Success(()))
          checkForCompletion()
        }

        override def onUpstreamFailure(ex: Throwable) = {
          inIsClosed = true
          completionState = Some(Failure(ex))
          checkForCompletion()
        }
      })

      override def postStop() = {
        log.debug("Stage completed")
        super.postStop()
      }
    }
    logic
  }
}