在做了一些研究并承认在处理命令(上面的第一种方法)期间更新读取端可能会失败的事实,这将涉及处理回滚和事务。更好的方法是使用持久性查询。这在很大程度上依赖于您的事件日志支持 Persistence Query 的不同功能(如 AllPersistenceIdsQuery 和 EventsByTag),这些功能目前由 Cassandra、Event Store 和其他一些日志支持。
这个想法是将命令端与查询端分离,其中命令端根本不需要知道有任何查询端。这种解耦是使用Persistence Query 提供给我们的。这个想法是命令端应该只关心验证传入的命令并将事件持久化到事件日志中。就是这样,没有查询端的意识。
现在对于查询端,您可以使用诸如EventsByTag 或AllPersistenceIdsQuery 之类的持久性查询从日志中获取Source[Event],这是@ 的背压 实时流987654331@s 来自事件日志,您使用此抽象来提供您的阅读面。您可以找到方法here
让我们想想失败
如果指挥方宕机怎么办?
没有新的Events 将被持久化到事件日志中,读取端持久性查询将愉快地在流中产生新的Events。当命令端恢复时,一切都会恢复,现在读取端持久性查询将在流中看到新的Events。
如果其中一个读取端出现故障怎么办?
不是什么大问题,您可以擦除读取端数据库并从头开始重新启动持久性查询,然后我们就走了。我们绝对可以通过使用Resumable Projections 的概念来改进这一点。这个想法是继续存储您读取的每个Event 的当前偏移量,这样如果您的读取端出现故障,我们可以简单地从我们正在读取的点恢复,而不必从头开始。一个忠告,如果您需要一次有效的交付,那么您可能需要考虑使用幂等过滤器来避免重复。如果你这样做,你可以做一些优化,你不需要保留每个 ID,而只是在特定的时间间隔。
其他一些注意事项
如果您需要引入新的读取端并且它们从一开始就依赖于命令端事件怎么办?
这种解耦方法允许您执行此操作。这个想法是您不要从事件存储中删除事件,只需启动另一个持久性查询并让它为您的新读取端提供数据,如果您需要保持最新状态,请不要忘记使用可恢复投影。
这些方法隐含了持久性查询以及在同一系统上提供读取端和可恢复投影的逻辑。
另一种方法是将 Persistence Query 与 Akka HTTP 结合起来,并公开一个流式端点,该端点公开您的事件日志,并允许您从一开始或从某个偏移量获取所有事件/某些事件。这种方式允许你做一些进一步的解耦,但如果你使用这种方法,你真的希望有 Resumable Projections,因为现在由于引入了 HTTP 而增加了失败。现在,您的 Read Sides 可以使用此流式传输端点并在其一侧拥有 Resumable Projection,并且可以以更加解耦的方式引入新的 Read Sides。
这些只是我开始使用 Akka 以来收集到的一些知识。也许一些更有经验的人对 CQRS 有更好的方法。
如果您正在寻找代码示例,我强烈建议您查看 Christian Baxter 的 Mastering Akka,它更详细地描述了该方法以及 post。
感谢阅读