【发布时间】:2019-07-15 16:53:09
【问题描述】:
我正在尝试使用 Axon 4.1+ 中的 eventProcessingModule 在 2 JVM 节点 K8 集群上执行重播事件。虽然我设置它会清理事件,但它只从一个节点中获取它,而另一个节点继续运行,因为它的跟踪事件仍然存在。
如何在所有 JVM 上同时禁用它,以便它可以正确重放?然后启用所有这些以继续处理命令。
我已尝试通过此代码增加线程,这会导致另一个问题,即现有令牌永远不会在 InitialSemgmentsCount 中增加,除非我从数据库中完全删除令牌。
public void config(final EventProcessingConfigurer configurer) {
configurer.registerTrackingEventProcessorConfiguration(c ->
TrackingEventProcessorConfiguration
.forParallelProcessing(2)
.andInitialSegmentsCount(2));
// .andTokenClaimInterval(10L, TimeUnit.SECONDS)); // Event
}
我当前的设置:
# Application.yml
axon:
distributed:
enabled: true
具有以下内容的服务组件:
在EventProcessingConfiguration eventProcessingModule 中自动连接
eventProcessingModule
.eventProcessorByProcessingGroup("query.skuProcessor.SkuProjection", TrackingEventProcessor.class)
.ifPresent(trackingEventProcessor -> {
trackingEventProcessor.shutDown();
trackingEventProcessor.resetTokens();
});
// Thread.sleep() to verify with
eventProcessingModule
.eventProcessorByProcessingGroup("query.skuProcessor.SkuProjection", TrackingEventProcessor.class)
.ifPresent(trackingEventProcessor -> {
trackingEventProcessor.start();
});
示例代码在单个关闭/重置/启动设置中具有上述内容,但我将它们拆分只是为了查看它是如何工作的(相信在重置之后调用 @ResetHandler 因此启动等待,但不完全确定)
SkuProjection 组件使用@ResetHandler void 方法清理要重播的表。
根据当前哪个JVM有token,另一个会返回这个错误:
SkuReplayService.startReplay: Failed on exception! Unable to claim token 'query.skuProcessor.SkuProjection[0]'. It is owned by '1@sku-7694bbc6b6-8p958'
我认为token_entry 表会在其“停止”时为所有者变为“null”,但由于有 2 个 JVM,当前没有令牌的一个将在另一个重播时接管它。在第一个节点停止其处理器后,确认第二个节点正在使用令牌运行。
【问题讨论】:
标签: spring-boot axon