【问题标题】:NEventStore optimistic lockNEventStore 乐观锁
【发布时间】:2013-08-25 15:52:46
【问题描述】:

我是 NEventStore 和一般事件溯源的新手。在一个项目中,我想使用 NEventStore 来持久化我们的聚合生成的事件,但是我在正确处理并发方面遇到了一些问题。

如何使用乐观锁写入同一个流?

假设我有 2 个相同聚合的实例,它们从 2 个不同的线程在版本 1 中加载。然后是第一个线程调用命令 A 和第二个线程调用命令 B 。使用一个乐观锁应该会因为并发异常而失败。

我想使用 maxRevision 从加载聚合的点打开流,但似乎 CommitChanges 永远不会失败,如果我通过旧修订版也是如此。

我错过了什么?使用 NEventStore/Event Sourcing 时乐观锁可能/正确吗?

这是我用来重现问题的代码:

namespace NEventStore.Example
{
    using System;
    using System.Transactions;
    using NEventStore;
    using NEventStore.Dispatcher;
    using NEventStore.Persistence.SqlPersistence.SqlDialects;

    internal static class MainProgram
    {
        private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
        private static IStoreEvents store;

        private static void Main()
        {
            using (var scope = new TransactionScope())
            using (store = WireupEventStore())
            {
                Client1(revision: 0);

                Client2(revision: 0);

                scope.Complete();
            }

            Console.WriteLine(Resources.PressAnyKey);
            Console.ReadKey();
        }

        private static IStoreEvents WireupEventStore()
        {
             return Wireup.Init()
                .UsingInMemoryPersistence()
                .Build();
        }

        private static void Client1(int revision)
        {
            using (var stream = store.OpenStream(StreamId, 0, revision))
            {
                var @event = new SomeDomainEvent { Value = "Client 1 - event 1." };

                stream.Add(new EventMessage { Body = @event });


                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void Client2(int revision)
        {
            using (var stream = store.OpenStream(StreamId, 0, revision))
            {
                var @event = new SomeDomainEvent { Value = "Client 2 - event 1." };

                stream.Add(new EventMessage { Body = @event });


                stream.CommitChanges(Guid.NewGuid());
            }
        }
    }
}

我预计客户端 2 会失败,因为我使用旧版本打开流。

2013 年 8 月 26 日更新: 我已经使用 Sql server 测试了相同的代码,并且似乎可以按预期工作。

namespace NEventStore.Example
{
    using System;
    using System.Transactions;
    using NEventStore;
    using NEventStore.Dispatcher;
    using NEventStore.Persistence.SqlPersistence.SqlDialects;

    internal static class MainProgram
    {
        private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
        private static IStoreEvents store;

        private static void Main()
        {
            using (store = WireupEventStore())
            {
                OpenOrCreateStream();

                AppendToStream_Client1(revision: 1);

                AppendToStream_Client2(revision: 1); // throws an error
                // AppendToStream_Client2(revision: 2); // works
            }

            Console.WriteLine(Resources.PressAnyKey);
            Console.ReadKey();
        }

        private static IStoreEvents WireupEventStore()
        {
             return Wireup.Init()
                .LogToOutputWindow()
                .UsingInMemoryPersistence()
                .UsingSqlPersistence("EventStore") // Connection string is in app.config
                    .WithDialect(new MsSqlDialect())
                    .InitializeStorageEngine()
                    .UsingJsonSerialization()
                .Build();
        }

        private static void OpenOrCreateStream()
        {
            using (var stream = store.OpenStream(StreamId, 0, int.MaxValue))
            {
                var @event = new SomeDomainEvent { Value = "Initial event." };

                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void AppendToStream_Client1(int revision)
        {
            using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
            {
                var @event = new SomeDomainEvent { Value = "Second event 1." };

                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void AppendToStream_Client2(int revision)
        {
            using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
            {
                var @event = new SomeDomainEvent { Value = "Second event 2." };

                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }
    }
}

回到我的问题:要启用乐观锁,我应该在打开流时使用修订版吗?还有其他可能的实现或指南吗?

谢谢

【问题讨论】:

    标签: .net concurrency event-sourcing neventstore


    【解决方案1】:

    首先,主要目的是测试的内存中持久性实现不支持事务。在您的原始示例中,客户端 2 将简单地将其事件附加到流中。尝试使用支持事务(SQL 和 Raven,但不支持 Mongo)的持久性存储运行上述内容。

    其次,在打开流时指定最小/最大修订用于不同目的:

    1. 当重新混合聚合时,没有可用的快照,您可以指定 (min:0, max:int.MaxValue),因为您有兴趣检索所有事件。
    2. 当重新整合聚合并且快照可用时,您可以指定 (min:snapshot.Version, max:int.MaxValue) 以获取自快照以来发生的所有事件。
    3. 保存聚合时,应指定 (min:0, max:Aggregate.Version)。 Aggregate.Version 是在再水合期间派生的。如果同一聚合体同时在其他地方重新水化并保存,您将遇到竞争条件并且会出现ConcurrencyException

    对大部分内容的支持将封装在域框架中。请参阅 CommonDomain 中的 AggregateBaseEventStoreRepository

    第三,也是最重要的,在单个事务中更新 >1 个流是一种代码异味。如果您正在执行 DDD/ES,则流表示单个聚合根,根据定义,它是一致性边界。在事务中创建/更新多个 AR 会打破这一点。 (不情愿地)添加了 NEventStore 的事务支持,以便它可以与其他工具一起使用,即以事务方式从 MSMQ/NServiceBus/whatever 读取命令并处理它,或者以事务方式将提交消息分派到队列并将其标记为这样。就个人而言,我会建议您尽量避免使用 2PC。

    【讨论】:

    • 谢谢达米安。但我不确定是否理解。假设我删除了 TransactionScope。可以处理乐观锁吗?如何?基本上我只想在没有其他事件同时提交的情况下才写入流。
    • 我已经使用 SQL Server 更新了问题并且没有事务。如果我通过了错误的修订,现在第二个附加失败。这是处理这种情况的正确方法吗?在这种情况下,我应该将修订保存为聚合状态,并在保存新事件时将其传回。这是预期的实现吗?
    • 乐观锁由每个持久化引擎处理。在 SQL 中它基于 StreamId 和 CommitSequence 的主键。因此,如果您同时打开同一个流两次,向两者添加一个提交,这将导致 CommitSequence 冲突和 ConcurrencyException。
    • 我在 InMemoryPersistenceEngine 中对 CommitMessages github.com/NEventStore/NEventStore/commit/… 的并发检查发现了一个错误(这与事务范围无关)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-09-12
    • 1970-01-01
    • 2011-02-23
    • 2013-06-30
    • 1970-01-01
    • 2018-03-29
    相关资源
    最近更新 更多