【问题标题】:Axon Saga subscribing mode concurrency issueAxon Saga 订阅模式并发问题
【发布时间】:2020-08-21 08:35:19
【问题描述】:

在我的 Saga 中使用 Axon 4.3.5 从跟踪模式切换到订阅模式时,我看到了意外行为

看来,在订阅模式下,当两个线程同时到达两个@StarSaga 方法时,会为同一个关联键值创建两个 saga。 我错过了什么吗?

我有这个来重现它:

@Saga
@ProcessingGroup("Saga")
public class RaceSaga {

    @Inject
    protected transient CommandGateway commandGateway;

    @StartSaga
    @SagaEventHandler(associationProperty = "executionId")
    public void on(Exec exec) {
        commandGateway.sendAndWait(new CreateExecCommand(exec.getExecutionId(), exec.getDescription()));
    }

    @StartSaga
    @SagaEventHandler(associationProperty = "executionId")
    public void on(Risk risk) {
        commandGateway.sendAndWait(new CreateRiskCommand(risk.getExecutionId(), risk.getResult()));
    }
}

@IntegrationTest
class RaceConditionTest extends BaseIntegrationTest {

    @Autowired
    private EventGateway eventGateway;
    @Autowired
    private SagaStore sagaStore;

    @Test
    void sagaRace() {
        var execId = UUID.randomUUID();

        CompletableFuture.runAsync(() -> eventGateway.publish(new Exec(execId.toString(), "desc")));
        CompletableFuture.runAsync(() -> eventGateway.publish(new Risk(execId.toString(), "OK")));

        var association = new AssociationValue("executionId", execId.toString());
        await().during(5, SECONDS)
                .untilAsserted(() -> assertThat(sagaStore.findSagas(RaceSaga.class, association))
                        .hasSize(1));
    }
}

使用跟踪模式时测试通过,但订阅失败。 (yml 配置)

【问题讨论】:

    标签: axon saga


    【解决方案1】:

    老实说,考虑到测试设置,这是预期的行为,但需要一些解释。

    知道以下是订阅 (SEP) 和跟踪事件处理器 (TEP) 之间的主要区别:

    • SubscribingEventProcessor - 在EventBus 上发布事件的线程中调用,类似于推送机制。
    • TrackingEventProcessor - 在单独的线程中调用,从 EventStore 检索事件,类似于拉机制。

    这保证了无论采用何种方式并发发布事件,TEP都会保证事件中的事件处理顺序。

    在 SEP 方面,情况略有不同,为此我们需要稍微深入研究一下实现。您可以假设两个或更多事件的发布并不过分奇怪。考虑到域内的正确要求,很多聚合实现都可以做到这一点。该框架有一种方法可以将多个事件的这些事务分组在一个批次中。为此,它使用UnitOfWork。例如,如果您要输入聚合的命令处理函数,则可以确保UnitOfWork 处于活动状态以协调生命周期。其中一项任务是将事件配对成一批以进行发布。

    在您的测试用例中,您直接使用EventGateway。本质上完全没问题,但是测试的设置没有启动UnitOfWork 来协调这两个事件以按顺序发生。深入研究代码以了解发布到 SEP 的工作原理,您将在此阶段登陆 AbstractEventProcessor。当调用EventProcessor#publish(List<EventMessage>) 时,会进行验证以检查UnitOfWork 是否处于活动状态。如果是这样,则将事件添加到UnitOfWork 的正确阶段。

    当没有UnitOfWork (UoW) 处于活动状态时,将立即调用处理程序。

    所以,当使用TrackingEventProcessor 时,框架会有意识地启动一个 UoW 来批处理事件以按顺序处理。使用SubscribingEventProcessor 时,这项工作留给用户,假设用户通常会通过[命令处理-> 事件发布-> 事件处理] 的常规流程,这将确保UoW 处于活动状态。由于在您的集成测试中并非如此,因此两个发布操作都会立即调用RaceSagaSagaManager,由于并发而创建两个实例。

    请注意,建议将 TEP 用于此类流程。为 Saga 使用 SEP 可能意味着您将在应用程序(错误)关闭期间丢失一些事件。由于 SEP 是一种推送机制,因此无法从这些“丢失”(从事件处理器的角度)事件中恢复。 TEP 将解决此问题,因为它会自行处理事件并跟踪流程。

    相信这一点可以为您澄清@matpiera。

    【讨论】:

    • 感谢您的回答。这是我们经过一番研究后发现的。我们仍然对没有 TEP 支持 Sagas 的事实感到惊讶。测试实际上是一个真实的场景,因为我们有两条外部消息通过 Kafka。关于消息丢失评论,在消息完全处理之前保持 Kafka 确认是否足够?
    • 从 Saga 的角度来看,EventProcessor 被使用的担忧;它只是接收事件,如有必要,可能来自多个来源。知道 Kafka 扩展也可以设置为您可能知道的 TEP,因此最终省略了问题。但是,您也可以因此受益于 Kafka 重放事件的能力。顺便问一下您的最后一个问题,您是在谈论消费者确认吗?因此,来自KafkaSubscribableMessageSource? 的确认
    • 我明白了。那讲得通。我们为 Kafka 使用了不同的库,但是是的,我说的是消费者 ack,并且还使用了回复功能。
    • 我非常想帮助您了解 Kafka 的细节,但遗憾的是,这绝不是我的强项。当谈到 Axon 的 Kafka 扩展时,我绝对可以帮助你(因为我现在绝对是这方面的主要提交者),但如果你使用的是不同的库,我会发现很难给你正确的对此事的指导。
    猜你喜欢
    • 1970-01-01
    • 2016-11-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-12-14
    相关资源
    最近更新 更多