【问题标题】:Testing Akka Persistence with Akka HTTP - journal is not cleared使用 Akka HTTP 测试 Akka 持久性 - 日志未清除
【发布时间】:2021-09-30 10:26:54
【问题描述】:

我尝试将 Akka Persistence Test Kit 与 Akka HTTP Test Kit 一起使用,但在每次测试之前我的内存日志都没有被清除。

非常简单的持久化行为 - 只需放入字符串并获取所有存储的字符串:

object MyStore {
  def apply(): Behavior[Command] = EventSourcedBehavior[Command, Event, State](
    persistenceId = PersistenceId.ofUniqueId("myId"),
    emptyState = State(),
    commandHandler = (state, command) => handleCommand(state, command),
    eventHandler = (state, event) => handleEvent(state, event)
  )

  sealed trait Command
  case class AddCmd(s: String, replyTo: ActorRef[List[String]]) extends Command
  case class ReadCmd(replyTo: ActorRef[List[String]])           extends Command

  sealed trait Event
  case class AddEvent(s: String) extends Event

  case class State(values: List[String] = List())

  def handleCommand(state: State, command: Command): ReplyEffect[Event, State] = command match {
    case AddCmd(s, replyTo) => Effect.persist(AddEvent(s)).thenReply(replyTo)(updatedState => updatedState.values)
    case ReadCmd(replyTo)   => Effect.reply(replyTo)(state.values)
  }

  def handleEvent(state: State, event: Event): State = event match {
    case AddEvent(s) => state.copy(values = s :: state.values)
  }
}

具有持久性和序列化配置的 Actor 系统配置:

object MySpec {
  val configuration: Config = {
    val serializationConfigString = "akka.actor.allow-java-serialization = on"
    val serialization             = ConfigFactory.parseString(serializationConfigString).resolve()
    val persistence               = PersistenceTestKitPlugin.config
    serialization.withFallback(persistence)
  }
}

我的测试课:

class MySpec extends AnyFunSuite with Matchers with ScalatestRouteTest with BeforeAndAfterEach {
  import MyStore._
  import akka.http.scaladsl.server.Directives._

  val persistenceTestKit: PersistenceTestKit = PersistenceTestKit(system)

  val route: Route = {
    import akka.actor.typed.scaladsl.AskPattern._
    import akka.actor.typed.scaladsl.adapter._
    implicit val typedSystem: ActorSystem[Nothing] = system.toTyped
    implicit val timeout: Timeout                  = 3.seconds

    val actor: ActorRef[Command] =
      system.spawn(behavior = MyStore(), name = "MyStore", props = Props.empty)

    get {
      val result = actor.ask(replyTo => ReadCmd(replyTo)).map(_.mkString(";"))
      complete(result)
    } ~ (post & entity(as[String])) { newRecord =>
      val result = actor.ask(replyTo => AddCmd(newRecord, replyTo)).map(_ => "OK")
      complete(result)
    }
  }

  override def createActorSystem(): akka.actor.ActorSystem =
    akka.actor.ActorSystem("MySystem", MySpec.configuration)

  override def beforeEach(): Unit = {
    persistenceTestKit.clearAll()
  }

  private def add(s: String) = {
    Post("/", s) ~> route ~> check {
      responseAs[String] shouldEqual "OK"
    }
  }

  test("Add two elements") {
    add("One")
    add("Two")

    Get() ~> route ~> check {
      responseAs[String] shouldEqual "Two;One"
    }
  }

  test("Add another two element") {
    add("Three")
    add("Four")

    Get() ~> route ~> check {
      responseAs[String] shouldEqual "Four;Three"
    }
  }
}

如果我单独运行每个测试,它会起作用。但是,如果我一个接一个地运行两个测试,我就会进入第二个测试:

Expected :"Four;Three[]"
Actual   :"Four;Three[;Two;One]"

我的 build.sbt 文件:

name := "persistence-http-test"

version := "0.1"

scalaVersion := "2.13.6"

val AkkaVersion = "2.6.14"
val AkkaHttpVersion = "10.2.4"
val ScalatestVersion = "3.2.5"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-stream-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
  "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
  "org.scalatest" %% "scalatest" % ScalatestVersion,
  "com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion,
  "com.typesafe.akka" %% "akka-http-testkit" % AkkaHttpVersion,
  "com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-testkit" % AkkaVersion
)

存储库:https://github.com/LukBed/akka-persistence-http-test-issue

【问题讨论】:

    标签: scala akka akka-http akka-persistence akka-testkit


    【解决方案1】:

    通过在每次测试之前执行persistenceTestKit.clearAll(),持久性存储中的所有数据都将被删除,但 MyStore actor 的内存中状态内容保持不变——因此在后续测试中失败。

    另一个后果是持久性存储将与参与者的状态不同步。为了数据的一致性,最好提供一些Clear-command/event 处理例程,类似于Add/Read的处理方式:

    object MyStore {
      // ...
    
      sealed trait Command
      case class AddCmd(s: String, replyTo: ActorRef[List[String]]) extends Command
      case class ReadCmd(replyTo: ActorRef[List[String]])           extends Command
      case class ClearCmd(replyTo: ActorRef[List[String]])          extends Command
    
      sealed trait Event
      case class AddEvent(s: String) extends Event
      case object ClearEvent         extends Event
    
      case class State(values: List[String] = Nil)
    
      def handleCommand(state: State, command: Command): ReplyEffect[Event, State] = command match {
        case AddCmd(s, replyTo) => Effect.persist(AddEvent(s)).thenReply(replyTo)(_.values)
        case ReadCmd(replyTo)   => Effect.reply(replyTo)(state.values)
        case ClearCmd(replyTo)  => Effect.persist(ClearEvent).thenReply(replyTo)(_.values)
      }
    
      def handleEvent(state: State, event: Event): State = event match {
        case AddEvent(s) => state.copy(values = s :: state.values)
        case ClearEvent  => state.copy(values = Nil)
      }
    }
    

    您现在可以通过put 使用route ~> check,让Clear 命令/事件处理程序在每次测试之前清除参与者内部状态和持久性日志:

    class MySpec extends AnyFunSuite with Matchers with ScalatestRouteTest with BeforeAndAfterEach {
      // ...
    
      val route: Route = {
        ...
    
        get {
          val result = actor.ask(replyTo => ReadCmd(replyTo)).map(_.mkString(";"))
          complete(result)
        } ~
        put {
          val result = actor.ask(replyTo => ClearCmd(replyTo)).map(_.mkString(";"))
          complete(result)
        } ~
        post { entity(as[String]) { newRecord =>
          val result = actor.ask(replyTo => AddCmd(newRecord, replyTo)).map(_ => "OK")
          complete(result)
        } }
      }
    
      // ...
    
      override def beforeEach(): Unit = {
        initStateAndJournal()
      }
    
      private def initStateAndJournal() = {
        Put("/", "clear") ~> route ~> check {
          responseAs[String] shouldEqual ""
        }
      }
    
      // ...
    }
    

    【讨论】:

      猜你喜欢
      • 2019-11-06
      • 1970-01-01
      • 1970-01-01
      • 2019-02-10
      • 1970-01-01
      • 2016-10-22
      • 1970-01-01
      • 1970-01-01
      • 2015-02-15
      相关资源
      最近更新 更多