【发布时间】:2021-11-13 19:42:13
【问题描述】:
我正在尝试在 Spring Batch 作业中使用多线程步骤,但我得到一个“Scope 'job' is not active for current thread...”。我在 Spring 中尝试了一些方法,但此时我正在使用我认为是 OOTB Spring 构造的东西,但它仍然失败。
错误是:
2021-09-19 22:40:03,432 ERROR [https-jsse-nio-8448-exec-4]: org.springframework.batch.core.step.AbstractStep Encountered an error executing step writeToDatabaseStep in job softlayerUploadJob
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.softLayerDataItemQueue': Scope 'job' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No context holder available for job scope
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:365)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:192)
at com.sun.proxy.$Proxy126.read(Unknown Source)
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:94)
at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:161)
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:119)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:113)
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: No context holder available for job scope
at org.springframework.batch.core.scope.JobScope.getContext(JobScope.java:159)
at org.springframework.batch.core.scope.JobScope.get(JobScope.java:92)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:353)
... 19 common frames omitted
基本工作结构简化: 作业 SoftLayer 上传作业 步骤:softlayerUploadFileStep(不能多线程) 从 Excel 文件中读取 写入 SoftLayerDataItemQueue) bean,最终写入 java.util.Queue 步骤:writeToDatabaseStep 从 SoftLayerDataItemQueue bean 读取 使用 JpaWriter 写入数据库
SoftLayerJobConfiguration.java
public class SoftLayerDataItemQueue implements ItemReaderWriterQueue<SoftLayerData> {
private static final Logger logger = LoggerFactory.getLogger(SoftLayerController.class);
private Map<Integer, Queue<SoftLayerData>> queueMap = new HashMap<>();
// private Queue<SoftLayerData> queue = new LinkedList<>();
// @Value("#{stepExecution.jobExecution.jobInstance.instanceId}")
@Value("#{jobExecution.jobInstance.instanceId}")
private int jobInstanceId;
public Queue<SoftLayerData> getQueue() {
Queue<SoftLayerData> result = queueMap.get(jobInstanceId);
logger.info("@@@SoftLayerDataItemQueue jobInstanceId=" + jobInstanceId);
if (result == null) {
result = new LinkedList<>();
queueMap.put(jobInstanceId, result);
}
logger.info("Returning queue with item count=" + result.size());
return result;
}
@Override
public void write(List<? extends SoftLayerData> items) throws Exception {
logger.info("@@@ Attempting to add item to queue with bean hashCode=" + this.hashCode() + " job instanceid="
+ jobInstanceId + " ");
logger.info("SoftLayerDataItemQueue: writing items: count=" + items.size());
if (logger.isDebugEnabled()) {
for (SoftLayerData item : items) {
logger.info("SoftLayerDataItemQueue: Adding items " + item.toString());
}
}
getQueue().addAll(items);
}
@Override
public SoftLayerData read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
logger.info("@@@ Attempting to remove item from queue with bean hashCode=" + this.hashCode()
+ " job instanceid=" + jobInstanceId + " ");
SoftLayerData result = null;
if (getQueue() != null && getQueue().size() > 0) {
result = getQueue().remove();
logger.info("SoftLayerDataItemQueue: Removing item " + result.toString());
} else {
logger.info("SoftLayerDataItemQueue: Empty queue. Returning null to signal EOF ");
}
return result;
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
logger.info("SoftLayerDataItemQueue: open()");
/* Unused method */
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
logger.info("SoftLayerDataItemQueue: update()");
/* Unused method */
}
@Override
public void close() throws ItemStreamException {
logger.info("SoftLayerDataItemQueue: close()");
/* Unused method */
}
}
SoftLayerDataItemQueue.java
public class SoftLayerDataItemQueue implements ItemReaderWriterQueue<SoftLayerData> {
private static final Logger logger = LoggerFactory.getLogger(SoftLayerController.class);
private Map<Integer, Queue<SoftLayerData>> queueMap = new HashMap<>();
@Value("#{jobExecution.jobInstance.instanceId}")
private int jobInstanceId;
public Queue<SoftLayerData> getQueue() {
Queue<SoftLayerData> result = queueMap.get(jobInstanceId);
logger.info("@@@SoftLayerDataItemQueue jobInstanceId=" + jobInstanceId);
if (result == null) {
result = new LinkedList<>();
queueMap.put(jobInstanceId, result);
}
logger.info("Returning queue with item count=" + result.size());
return result;
}
@Override
public void write(List<? extends SoftLayerData> items) throws Exception {
logger.info("SoftLayerDataItemQueue: writing items: count=" + items.size());
if (logger.isDebugEnabled()) {
for (SoftLayerData item : items) {
logger.info("SoftLayerDataItemQueue: Adding items " + item.toString());
}
}
getQueue().addAll(items);
}
@Override
public SoftLayerData read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
+ " job instanceid=" + jobInstanceId + " ");
SoftLayerData result = null;
if (getQueue() != null && getQueue().size() > 0) {
result = getQueue().remove();
logger.info("SoftLayerDataItemQueue: Removing item " + result.toString());
} else {
logger.info("SoftLayerDataItemQueue: Empty queue. Returning null to signal EOF ");
}
return result;
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
logger.info("SoftLayerDataItemQueue: open()");
/* Unused method */
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
logger.info("SoftLayerDataItemQueue: update()");
/* Unused method */
}
@Override
public void close() throws ItemStreamException {
logger.info("SoftLayerDataItemQueue: close()");
/* Unused method */
}
}
注意:我不喜欢使用 SoftLayerDataItemQueue,但我想不出任何其他方法来编写在一个步骤中处理的项目并在另一个步骤中读取它们,特别是对于大容量和并行处理。我希望 Spring 有某种方法可以将数据从一个步骤写入另一个步骤,但我找不到它。 SO 上的其他人建议写入文件或工作或步骤上下文。
【问题讨论】:
-
我不明白为什么你认为你需要多个步骤?为什么首先在 1 步中读取,然后在下一步中写入,这应该是带有读取器和写入器的 1 步(中间没有队列)。也许是处理器。在我看来,你让事情变得比必要的更复杂。
-
我和 M. Deinum 有同样的问题,我在这里问过这个问题:stackoverflow.com/questions/69149100/…。我认为不需要中间队列。单个(多线程或分区)面向块的步骤就足够了 IMO。
标签: java spring-boot jpa spring-batch