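【Posted】: 2019-01-25 19:39:56
【Question】:
Below is the configuration for my Spring Batch job. It fetches records from a database, does some processing in the item processor, updates the status column, and writes the records back to the database.
When I ran it for 10k records, I could see it fetching each record one by one and updating the status the same way. Initially I planned to use multithreading, but that does not make sense because my job runs once a day and the number of records ranges from 10 to 100k. (On most days there are fewer than 5k records; on a few days a year (5 to 10 days) the count reaches 50k to 100k.)
I don't want to add more CPUs and get charged by Kubernetes for just 10 days a year. The problem now is that when I run this job, it takes only 100 records at a time, yet it runs a separate select query for each record rather than fetching 100 at once. Updates are also done one record at a time, and it takes 10 minutes to process 10k records, which is really slow.
How can I read, process, and write faster? I can get rid of multithreading and accept slightly higher CPU utilization once in a while. More information is given as comments in the code.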
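To put the numbers above in perspective, here is a rough back-of-the-envelope sketch. It takes the description at face value and assumes each record costs one SELECT plus one UPDATE, then compares that with reads and writes grouped into chunks of 100. The class and method names are hypothetical, and the figures are estimates, not measurements.

```java
public class RoundTripEstimate {

    /** Statements sent when every record costs one SELECT plus one UPDATE (assumed model). */
    public static long perRecordStatements(long records) {
        return records * 2;
    }

    /** Round trips when both the read and the write are grouped per chunk (assumed model). */
    public static long chunkedRoundTrips(long records, long chunkSize) {
        long chunks = (records + chunkSize - 1) / chunkSize; // ceiling division
        return chunks * 2; // one grouped read and one grouped write per chunk
    }

    /** Average wall-clock time spent per record. */
    public static double millisPerRecord(long totalMillis, long records) {
        return (double) totalMillis / records;
    }

    public static void main(String[] args) {
        long records = 10_000;
        long tenMinutesInMillis = 10 * 60 * 1000;
        System.out.println("ms per record:         " + millisPerRecord(tenMinutesInMillis, records));
        System.out.println("per-record statements: " + perRecordStatements(records));
        System.out.println("chunked round trips:   " + chunkedRoundTrips(records, 100));
    }
}
```

Under these assumptions, 10 minutes for 10k records is about 60 ms per record, and grouping the work per chunk would cut roughly 20,000 statements down to about 200 round trips.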
@Configuration
@EnableBatchProcessing
public class BatchConfiguration extends DefaultBatchConfigurer{
public final static Logger logger = LoggerFactory.getLogger(BatchConfiguration.class);
@Autowired
JobBuilderFactory jobBuilderFactory;
@Autowired
StepBuilderFactory stepBuilderFactory;
@Autowired
MyRepository myRepository;
@Autowired
private EntityManagerFactory entityManagerFactory;
@Value("${chunk-size}")
private int chunkSize;
@Value("${max-threads}")
private int maxThreads;
private final DataSource dataSource;
/**
* @param dataSource
* Overridden so that no DataSource is set, even if one exists during initialization.
* The initializer will then use a Map-based JobRepository (instead of a database) for the Spring Batch meta tables.
*/
@Override
public void setDataSource(DataSource dataSource) {
}
@Override
public PlatformTransactionManager getTransactionManager() {
return jpaTransactionManager();
}
@Autowired
public BatchConfiguration(@Qualifier("dataSource") DataSource dataSource) {
this.dataSource = dataSource;
}
@Bean
public JpaTransactionManager jpaTransactionManager() {
final JpaTransactionManager transactionManager = new JpaTransactionManager();
transactionManager.setDataSource(dataSource);
return transactionManager;
}
@Bean
@StepScope
public JdbcPagingItemReader<ModelEntity> importReader() { // I tried using RepositoryItemReader but records were skipped by JPA hence I went for JdbcPagingItemReader
JdbcPagingItemReader<ModelEntity> reader = new JdbcPagingItemReader<ModelEntity>();
final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
sqlPagingQueryProviderFactoryBean.setDataSource( dataSource );
sqlPagingQueryProviderFactoryBean.setSelectClause( "SELECT *" );
sqlPagingQueryProviderFactoryBean.setFromClause( "FROM mytable" );
sqlPagingQueryProviderFactoryBean.setWhereClause( "WHERE STATUS = 'myvalue' " );
sqlPagingQueryProviderFactoryBean.setSortKey( "primarykey" );
try {
reader.setQueryProvider( sqlPagingQueryProviderFactoryBean.getObject() );
} catch (Exception e) {
e.printStackTrace();
}
reader.setDataSource( dataSource );
reader.setPageSize( chunkSize );
reader.setSaveState( Boolean.FALSE );
reader.setRowMapper( new BeanPropertyRowMapper<ModelEntity>(ModelEntity.class ) );
return reader;
}
@Bean
public ItemWriter<ModelEntity> databaseWriter() {
RepositoryItemWriter<ModelEntity> repositoryItemWriter=new RepositoryItemWriter<>();
repositoryItemWriter.setRepository(myRepository);
repositoryItemWriter.setMethodName("save");
return repositoryItemWriter;
}
@Bean
public Myprocessor myprocessor() {
return new Myprocessor();
}
@Bean
public JobExecutionListener jobExecutionListener() {
return new JobExecutionListener();
}
@Bean
public StepExecutionListener stepExecutionListener() {
return new StepExecutionListener();
}
@Bean
public ChunkExecutionListener chunkListener() {
return new ChunkExecutionListener();
}
@Bean
public TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(maxThreads);
return taskExecutor;
}
@Bean
public Job processJob() {
return jobBuilderFactory.get("myjob")
.incrementer(new RunIdIncrementer())
.start(processStep())
.listener(jobExecutionListener())
.build();
}
@Bean
public Step processStep() {
return stepBuilderFactory.get("processStep")
.<ModelEntity,ModelEntity>chunk(chunkSize)
.reader(importReader())
.processor(myprocessor())
.writer(databaseWriter())
.taskExecutor(taskExecutor())
.listener(stepExecutionListener())
.listener(chunkListener())
.transactionManager(getTransactionManager())
.throttleLimit(maxThreads)
.build();
}
}
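For readers less familiar with chunk-oriented steps, the following plain-Java sketch models what `chunk(chunkSize)` in the step above roughly does: items are read one at a time until a chunk is full, each item is processed, and the writer then receives the whole list in a single call. This is a simplified model under stated assumptions, not Spring Batch's implementation; it leaves out transactions, skip and retry handling, and filtering. `ChunkLoopSketch` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class ChunkLoopSketch {

    /**
     * Runs a simplified read/process/write loop and returns the size of
     * every list that was handed to the writer.
     */
    public static <I, O> List<Integer> run(Iterator<I> reader, Function<I, O> processor,
                                           Consumer<List<O>> writer, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be at least 1");
        }
        List<Integer> chunkSizes = new ArrayList<>();
        while (reader.hasNext()) {
            // Read phase: pull items one at a time until the chunk is full or input ends.
            List<I> items = new ArrayList<>(chunkSize);
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }
            // Process phase: transform each item individually.
            List<O> processed = new ArrayList<>(items.size());
            for (I item : items) {
                processed.add(processor.apply(item));
            }
            // Write phase: the writer is called once with the whole chunk.
            writer.accept(processed);
            chunkSizes.add(processed.size());
        }
        return chunkSizes;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            input.add(i);
        }
        Function<Integer, Integer> processor = i -> i + 1;
        Consumer<List<Integer>> writer = chunk -> { };
        System.out.println(run(input.iterator(), processor, writer, 100)); // prints [100, 100, 50]
    }
}
```

The writer is called once per chunk, but what it does with the list is up to the writer. As far as I can tell, `RepositoryItemWriter` in the Spring Batch 4.x releases of that period invokes the configured repository method once per item, so a chunk of 100 may still turn into 100 separate `save` calls.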
The repository I am using is a JpaRepository, shown below. (I assume the save method of its parent interface CrudRepository performs the save.)
public interface MyRepository extends JpaRepository<ModelEntity, BigInteger> {
}
The processor is as follows:
@Component
public class Myprocessor implements ItemProcessor<ModelEntity, ModelEntity> {
@Override
public ModelEntity process(ModelEntity modelEntity) throws Exception {
try {
// This is fast and working fine
if (myProcessing(modelEntity)) {
modelEntity.setStatus(success);
} else {
modelEntity.setStatus(failed);
}
}
catch (Exception e){
logger.info( "Exception occurred while processing"+e );
}
return modelEntity;
}
// This is fast and working fine
public Boolean myProcessing(ModelEntity modelEntity){
//Processor Logic Here
return processingStatus;
}
}
The properties file is below:
logging.level.org.hibernate.SQL=DEBUG
logging.level.com.zaxxer.hikari.HikariConfig=DEBUG
logging.level.org.hibernate.type.descriptor.sql.BasicBinder=TRACE
logging.level.org.springframework.jdbc.core.JdbcTemplate=DEBUG
logging.level.org.springframework.jdbc.core.StatementCreatorUtils=TRACE
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.url=url
spring.datasource.username=username
spring.datasource.password=password
spring.jpa.hibernate.connection.provider_class=org.hibernate.hikaricp.internal.HikariCPConnectionProvider
spring.jpa.database-platform=org.hibernate.dialect.Oracle10gDialect
spring.jpa.show-sql=false
spring.main.allow-bean-definition-overriding=true
spring.batch.initializer.enabled=false
spring.batch.job.enabled=false
spring.batch.initialize-schema=never
chunk-size=100
max-threads=5
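One hedged possibility, assuming the writes keep going through Hibernate via RepositoryItemWriter: Hibernate does not batch JDBC statements unless `hibernate.jdbc.batch_size` is set, and Spring Boot passes `spring.jpa.properties.*` entries through to the JPA provider. The fragment below is a sketch and was not part of the original post; the value 100 simply mirrors chunk-size.

```properties
# Assumption: updates still go through Hibernate (RepositoryItemWriter + save).
# Lets Hibernate group up to 100 statements into one JDBC batch at flush time.
spring.jpa.properties.hibernate.jdbc.batch_size=100
# Orders UPDATE statements so that more of them can share a batch.
spring.jpa.properties.hibernate.order_updates=true
```

This would only affect how the UPDATE statements are sent. It would not by itself remove any per-entity SELECT that is issued when save merges a detached entity.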
【Comments】:
- Did you try using the saveAll method instead of save in your repositoryItemWriter?
- Yes, I tried that as well. But that is not the problem.
Tags: java spring spring-data-jpa spring-data spring-batch