【问题标题】:Read journal exactly once只读一次日记
【发布时间】:2016-07-18 12:18:42
【问题描述】:

我正在使用 akka-persistence 的 PersistenceQuery 将初始状态加载到我的管理东西的演员。我希望它在启动时只重播一次,但它会不断将它们发送到日志中。

14:11:28.405 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:28.407 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]
14:11:31.376 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:31.377 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]
14:11:34.376 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:34.378 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]

这是我为实现它而编写的程序。

implicit val mat = ActorMaterializer()(context)
val queries = PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](
      LeveldbReadJournal.Identifier)
val src: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("rooms", 0L, Long.MaxValue)
val events: Source[Any, NotUsed] = src.map(_.event)
val future = events.runWith(Sink.foreach{
  case x: RoomCreated => process(x)
  case x: RoomDeleted => process(x)
  case x => logger.error(s"Could not spawn $x")
})

【问题讨论】:

  • 生成该日志语句的代码在哪里?我认为您需要显示更多代码,以便我们理解为什么查询似乎被执行了多次。
  • 预启动曾经的演员
  • 你是在启动这个actor的多个实例,还是由主管重新启动它以响应处理过程中某处发生的异常?
  • 我会注意到他是否崩溃了,因为当他开始时,他做了一堆工作,用“Spawned Child”向控制台发送垃圾邮件,所以他没有重新启动。当我明天回来工作时,我会发布更多代码。
  • @cmbaxter 如果 foreach 有问题怎么办?如果它想检查是否发生了任何新事件怎么办?

标签: scala akka event-sourcing akka-persistence


【解决方案1】:

我认为您的预期行为与您实际看到的不同之处在于 eventsByPersistenceId 是一个“实时”流。这意味着它不仅会返回从您提供的偏移范围内开始的事件(您从 0 开始并转到 Long.MaxValue,所以一切),但如果新事件进入,它将继续向您发送。如果您不这样做不想要直播,然后将呼叫改为currentEventsByPersistenceId。这将仅包括截至该时间点(您提出请求的时间)的内容,而不是实时流。这应该是您正在寻找的。​​p>

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2011-12-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多