【发布时间】:2021-09-17 21:16:50
【问题描述】:
我们有一个用例,我们需要从一些分页 API 读取数据并写入一些下游 Kafka 主题。
我们已经能够通过 spring 批量集成远程分区 实现该解决方案,其中管理器通过创建包含要读取数据的页码和偏移量的 executionContext 来负责对任务进行分区。经理创建这个 executionContext 并将它们放在消息传递通道上(我可以使用 rabbitMQ 和 Kafka 主题,无论哪个提供解决方案)。工作人员(超过 1 个)从 messagesChannel 中选择该 executionContext 并完成从 API 读取数据并将其写入所需 Kafka 主题的任务。
上面的实现工作得很好。如果我一个接一个地为不同的客户运行相同的工作,这也可以正常工作。当我们想要为多个客户端并行运行同一个作业时,挑战就来了。 例如,我们并行启动 2 个客户端的作业。它为每个客户创建 1 名经理和 2 名工人。现在问题来了,当两个经理都将 executionContext 推送到 同一个消息传递通道 并且工作人员不知道要选择和执行哪个时。 此外,这两个作业共享相同的数据库 spring 批处理表,所以我怀疑它也会在该级别产生问题。
关于如何实现并行运行多个 Spring Batch Reporter 分区作业的任何输入或参考。
更新[2022 年 1 月 18 日]
我尝试在 here 处将 @StepScoped 添加到 MessageChannelPartitionHandler,以下是我得到的错误:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'partitioningMessageHandler': FactoryBean threw exception on object creation; nested exception is java.lang.IllegalStateException: Target object of type [class com.sun.proxy.$Proxy78] has no eligible methods for handling Messages.
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:178) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.getObjectFromFactoryBean(FactoryBeanRegistrySupport.java:101) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getObjectForBeanInstance(AbstractBeanFactory.java:1821) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.getObjectForBeanInstance(AbstractAutowireCapableBeanFactory.java:1266) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.resolveTargetBeanFromMethodWithBeanAnnotation(AbstractMethodAnnotationPostProcessor.java:536) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.postProcess(AbstractMethodAnnotationPostProcessor.java:154) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.postProcessMethodAndRegisterEndpointIfAny(MessagingAnnotationPostProcessor.java:230) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.lambda$processAnnotationTypeOnMethod$1(MessagingAnnotationPostProcessor.java:220) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.afterSingletonsInstantiated(MessagingAnnotationPostProcessor.java:141) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:912) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:878) ~[spring-context-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:550) ~[spring-context-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at spring.batch.integration.Manager.main(Manager.java:11) ~[main/:na]
Caused by: java.lang.IllegalStateException: Target object of type [class com.sun.proxy.$Proxy78] has no eligible methods for handling Messages.
at org.springframework.util.Assert.state(Assert.java:94) ~[spring-core-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.validateFallbackMethods(MessagingMethodInvokerHelper.java:751) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.findHandlerMethodsForTarget(MessagingMethodInvokerHelper.java:740) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:294) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:231) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.<init>(MethodInvokingMessageListProcessor.java:63) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor.<init>(MethodInvokingMessageGroupProcessor.java:53) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.AggregatorFactoryBean.createHandler(AggregatorFactoryBean.java:211) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.AggregatorFactoryBean.createHandler(AggregatorFactoryBean.java:53) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.createHandlerInternal(AbstractSimpleMessageHandlerFactoryBean.java:198) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:186) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:60) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:171) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
... 20 common frames omitted
【问题讨论】:
标签: java spring-batch spring-integration spring-batch-integration