【发布时间】:2016-07-18 12:18:42
【问题描述】:
我正在使用 akka-persistence 的 PersistenceQuery 将初始状态加载到我的管理东西的演员。我希望它在启动时只重播一次,但它会不断将它们发送到日志中。
14:11:28.405 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:28.407 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]
14:11:31.376 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:31.377 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]
14:11:34.376 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:34.378 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]
这是我为实现它而编写的程序。
implicit val mat = ActorMaterializer()(context)
val queries = PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](
LeveldbReadJournal.Identifier)
val src: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("rooms", 0L, Long.MaxValue)
val events: Source[Any, NotUsed] = src.map(_.event)
val future = events.runWith(Sink.foreach{
case x: RoomCreated => process(x)
case x: RoomDeleted => process(x)
case x => logger.error(s"Could not spawn $x")
})
【问题讨论】:
-
生成该日志语句的代码在哪里?我认为您需要显示更多代码,以便我们理解为什么查询似乎被执行了多次。
-
预启动曾经的演员
-
你是在启动这个actor的多个实例,还是由主管重新启动它以响应处理过程中某处发生的异常?
-
我会注意到他是否崩溃了,因为当他开始时,他做了一堆工作,用“Spawned Child”向控制台发送垃圾邮件,所以他没有重新启动。当我明天回来工作时,我会发布更多代码。
-
@cmbaxter 如果 foreach 有问题怎么办?如果它想检查是否发生了任何新事件怎么办?
标签: scala akka event-sourcing akka-persistence