【问题标题】:How to make command to wait until all events triggered against it are completed successfully如何使命令等待直到针对它触发的所有事件都成功完成
【发布时间】:2020-09-24 11:27:05
【问题描述】:

我遇到了一个要求,我希望轴突等到事件总线中针对特定命令触发的所有事件完成执行。我将简要介绍一下场景:

我有一个 RestController,它会触发以下命令来创建应用程序实体:

@RestController
class myController{
  @PostMapping("/create")
  @ResponseBody
  public String create(
    org.axonframework.commandhandling.gateway.CommandGateway.sendAndWait(new CreateApplicationCommand());
    System.out.println(“in myController:: after sending CreateApplicationCommand”);
  }
}

该命令正在Aggregate 中处理,Aggregate 类用org.axonframework.spring.stereotype.Aggregate 注释:

@Aggregate
class MyAggregate{
   @CommandHandler //org.axonframework.commandhandling.CommandHandler
   private MyAggregate(CreateApplicationCommand command) {
      org.axonframework.modelling.command.AggregateLifecycle.apply(new AppCreatedEvent());
      System.out.println(“in MyAggregate:: after firing AppCreatedEvent”);
   }

   @EventSourcingHandler //org.axonframework.eventsourcing.EventSourcingHandler
   private void on(AppCreatedEvent appCreatedEvent) {
      // Updates the state of the aggregate
      this.id = appCreatedEvent.getId();
      this.name = appCreatedEvent.getName();
      System.out.println(“in MyAggregate:: after updating state”);
   }
}

AppCreatedEvent 在 2 个地方处理:

  1. 在聚合本身中,如上所示。
  2. 在投影类中如下:
 @EventHandler //org.axonframework.eventhandling.EventHandler
 void on(AppCreatedEvent appCreatedEvent){
    // persists into database
    System.out.println(“in Projection:: after saving into database”);
 }

这里的问题是在首先捕获事件之后(即在聚合内部),调用被返回到 myController。 即这里的输出是:

in MyAggregate:: after firing AppCreatedEvent
in MyAggregate:: after updating state
in myController:: after sending CreateApplicationCommand
in Projection:: after saving into database

我想要的输出是:

in MyAggregate:: after firing AppCreatedEvent
in MyAggregate:: after updating state
in Projection:: after saving into database
in myController:: after sending CreateApplicationCommand

简单来说,我希望轴突等到针对特定命令触发的所有事件都完全执行,然后返回触发该命令的类。

在论坛上搜索后,我知道所有 sendAndWait 所做的就是等到命令的处理和事件的发布完成,然后我厌倦了 Reactor Extension 以及使用下面但得到相同的结果:@987654327 @

谁能帮帮我。 提前致谢。

【问题讨论】:

    标签: axon


    【解决方案1】:

    @rohit,在您的情况下,最好的办法是接受您在这里使用最终一致的解决方案这一事实。因此,命令处理与事件处理完全分开,使您创建的查询模型最终与命令模型(您的聚合)保持一致。因此,您不必完全等待事件,而是在查询模型出现时做出反应。

    接受这一点归结为构建您的应用程序,以便“是的,我知道我的回复现在可能不是最新的,但它可能在不久的将来某个地方。”因此,建议在您发送命令之后或之前订阅您感兴趣的结果。 例如,您可以将此视为使用带有 STOMP 协议的 WebSocket,或者您可以利用 Project Reactor 并使用 Flux 结果类型来接收结果。

    根据您的描述,我假设您或您的企业已决定 UI 组件应该以(老式)同步方式做出反应。这没有什么问题,但是当涉及到使用像 CQRS 这样本质上最终一致的东西时,它会咬你的 *ss。但是,如果您愿意,您可以在前端欺骗您是同步的这一事实。

    为了实现这一点,我建议使用 Axon 的订阅查询来订阅您知道将由您发送的命令更新的查询模型。 在伪代码中,它看起来有点像这样:

    public Result mySynchronousCall(String identifier) {
        // Subscribe to the updates to come
        SubscriptionQueryResult<Result> result = QueryGateway.subscriptionQuery(...);
        // Issue command to update
        CommandGateway.send(...);
        // Wait on the Flux for the first result, and then close it
        return result.updates()
                     .next()
                     .map(...)
                     .timeout(...)
                     .doFinally(it -> result.close());
    
    }
    

    顺便说一下,您可以在这个示例 WebFluxRest 类中看到 this 正在完成。

    请注意,您实际上是在关闭前端的大门,以便通过这样做来利用异步优势。它会起作用,并允许您等待结果一出现就出现,但您会失去一些灵活性。

    【讨论】:

      猜你喜欢
      • 2016-01-29
      • 2018-03-24
      • 2016-07-07
      • 2011-10-20
      • 2022-11-30
      • 1970-01-01
      • 2018-08-20
      • 1970-01-01
      • 2023-04-04
      相关资源
      最近更新 更多