【问题标题】:Spring batch integration remote partitioning - running parallel jobsSpring批处理集成远程分区-运行并行作业
【发布时间】:2021-09-17 21:16:50
【问题描述】:

我们有一个用例,我们需要从一些分页 API 读取数据并写入一些下游 Kafka 主题。

我们已经能够通过 spring 批量集成远程分区 实现该解决方案,其中管理器通过创建包含要读取数据的页码和偏移量的 executionContext 来负责对任务进行分区。经理创建这个 executionContext 并将它们放在消息传递通道上(我可以使用 rabbitMQ 和 Kafka 主题,无论哪个提供解决方案)。工作人员(超过 1 个)从 messagesChannel 中选择该 executionContext 并完成从 API 读取数据并将其写入所需 Kafka 主题的任务。

上面的实现工作得很好。如果我一个接一个地为不同的客户运行相同的工作,这也可以正常工作。当我们想要为多个客户端并行运行同一个作业时,挑战就来了。 例如,我们并行启动 2 个客户端的作业。它为每个客户创建 1 名经理和 2 名工人。现在问题来了,当两个经理都将 executionContext 推送到 同一个消息传递通道 并且工作人员不知道要选择和执行哪个时。 此外,这两个作业共享相同的数据库 spring 批处理表,所以我怀疑它也会在该级别产生问题。

关于如何实现并行运行多个 Spring Batch Reporter 分区作业的任何输入或参考。

更新[2022 年 1 月 18 日]

我尝试在 here 处将 @StepScoped 添加到 MessageChannelPartitionHandler,以下是我得到的错误:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'partitioningMessageHandler': FactoryBean threw exception on object creation; nested exception is java.lang.IllegalStateException: Target object of type [class com.sun.proxy.$Proxy78] has no eligible methods for handling Messages.
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:178) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.getObjectFromFactoryBean(FactoryBeanRegistrySupport.java:101) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getObjectForBeanInstance(AbstractBeanFactory.java:1821) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.getObjectForBeanInstance(AbstractAutowireCapableBeanFactory.java:1266) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.resolveTargetBeanFromMethodWithBeanAnnotation(AbstractMethodAnnotationPostProcessor.java:536) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.postProcess(AbstractMethodAnnotationPostProcessor.java:154) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.postProcessMethodAndRegisterEndpointIfAny(MessagingAnnotationPostProcessor.java:230) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.lambda$processAnnotationTypeOnMethod$1(MessagingAnnotationPostProcessor.java:220) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.afterSingletonsInstantiated(MessagingAnnotationPostProcessor.java:141) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:912) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:878) ~[spring-context-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:550) ~[spring-context-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at spring.batch.integration.Manager.main(Manager.java:11) ~[main/:na]
Caused by: java.lang.IllegalStateException: Target object of type [class com.sun.proxy.$Proxy78] has no eligible methods for handling Messages.
    at org.springframework.util.Assert.state(Assert.java:94) ~[spring-core-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.validateFallbackMethods(MessagingMethodInvokerHelper.java:751) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.findHandlerMethodsForTarget(MessagingMethodInvokerHelper.java:740) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:294) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:231) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.<init>(MethodInvokingMessageListProcessor.java:63) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor.<init>(MethodInvokingMessageGroupProcessor.java:53) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AggregatorFactoryBean.createHandler(AggregatorFactoryBean.java:211) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AggregatorFactoryBean.createHandler(AggregatorFactoryBean.java:53) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.createHandlerInternal(AbstractSimpleMessageHandlerFactoryBean.java:198) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:186) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:60) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:171) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    ... 20 common frames omitted

【问题讨论】:

    标签: java spring-batch spring-integration spring-batch-integration


    【解决方案1】:

    在这样的设置中,MessageChannelPartitionHandler 应该是步进范围的。 Javadoc中对此有注释:

    Note: The reply channel for this is instance based.
    Sharing this component across multiple step instances may result in the
    crossing of messages. It's recommended that this component be step or job scoped.
    

    将此 bean 设为步进范围应该可以解决问题。

    【讨论】:

    • 我们没有使用MessageChannelPartitionHandler 明确地使用remotePartitioningManagerStepBuilderFactory.build() 方法来创建一个。此外,我们使用的是DirectChannel,而不是MessageChannel。你是否也看到了这个问题。如果可能的话,您能否向我推荐任何支持并行运行多个作业和远程分区的示例?
    • 另外,你提到的 Javadoc 谈到了回复通道,但是我们不会有 2 个管理器实例在同一个队列上发送客户端 A 和客户端 B 的请求的问题吗?工作人员也特定于客户端,不了解如何处理来自另一个客户端请求的 executionContext。
    • 请分享您的代码并展示您如何并行运行两个作业来导致问题,我会尽力提供帮助。
    • 最新代码可在here 获得,我尝试将@StepScope 添加到MessageChannelPartitionHandler。但它无法将 partitionHandler 分配给 aggregatorFactoryBean.setProcessorBean(partitionHandler),因为 bean 是 step 范围的,因为此时不可用。
    • 能否请您看看并帮助我使用@StepScope 配置 MessageChannelPartitionHandler?
    猜你喜欢
    • 2014-09-28
    • 1970-01-01
    • 1970-01-01
    • 2021-01-15
    • 2015-07-30
    • 2013-10-21
    • 2014-06-20
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多