【问题标题】:Axon framework retry logicAxon 框架重试逻辑
【发布时间】:2019-07-15 16:53:09
【问题描述】:

我正在尝试使用 Axon 4.1+ 中的 eventProcessingModule 在 2 JVM 节点 K8 集群上执行重播事件。虽然我设置它会清理事件,但它只从一个节点中获取它,而另一个节点继续运行,因为它的跟踪事件仍然存在。

如何在所有 JVM 上同时禁用它,以便它可以正确重放?然后启用所有这些以继续处理命令。

我已尝试通过此代码增加线程,这会导致另一个问题,即现有令牌永远不会在 InitialSemgmentsCount 中增加,除非我从数据库中完全删除令牌。

    public void config(final EventProcessingConfigurer configurer) {
        configurer.registerTrackingEventProcessorConfiguration(c ->
                TrackingEventProcessorConfiguration
                        .forParallelProcessing(2)
                        .andInitialSegmentsCount(2));
        // .andTokenClaimInterval(10L, TimeUnit.SECONDS)); // Event
    }

我当前的设置:

# Application.yml
axon:
  distributed:
    enabled: true

具有以下内容的服务组件:

EventProcessingConfiguration eventProcessingModule 中自动连接

eventProcessingModule
        .eventProcessorByProcessingGroup("query.skuProcessor.SkuProjection", TrackingEventProcessor.class)
        .ifPresent(trackingEventProcessor -> {
            trackingEventProcessor.shutDown();
            trackingEventProcessor.resetTokens();
        });

// Thread.sleep() to verify with

eventProcessingModule
        .eventProcessorByProcessingGroup("query.skuProcessor.SkuProjection", TrackingEventProcessor.class)
        .ifPresent(trackingEventProcessor -> {
            trackingEventProcessor.start();
        });

示例代码在单个关闭/重置/启动设置中具有上述内容,但我将它们拆分只是为了查看它是如何工作的(相信在重置之后调用 @ResetHandler 因此启动等待,但不完全确定)

SkuProjection 组件使用@ResetHandler void 方法清理要重播的表。

根据当前哪个JVM有token,另一个会返回这个错误: SkuReplayService.startReplay: Failed on exception! Unable to claim token 'query.skuProcessor.SkuProjection[0]'. It is owned by '1@sku-7694bbc6b6-8p958'

我认为token_entry 表会在其“停止”时为所有者变为“null”,但由于有 2 个 JVM,当前没有令牌的一个将在另一个重播时接管它。在第一个节点停止其处理器后,确认第二个节点正在使用令牌运行。

【问题讨论】:

    标签: spring-boot axon


    【解决方案1】:

    我想我可以在这里给你一些指导。

    您正在研究如何跨多个 JVM 将操作委托给一组特定的跟踪事件处理器,对吗? 现在,API Axon 框架为您提供了关于 TrackingEventProcessor 的内容,并且有意在 TrackingEventProcessor 上进行维护。

    因此,执行 start()shutDown()processingStatus()resetTokens() 是对特定跟踪事件处理器实例的调用。

    注意:resetTokens() 方法有效地为跟踪事件处理器负责的事件处理程序发出重播。

    此外,您会看到 Unable to claim token 异常,因为框架要求您执行重置的 TrackingEventProcessor 是给定处理组的所有令牌的所有者。这样做的原因是它需要将令牌调整为ReplayTokens,以支持@ResetHandler等漂亮的功能。

    如果我是正确的,您现在要求的是:

    Axon 框架是否提供了一种方法来将特定的跟踪事件处理器操作委托给所有处理同一处理组的实例?

    我想我会很遗憾地让你失望@sherring,Axon Framework 不提供委派此类操作的方法。 实现这些委托启动/停止/重置操作的最快方法是设置Axon Server,因为标准版(免费)已经为您提供了此功能。

    如果出于某种原因,这款免费软件无法使用,这意味着您必须自己为 start/stop/reset/{insert-any-TrackingEventProcessor-operation} 创建这样的委托系统。或者,您可以更多地从运营的角度来处理这个问题。因此,关闭给定 Axon Framework 应用程序的第二个实例,确保只有一个您要重置的 TrackingEventProcessor 实例。

    希望这对您有所帮助@sherring!

    【讨论】:

    • 谢谢史蒂文。我有点想这将是必须将 JVM 缩小到 1 才能正确重新同步数据的情况。现在我们还没有使用 Axon Server 的选项,因为我们仍处于发现 / poc 状态。但如果我们发现需要,我们就会这样做。
    • 没有问题 Sherring,我很高兴能帮上忙!可以理解的是,当您仍处于项目的 PoC 阶段时,Axon Server 并未受到关注。如果您需要,只需弹出更多与 Axon 相关的问题,以进一步探索您的发现阶段!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-01-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-23
    • 2019-04-24
    相关资源
    最近更新 更多