【问题标题】:Spring batch JPAItemReader performance IssueSpring批处理JPAItemReader性能问题
【发布时间】:2019-01-25 19:39:56
【问题描述】:

下面是我的 spring 批处理作业的配置,它从数据库中获取记录,在项目处理器中进行一些处理,更新状态列并写回数据库。

当我运行 10k 条记录时,我可以看到它一条一条地获取每条记录并以相同的方式更新状态。最初我打算使用多线程,但这没有任何意义,因为我的工作每天运行一次,记录数量从 10 到 100k 不等。 (记录在大多数情况下少于 5k,一年中的少数几天(5 到 10 天)达到 50k 到 100k)。

我不想在一年中的 10 天里增加更多的 CPU 并被 Kubernetes 收取费用。现在的问题是,当我运行这项工作时,它只需要 100 条记录,它独立运行每个选择查询,而不是一次需要 100 条。一次更新也是一条记录,处理 10k 条记录需要 10 分钟,这真的很慢。

如何才能更快地读取、处理和写入?我可以摆脱多线程并偶尔增加一点 CPU 利用率。更多信息以代码中的 cmets 形式提供。

@Configuration
@EnableBatchProcessing
public class BatchConfiguration extends DefaultBatchConfigurer{

public final static Logger logger = LoggerFactory.getLogger(BatchConfiguration.class);

@Autowired
JobBuilderFactory jobBuilderFactory;

@Autowired
StepBuilderFactory stepBuilderFactory;

@Autowired
MyRepository myRepository;


@Autowired
private EntityManagerFactory entityManagerFactory;

@Value("${chunk-size}")
private int chunkSize;

@Value("${max-threads}")
private int maxThreads;

private final DataSource dataSource;


/**
 * @param dataSource
 * Override to do not set datasource even if a datasource exist during intialization.
 * Initialize will use a Map based JobRepository (instead of database) for Spring batch meta tables
 */
@Override
public void setDataSource(DataSource dataSource) {
}

@Override
public PlatformTransactionManager getTransactionManager() {
    return jpaTransactionManager();
}


@Autowired
public BatchConfiguration(@Qualifier("dataSource") DataSource dataSource) {
    this.dataSource = dataSource;
}

@Bean
public JpaTransactionManager jpaTransactionManager() {
    final JpaTransactionManager transactionManager = new JpaTransactionManager();
    transactionManager.setDataSource(dataSource);
    return transactionManager;
}


@Bean
@StepScope
public JdbcPagingItemReader<ModelEntity> importReader() {  // I tried using RepositoryItemReader but records were skipped by JPA hence I went for JdbcPagingItemReader
    JdbcPagingItemReader<ModelEntity> reader = new JdbcPagingItemReader<ModelEntity>();
    final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
    sqlPagingQueryProviderFactoryBean.setDataSource( dataSource );
    sqlPagingQueryProviderFactoryBean.setSelectClause( "SELECT *" );
    sqlPagingQueryProviderFactoryBean.setFromClause( "FROM mytable" );
    sqlPagingQueryProviderFactoryBean.setWhereClause( "WHERE STATUS = 'myvalue' " );
    sqlPagingQueryProviderFactoryBean.setSortKey( "primarykey" );
    try {
        reader.setQueryProvider( sqlPagingQueryProviderFactoryBean.getObject() );
    } catch (Exception e) {
        e.printStackTrace();
    }
    reader.setDataSource( dataSource );
    reader.setPageSize( chunkSize );
    reader.setSaveState( Boolean.FALSE );
    reader.setRowMapper( new BeanPropertyRowMapper<ModelEntity>(ModelEntity.class ) );
    return reader;
}



@Bean
public ItemWriter<ModelEntity> databaseWriter() {
    RepositoryItemWriter<ModelEntity> repositoryItemWriter=new RepositoryItemWriter<>();
    repositoryItemWriter.setRepository(myRepository);
    repositoryItemWriter.setMethodName("save");
    return repositoryItemWriter;
}

@Bean
public Myprocessor myprocessor() { 
    return new Myprocessor();
}

@Bean
public JobExecutionListener jobExecutionListener() {
    return new JobExecutionListener();
}

@Bean
public StepExecutionListener stepExecutionListener() {
    return new StepExecutionListener();
}

@Bean
public ChunkExecutionListener chunkListener() {
    return new ChunkExecutionListener();
}

@Bean
public TaskExecutor taskExecutor() {
 SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
 taskExecutor.setConcurrencyLimit(maxThreads);
return taskExecutor;
}

@Bean
public Job processJob() {
    return jobBuilderFactory.get("myjob")
            .incrementer(new RunIdIncrementer())
            .start(processStep())
            .listener(jobExecutionListener())
            .build();
}

@Bean
public Step processStep() {
    return stepBuilderFactory.get("processStep")
            .<ModelEntity,ModelEntity>chunk(chunkSize)
            .reader(importReader())
            .processor(myprocessor())
            .writer(databaseWriter())
            .taskExecutor(taskExecutor())
            .listener(stepExecutionListener())
            .listener(chunkListener())
            .transactionManager(getTransactionManager())
            .throttleLimit(maxThreads)
            .build();
    }

}

我使用的存储库是JpaRepository 和下面的代码。 (假设其父类 CrudRepository 的 save 方法会做 save)

public interface MyRepository extends JpaRepository<ModelEntity, BigInteger> {

}

处理器如下

@Component
public class Myprocessor implements ItemProcessor<Myprocessor,Myprocessor> {

@Override
public ModelEntity process(ModelEntity modelEntity) throws Exception {
    try {
    // This is fast and working fine
       if ((myProcessing)) {
            modelEntity.setStatus(success);
        } else {
            modelEntity.setStatus(failed);
        }
    }
    catch (Exception e){
        logger.info( "Exception occurred while processing"+e );
      }
    return modelEntity;
 }

 // This is fast and working fine
 public Boolean myProcessing(ModelEntity modelEntity){
 //Processor Logic Here
    return processingStatus;
 }

 }

下面的属性文件

logging.level.org.hibernate.SQL=DEBUG
logging.level.com.zaxxer.hikari.HikariConfig=DEBUG
logging.level.org.hibernate.type.descriptor.sql.BasicBinder=TRACE 
logging.level.org.springframework.jdbc.core.JdbcTemplate=DEBUG
logging.level.org.springframework.jdbc.core.StatementCreatorUtils=TRACE


spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.url=url
spring.datasource.username=username
spring.datasource.password=password 
spring.jpa.hibernate.connection.provider_class
=org.hibernate.hikaricp.internal.HikariCPConnectionProvider
spring.jpa.database-platform=org.hibernate.dialect.Oracle10gDialect
spring.jpa.show-sql=false
spring.main.allow-bean-definition-overriding=true
spring.batch.initializer.enabled=false
spring.batch.job.enabled=false
spring.batch.initialize-schema=never 
chunk-size=100
max-threads=5

【问题讨论】:

  • 您是否尝试在您的repositoryItemWriter 中使用saveAll 方法而不是save
  • 是的,我也尝试过。但这不是问题

标签: java spring spring-data-jpa spring-data spring-batch


【解决方案1】:

您只需一个配置属性即可为 INSERT、UPDATE 和 DELETE 语句启用 JDBC 批处理:

spring.jpa.properties.hibernate.jdbc.batch_size 

它决定了一次发送到数据库执行的更新数量。

详情见this link

【讨论】:

    【解决方案2】:

    谢谢大家的建议。我自己发现了这个问题。我正在使用 JdbcPagingItemReader 和 RepositoryItemWriter。阅读器按预期工作,但编写器正在为处理器后传递的每条记录触发选择查询。我相信背后的原因是,由于阅读器不是标准的 JPA 阅读器,因此该记录仅在处理器之后才会持久保存到 JPA。不过我不确定。但是将编写器更改为 JdbcBatchItemWriter 解决了这个问题。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-08-08
      • 1970-01-01
      • 2019-02-14
      • 2013-05-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多