【问题标题】:Event sourcing incremental int id事件溯源增量 int id
【发布时间】:2017-06-01 14:15:54
【问题描述】:

我看了很多事件溯源教程,都在使用简单的演示来专注于教程主题(事件溯源)

这很好,直到您在实际工作应用程序中遇到这些教程之一未涵盖的内容 :)

我碰到了这样的东西。 我有两个数据库,一个事件存储和一个投影存储(读取模型) 所有聚合都有一个 GUID Id,到目前为止它是 100% 没问题的。

现在我创建了一个新的 JobAggregate 和一个 Job Projection。 而且我公司要求有一个唯一的增量 int64 作业 ID。

现在我看起来很愚蠢:) 另一个问题是每秒创建多次作业! 这意味着,获取下一个数字的方法必须非常安全。

过去(没有 ES)我有一个表,将 PK 定义为自动增量 int64,保存 Job,DB 完成工作给我下一个数字,完成。

但是如何在我的聚合或命令处理程序中执行此操作? 通常,投影作业是由事件处理程序创建的,但这在过程中已经很晚了,因为聚合应该已经有 int64 了。 (用于在空数据库上重放聚合并具有相同的聚合 ID -> 作业 ID 关系)

我应该如何解决这个问题?

亲切的问候

【问题讨论】:

  • 这里有一个建议:使用 GUID 作为真实 ID,并将整数 ID 视为聚合上的另一个数据。然后创建一个追踪器,它将获取聚合创建事件并为每个事件生成新的“分配 ID”事件。追逐者可以拥有序列生成器,对于不关心数字 ID 的任何人来说都没有瓶颈。
  • 我如何获得下一个? (增加 1)因为数字只存在于事件中。我将不得不回复所有聚合然后得到最大的一个,但这显然是不可能的解决方案。就像我说的那样,每秒可以创建多个。
  • 您的 ID 确实需要单一来源。聚合本身可以做到这一点,但随后无法将其分发到其他服务器。当然,数据库可能是那个来源,但我认为这有点违背了使用 CQRS 的目的。您可以编写自己的服务,但这非常复杂,因为您正在尝试编写不会减慢聚合速度的东西。我发现解决增量 ID 的需求要好得多——这种需求与分布式系统完全不同。
  • @SharpNoiZy 你有一个可以提供课程序列的服务。追逐者拥有对该序列的独占访问权,从中获取 ID,并将它们作为事件发布。
  • 业务需求应该是什么?大多数时候,它来自对数据库等有一点了解但没有架构远见的人,并且有些“危险”的人通过混合他们的业务需求和他们管理项目的意愿将 IT 引导到错误的目录中。认真花时间与您的同事/经理/客户/其他人讨论这个问题。

标签: c# cqrs event-sourcing


【解决方案1】:

过去(没有 ES)我有一个表,将 PK 定义为自动增量 int64,保存 Job,DB 完成工作给我下一个数字,完成。

在此序列中需要注意一件重要的事情,即唯一标识符的生成和数据在记录簿中的持久性都共享一个事务。

当您将这些想法分开时,您基本上是在查看两个事务 - 一个使用 id,因此没有其他聚合尝试共享它,另一个将该 id 写入存储。

最好的答案是安排这两个部分是同一事务的一部分——例如,如果您使用关系数据库作为事件存储,那么您可以在“aggregate_id to long”表中创建一个条目保存与事件相同的事务。

另一种可能性是将聚合的“创建”视为Prepare,后跟Created;使用事件处理程序通过事后保留长标识符来响应准备事件,然后将新命令发送到聚合以将长标识符分配给它。所以Created 的所有消费者都会看到分配给它的 long 聚合。

值得注意的是,您正在为您正在创建的每个聚合分配有效的随机长,因此您最好深入了解公司认为它从中获得了什么好处——如果他们期望标识符将提供排序保证或完整性保证,那么您最好理解这一点。

先保留多头并没有什么特别的错误;根据聚合保存失败的频率,您最终可能会出现差距。在大多数情况下,您应该期望能够保持较小的失败率(即 - 您在实际运行之前检查以确保您期望命令成功)。

在真正意义上,唯一标识符的生成属于set validation的范畴;我们通常通过放弃任何排序假装并假装碰撞风险为零来“欺骗”UUID。关系数据库非常适合集合验证;活动商店可能没有那么多。如果您需要模型控制的唯一顺序标识符,那么您的“分配的标识符集”需要在聚合中。

要遵循的关键短语是“业务成本”——确保您了解长标识符为何有价值。

【讨论】:

    【解决方案2】:

    这就是我的处理方式。

    我同意 ID 生成器的想法,它是“业务 ID”而不是“技术 ID”

    这里的核心是拥有一个应用程序级别的JobService,它处理所有基础架构服务以编排要完成的工作。

    控制器(如网络控制器或命令行)将直接使用应用程序级别的JobService 来控制/命令状态更改。

    它是类似 PHP 的伪代码,但这里我们讨论的是架构和流程,而不是语法。把它改成 C# 语法,事情是一样的。

    应用层

    class MyNiceWebController
    {
        public function createNewJob( string $jobDescription, xxxx $otherData, ApplicationJobService $jobService )
        {
            $projectedJob = $jobService->createNewJobAndProject( $jobDescription, $otherData );
    
            $this->doWhateverYouWantWithYourAleadyExistingJobLikeForExample301RedirectToDisplayIt( $projectedJob );
        }
    }
    
    class MyNiceCommandLineCommand
    {
        private $jobService;
    
        public function __construct( ApplicationJobService $jobService )
        {
            $this->jobService = $jobService;
        }
    
        public function createNewJob()
        {
            $jobDescription = // Get it from the command line parameters
            $otherData = // Get it from the command line parameters
    
            $projectedJob = $this->jobService->createNewJobAndProject( $jobDescription, $otherData );
    
            // print, echo, console->output... confirmation with Id or print the full object.... whatever with ( $projectedJob );
        }
    }
    
    class ApplicationJobService
    {
        // In application level because it just serves the first-level request
        // to controllers, commands, etc but does not add "domain" logic.
    
        private $application;
        private $jobIdGenerator;
        private $jobEventFactory;
        private $jobEventStore;
        private $jobProjector;
    
        public function __construct( Application $application, JobBusinessIdGeneratorService $jobIdGenerator, JobEventFactory $jobEventFactory, JobEventStoreService $jobEventStore, JobProjectorService $jobProjector )
        {
            $this->application = $application;  // I like to lok "what application execution run" is responsible of all domain effects, I can trace then IPs, cookies, etc crossing data from another data lake.
            $this->jobIdGenerator = $jobIdGenerator;
            $this->jobEventFactory = $jobEventFactory;
            $this->jobEventStore = $jobEventStore;
            $this->jobProjector = $jobProjector;
        }
    
        public function createNewJobAndProjectIt( string $jobDescription, xxxx $otherData ) : Job
        {
            $applicationExecutionId = $this->application->getExecutionId();
    
            $businessId = $this->jobIdGenerator->getNextJobId();
    
            $jobCreatedEvent = $this->jobEventFactory->createNewJobCreatedEvent( $applicationExecutionId, $businessId, $jobDescription, $otherData );
    
            $this->jobEventStore->storeEvent( $jobCreatedEvent );       // Throw exception if it fails so no projecto will be invoked if the event was not created.
    
            $entityId = $jobCreatedEvent->getId();
            $projectedJob = $this->jobProjector->project( $entityId );
    
            return $projectedJob;
        }
    }
    

    注意:如果同步投影的投影成本太高,只需返回 ID:

            // ...
            $entityId = $jobCreatedEvent->getId();
            $this->jobProjector->enqueueProjection( $entityId );
    
            return $entityId;
        }
    }
    

    基础设施级别(各种应用程序通用)

    class JobBusinessIdGenerator implements DomainLevelJobBusinessIdGeneratorInterface
    {
        // In infrastructure because it accesses persistance layers.
    
        // In the creator, get persistence objects and so... database, files, whatever.
    
        public function getNextJobId() : int
        {
            $this->lockGlobalCounterMaybeAtDatabaseLevel();
    
            $current = $this->persistance->getCurrentJobCounter();
            $next = $current + 1;
            $this->persistence->setCurrentJobCounter( $next );
    
            $this->unlockGlobalCounterMaybeAtDatabaseLevel();
    
            return $next;
        }
    }
    

    域级别

    class JobEventFactory
    {
        // It's in this factory that we create the entity Id.
    
        private $idGenerator;
    
        public function __construct( EntityIdGenerator $idGenerator )
        {
            $this->idGenerator = $idGenerator;
        }
        
        public function createNewJobCreatedEvent( Id $applicationExecutionId, int $businessId, string $jobDescription, xxxx $otherData ); : JobCreatedEvent
        {
            $eventId = $this->idGenerator->createNewId();
            $entityId = $this->idGenerator->createNewId();
            
            // The only place where we allow "new" is in the factories. No other places should do a "new" ever.
            $event = new JobCreatedEvent( $eventId, $entityId, $applicationExecutionId, $businessId, $jobDescription, $otherData );
    
            return $event; 
        }
    }
    

    如果您不喜欢创建 entityId 的工厂,在某些人看来可能很难看,只需将其作为具有特定类型的参数传递,并承担创建新实体的责任,不要在其他中间体重复使用服务(绝不是应用程序服务)为您创建它。

    尽管如此,如果您这样做,请注意如果“愚蠢的”服务仅创建具有相同实体 ID 的“两个”JobCreatedEvent 怎么办?那真的很难看。最后,创建只会发生一次,并且 Id 是在“创建 JobCreationEvent 事件”(冗余冗余)的核心创建的。反正你的选择。

    其他类...

    class JobCreatedEvent;
    class JobEventStoreService;
    class JobProjectorService;
    

    这篇文章中无关紧要的事情

    如果投影仪应该位于基础架构级别,对多个调用它们的应用程序是全局的……甚至在域中(因为我需要“至少”一种读取模型的方法)或者它更多属于应用程序(也许同一个模型可以在 4 个不同的应用程序中以 4 种不同的方式读取,并且每个应用程序都有自己的投影仪)...

    如果隐含在事件存储或应用程序级别,我们可以讨论很多副作用在哪里触发(我没有调用任何副作用处理器 == 事件侦听器)。我认为副作用存在于应用程序层,因为它们依赖于基础设施......

    但这一切...不是这个问题的主题。

    我不关心这个“帖子”的所有这些事情。当然,它们并非可以忽略不计的主题,您将有自己的策略。而您必须非常仔细地设计这一切。但这里的问题是在哪里创建来自业务需求的自动增量 ID。并且在这里以“干净代码”的方式执行所有这些投影仪(有时称为 calculators)和副作用(有时称为 reactors)会模糊这个答案的重点。你明白了。

    我在这篇文章中关心的事情

    我关心的是:

    • 如果专家认为是“自动数字”,那么它就是“域要求”,因此它是与“描述”或“其他数据”具有相同定义级别的属性。
    • 他们想要这个属性的事实与所有实体都具有编码器选择的格式的“内部 id”(uuid、sha1 或其他)这一事实相冲突。李>
    • 如果您需要该属性的顺序 ID,则需要一个“值提供者”AKA JobBusinessIdGeneratorService,它与“实体 ID”本身无关。
    • ID 生成器将负责确保一旦数字自动递增,它在返回给客户端之前同步持久化,因此不可能返回两次相同的 ID失败时。

    缺点

    您必须处理一个序列泄漏:

    如果 Id 生成器指向 4007,下一次调用 getNextJobId() 会将其递增到 4008,将指针保持为“current = 4008”,然后返回。

    如果由于某种原因创建和持久化失败,那么下一次调用将给出 4009。然后我们将有一个序列 [ 4006, 4007, 4009, 4010 ],而缺少 4008

    这是因为从生成器的角度来看,4008 是“实际使用的”,并且作为生成器,它不知道你用它做了什么,就像你有一个提取 100 个数字的愚蠢循环一样.

    从不catchtry / catch 块中使用 ->rollback() 进行补偿,因为如果您获得 2008,另一个进程获得 2009,那么这可能会产生并发问题,然后是第一个进程失败,回滚将中断。只需假设“失败”时 Id 被“消耗”了,不要责怪生成器。怪谁失败了。

    希望对你有帮助!

    【讨论】:

      【解决方案3】:

      @SharpNoizy,很简单。

      创建您自己的 ID 生成器。说一个字母数字字符串,例如“DB3U8DD12X”,它为您提供了数十亿的可能性。现在,您要做的是通过给每个字符一个有序值来按顺序生成这些 id...

      0 - 0
      1 - 1
      2 - 2
      .....
      10 - A
      11 - B
      

      明白了吗?因此,您接下来要做的是创建您的函数,该函数将使用该矩阵增加“D74ERT3E4”字符串的每个索引。

      那么,“R43E4D”、“R43E4E”、“R43E4F”、“R43E4G”……明白了吗?

      然后,当您加载应用程序时,您会查看数据库并找到生成的最新 Id。然后,您在内存中加载接下来的 50,000 个组合(以防您想要超高速)并创建一个静态类/方法,该类/方法将为您提供该值。

      Aggregate.Id = IdentityGenerator.Next();
      

      通过这种方式,您可以控制 ID 的生成,因为这是唯一具有该功能的类。

      我喜欢这种方法,因为例如在您的 web api 中使用它时更“可读”。 GUID 很难(也很乏味)阅读、记住等。

      GET api/job/DF73api/job/XXXX-XXXX-XXXXX-XXXX-XXXX

      更容易记住

      这有意义吗?

      【讨论】:

      • 这不会涉及阅读评论中提到的@SharpNoizy 等所有聚合事件吗?
      • 不一定。您可以使用独立的机制来跟踪您颁发的 ID。此外,您可以使用为该对象分配的 Id 存储所有事件。归根结底,事件溯源只是聚合实体上的单个更改的集合。再添加一个带有 ID 的列或属性。有很多方法可以实现这一目标。它归结为您的特定需求和“创造力”:)
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-02-20
      • 2016-04-28
      • 2013-03-09
      • 1970-01-01
      • 2017-04-06
      • 2020-05-27
      • 2021-06-24
      相关资源
      最近更新 更多