【发布时间】:2021-12-24 08:34:01
【问题描述】:
我有一个 Java 应用程序,它从 CSV 文件中读取数据,将它们转换为业务对象,然后将它们保存到数据库中。大部分工作是单线程完成的,但其中一些工作是使用多线程完成的。该应用程序使用 Spring 框架,Hibernate 作为 ORM 提供者,c3p0 作为连接池提供者,MySQL 作为数据库。应用程序的多线程部分使用@Async 进行注释,使用作为将执行此工作的TaskExecutor 的bean 创建的ThreadPoolTaskExecutor。 @Aysnc 注解的方法返回 CompletableFuture。
使用一个小的 CSV 文件(少于 10k 条记录)一切都很好。但是,我们的一些客户有更大的文件(150k 记录),这种负载增加会在应用程序的多线程部分产生问题。这是@Async 注解的方法之一:
public class AppointmentTreeDAO {
@Autowired
private AppointmentRepository appointmentRepository;
private static final Logger logger = LoggerFactory.getLogger(AppointmentTreeDAO.class);
@Async
public CompletableFuture<Appointment> getObjectTree(Long oldApptId) {
logger.info("getting appointment thread");
return CompletableFuture.completedFuture(appointmentRepository.findByApptId(oldApptId));
}
}
这是 AppointmentRepository 类(最后一个方法是相关代码调用的方法):
public interface AppointmentRepository extends JpaRepository<Appointment, Long> {
List<Appointment> findByAccountId(String accountId);
List<Appointment> findByAccountIdAndProductIdAndApptStartDatetimeBetween(String accountId,
String productId, Date startDate, Date endDte);
Slice<Appointment> findByAccountIdAndProductIdAndApptStartDatetimeBetween(String accountId,
String productId, Date startDate, Date endDte, Pageable pageable);
@Query(value = "select a.appt_id from appointment a where a.appt_start_datetime " +
"between :startDate AND :endDate and a.account_id = :accountId and a.product_id = :productId",
nativeQuery = true)
List<Long> findByApptIdBetween(@Param("startDate") String startDate, @Param("endDate") String endDate,
@Param("accountId") String accountId, @Param("productId") String productId);
Appointment findByApptId(Long apptId);
}
由于我在 AppointmentTreeDAO 类中有该日志行,因此我可以在日志中看到每个线程都在工作:
2:58:12.771 PM
2021-11-11 14:58:12.771 INFO 1 --- [mtSave-38] c.t.etl.repository.AppointmentTreeDAO : getting appointment thread
最终,我开始从线程中看到这样的异常(在这种情况下与上面的线程相同):
2:58:16.790 PM
2021-11-11 14:58:16.790 WARN 1 --- [mtSave-38] o.h.engine.jdbc.spi.SqlExceptionHelper : SQL Error: 0, SQLState: 08001
2:58:16.803 PM
2021-11-11 14:58:16.803 ERROR 1 --- [mtSave-38] o.h.engine.jdbc.spi.SqlExceptionHelper : Could not create connection to database server. Attempted reconnect 3 times. Giving up.
2:58:16.804 PM
2021-11-11 14:58:16.804 WARN 1 --- [mtSave-38] c.t.etl.pipeline.load.Loader : java.util.concurrent.CompletionException: org.springframework.dao.DataAccessResourceFailureException: Unable to acquire JDBC Connection; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.lambda$doSubmit$3(AsyncExecutionAspectSupport.java:279)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.dao.DataAccessResourceFailureException: Unable to acquire JDBC Connection; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection
at org.springframework.orm.jpa.vendor.HibernateJpaDialect.convertHibernateAccessException(HibernateJpaDialect.java:255)
at org.springframework.orm.jpa.vendor.HibernateJpaDialect.translateExceptionIfPossible(HibernateJpaDialect.java:233)
at org.springframework.orm.jpa.AbstractEntityManagerFactoryBean.translateExceptionIfPossible(AbstractEntityManagerFactoryBean.java:551)
at org.springframework.dao.support.ChainedPersistenceExceptionTranslator.translateExceptionIfPossible(ChainedPersistenceExceptionTranslator.java:61)
at org.springframework.dao.support.DataAccessUtils.translateIfNecessary(DataAccessUtils.java:242)
at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:152)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.data.jpa.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:145)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:97)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
at com.sun.proxy.$Proxy148.findByApptId(Unknown Source)
at com.etl.repository.AppointmentTreeDAO.getObjectTree(AppointmentTreeDAO.java:25)
at com.etl.repository.AppointmentTreeDAO$$FastClassBySpringCGLIB$$fb95149d.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:137)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.lambda$doSubmit$3(AsyncExecutionAspectSupport.java:276)
... 4 more
Caused by: org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection
at org.hibernate.exception.internal.SQLExceptionTypeDelegate.convert(SQLExceptionTypeDelegate.java:48)
at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:42)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:113)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:99)
at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:111)
at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getPhysicalConnection(LogicalConnectionManagedImpl.java:138)
at org.hibernate.engine.jdbc.internal.StatementPreparerImpl.connection(StatementPreparerImpl.java:50)
at org.hibernate.engine.jdbc.internal.StatementPreparerImpl$5.doPrepare(StatementPreparerImpl.java:149)
at org.hibernate.engine.jdbc.internal.StatementPreparerImpl$StatementPreparationTemplate.prepareStatement(StatementPreparerImpl.java:176)
at org.hibernate.engine.jdbc.internal.StatementPreparerImpl.prepareQueryStatement(StatementPreparerImpl.java:151)
at org.hibernate.loader.Loader.prepareQueryStatement(Loader.java:2103)
at org.hibernate.loader.Loader.executeQueryStatement(Loader.java:2040)
at org.hibernate.loader.Loader.executeQueryStatement(Loader.java:2018)
at org.hibernate.loader.Loader.doQuery(Loader.java:948)
at org.hibernate.loader.Loader.doQueryAndInitializeNonLazyCollections(Loader.java:349)
at org.hibernate.loader.Loader.doList(Loader.java:2849)
at org.hibernate.loader.Loader.doList(Loader.java:2831)
at org.hibernate.loader.Loader.listIgnoreQueryCache(Loader.java:2663)
at org.hibernate.loader.Loader.list(Loader.java:2658)
at org.hibernate.loader.hql.QueryLoader.list(QueryLoader.java:506)
at org.hibernate.hql.internal.ast.QueryTranslatorImpl.list(QueryTranslatorImpl.java:400)
at org.hibernate.engine.query.spi.HQLQueryPlan.performList(HQLQueryPlan.java:219)
at org.hibernate.internal.SessionImpl.list(SessionImpl.java:1414)
at org.hibernate.query.internal.AbstractProducedQuery.doList(AbstractProducedQuery.java:1625)
at org.hibernate.query.internal.AbstractProducedQuery.list(AbstractProducedQuery.java:1593)
at org.hibernate.query.internal.AbstractProducedQuery.getSingleResult(AbstractProducedQuery.java:1641)
at org.hibernate.query.criteria.internal.compile.CriteriaQueryTypeQueryAdapter.getSingleResult(CriteriaQueryTypeQueryAdapter.java:111)
at org.springframework.data.jpa.repository.query.JpaQueryExecution$SingleEntityExecution.doExecute(JpaQueryExecution.java:196)
at org.springframework.data.jpa.repository.query.JpaQueryExecution.execute(JpaQueryExecution.java:88)
at org.springframework.data.jpa.repository.query.AbstractJpaQuery.doExecute(AbstractJpaQuery.java:155)
at org.springframework.data.jpa.repository.query.AbstractJpaQuery.execute(AbstractJpaQuery.java:143)
at org.springframework.data.repository.core.support.RepositoryMethodInvoker.doInvoke(RepositoryMethodInvoker.java:137)
at org.springframework.data.repository.core.support.RepositoryMethodInvoker.invoke(RepositoryMethodInvoker.java:121)
at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:152)
at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:131)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:80)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:137)
... 22 more
Caused by: java.sql.SQLNonTransientConnectionException: Could not create connection to database server. Attempted reconnect 3 times. Giving up.
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:110)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:63)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:73)
at com.mysql.cj.jdbc.ConnectionImpl.connectWithRetries(ConnectionImpl.java:903)
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:828)
at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:453)
at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:246)
at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:208)
at org.springframework.jdbc.datasource.DriverManagerDataSource.getConnectionFromDriverManager(DriverManagerDataSource.java:155)
at org.springframework.jdbc.datasource.DriverManagerDataSource.getConnectionFromDriver(DriverManagerDataSource.java:146)
at org.springframework.jdbc.datasource.AbstractDriverBasedDataSource.getConnectionFromDriver(AbstractDriverBasedDataSource.java:205)
at org.springframework.jdbc.datasource.AbstractDriverBasedDataSource.getConnection(AbstractDriverBasedDataSource.java:169)
at org.hibernate.engine.jdbc.connections.internal.DatasourceConnectionProviderImpl.getConnection(DatasourceConnectionProviderImpl.java:122)
at org.hibernate.internal.NonContextualJdbcConnectionAccess.obtainConnection(NonContextualJdbcConnectionAccess.java:38)
at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:108)
... 60 more
Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at sun.reflect.GeneratedConstructorAccessor129.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)
at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)
at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:151)
at com.mysql.cj.exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:167)
at com.mysql.cj.protocol.a.NativeSocketConnection.connect(NativeSocketConnection.java:89)
at com.mysql.cj.NativeSession.connect(NativeSession.java:144)
at com.mysql.cj.jdbc.ConnectionImpl.connectWithRetries(ConnectionImpl.java:847)
... 73 more
Caused by: java.net.NoRouteToHostException: Cannot assign requested address (Address not available)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at com.mysql.cj.protocol.StandardSocketFactory.connect(StandardSocketFactory.java:155)
at com.mysql.cj.protocol.a.NativeSocketConnection.connect(NativeSocketConnection.java:63)
... 75 more
2:58:16.804 PM
2021-11-11 14:58:16.804 WARN 1 --- [mtSave-38] c.t.etl.pipeline.load.Loader : Thread 77 ProcessFutures Loop Exception: org.springframework.dao.DataAccessResourceFailureException: Unable to acquire JDBC Connection;
nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection
在抛出上述异常后不久,它又开始工作,现在它能够在不久前调用数据库:
2:58:16.807 PM
2021-11-11 14:58:16.807 INFO 1 --- [mtSave-38] c.t.etl.repository.AppointmentTreeDAO : getting appointment thread
如果不是全部,许多不同的线程会间歇性地发生这种情况。但最终所有工作都完成了,并且所有期货都从该方法返回。然后应用程序恢复为单线程并且异常停止。一旦应用程序返回到使用多个线程,就会再次发生上述相同的异常。
这是我的 persistence.xml 文件:
<properties>
<property name="javax.persistence.jdbc.driver" value="com.mysql.jdbc.Driver" />
<property name="hibernate.dialect" value="org.hibernate.dialect.MySQLDialect" />
<property name="hibernate.show_sql" value="false" />
<property name="hibernate.id.new_generator_mappings" value="false" />
<property name="org.hibernate.flushMode" value="ALWAYS" />
<property name="hibernate.c3p0.acquire_increment" value="1" />
<property name="hibernate.c3p0.idle_test_period" value="60"/>
<property name="hibernate.c3p0.max_size" value="54"/>
<property name="hibernate.c3p0.max_statements" value="50"/>
<property name="hibernate.c3p0.min_size" value="5"/>
</properties>
TaskExecutor 是这样创建的:
public Executor taskExecutor() {
logger.info("Creating Async Task Executor");
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(45);
executor.setMaxPoolSize(45);
executor.setThreadNamePrefix("mtSave-");
executor.initialize();
executor.setKeepAliveSeconds(120);
return executor;
}
我的数据库连接字符串上有 autoReconnect=true&maxReconnects=10。
似乎线程正在渴望连接,但我不明白为什么。工作项总是比线程数多。 150k 的工作记录,而高端的 45 个线程,以及低于 10k 的低端工作记录。在引入多线程之前,应用程序从未抛出这些异常。我将连接池最大大小 (54) 设置为高于我拥有的线程数 (45)。
我尝试更改线程数、连接池的最大大小、不同大小的文件并进行了大量研究。请指教。
【问题讨论】:
标签: java mysql spring multithreading hibernate