【问题标题】:Java DB Queries and C3P0 for Concurrent calls用于并发调用的 Java DB 查询和 C3P0
【发布时间】:2017-08-28 15:55:16
【问题描述】:

我正试图围绕 C3P0 和数据库调用。我最初有一个在 SQLite 上运行的程序,现在我试图允许并发测试 MariaDB 上的查询。有几个项目我没有掌握。 SQLite 的最初设计是有一个生产者线程将查询放到队列中,而消费者线程将从队列中获取并将数据库查询发送到数据库。

我想知道这个单线程是否能够发出并发请求(因为它只有一个线程)。

其次,我遇到了一个问题,这显然没有返回连接,或者它似乎在大约 18 次查询后停止。队列中仍有项目,但程序只是停止并等待尝试新的连接。

我的主数据库调用线程类:

public class DBRunnable extends DBExtend implements Runnable
{
/**
 * Call the query builder instance
 */
protected QueryBuilder qb = QueryBuilder.getInstance();

/**
 * Call the point type converter instance
 */
protected PointTypeConv pv = PointTypeConv.getInstance();

/**
 * Singleton object
 */
private static DBRunnable db = null;

/**
 * Constructor
 */
public DBRunnable()
{
}

/**
 * Main thread functionality
 */
@Override
public void run()
{
    try
    {

        while (true)
        {

            long startTime = 0;

            QueryRequest msg = null;

            try
            {
                // Pull any existing query requests off the queue, if not, wait for one.
                msg = (QueryRequest) DBMigrationTool.dbProcQueue.take();
            } catch (Exception e)
            {
                errorLog.error("Unable to fetch message from message processing queue.");
            }

            // Good practice to create a new result set instead of reusing
            ResultSet rs = null;
            Statement stmt = null;

                    // Fetch the query and the request out of the QueryRequest object
                    String query = msg.getQuery();
                    // Make sure the query request isn't empty, if it is, there is no point in sending it to the DB

                    try (Connection conn = cpds.getConnection())
                    {
                        // Execute the given query and fetch the result from it
                        stmt = conn.createStatement();
                        startTime = System.currentTimeMillis();
                        stmt.setQueryTimeout(1800);
                        System.out.println(query);
                        stmt.execute(query);
                        rs = stmt.getResultSet();

                        if (rs != null)
                        {
                            try
                            {
                                int count = 0;
                                while (rs.next())
                                {
                                    count++;
                                }

                                System.out.println("Query Complete: " + (System.currentTimeMillis() - startTime) + "ms. Result count: " + count);

                                if (msg.getFlag() == 1)
                                {
                                    DBMigrationTool.flag = 0;
                                }

                            } catch (Exception e)
                            {
                                errorLog.error("Failed to process database result set.");
                            }

                        }
                        conn.close();
                    } catch (SQLException e)
                    {
                        errorLog.error("Query Error: " + msg.getQuery());
                        errorLog.error("Failed to issue database command: " + e);
                    } finally
                    {
                        if (rs != null)
                        {
                            try
                            {
                                rs.close();
                            } catch (SQLException e)
                            {
                                errorLog.error("Failed to close JDBC result set.");
                            }
                        }
                        if (stmt != null)
                        {
                            try
                            {
                                stmt.close();
                            } catch (SQLException e)
                            {
                                errorLog.error("Failed to close JDBC statement.");
                            }
                        }
                    }
                }                

    } finally
    {
        closeDB();
        DBMigrationTool.dbProcHandle.cancel(true);
    }

}

包含连接信息的我的接口数据库类:

public class DBExtend
{
/**
 * Standard timeout
 */
public static final int DB_TIMEOUT = 30;

/**
 * Standard error logger for log4j2
 */
protected static Logger errorLog = LogManager.getLogger(DBExtend.class.getName());

/**
 * Call to the query builder instance
 */
private static QueryBuilder qb = QueryBuilder.getInstance();

/**
 * DB connection
 */
protected static ComboPooledDataSource cpds;

/**
 * Constructor
 */
public DBExtend()
{
}

/**
 * startDB is an initialization function used to open a database connection
 * 
 * @param dbPath - System path to the database file
 */
public void startDB(String dbPath)
{

    cpds = new ComboPooledDataSource();
    cpds.setJdbcUrl("jdbc:sqlite:" + dbPath);
    cpds.setMinPoolSize(1);
    cpds.setTestConnectionOnCheckout(true);
    cpds.setAcquireIncrement(5);
    cpds.setMaxPoolSize(20);

    errorLog.info("Connection to SQLite has been established.");

}

public void startMariaDB(String tableName)
{
    cpds = new ComboPooledDataSource();
    cpds.setJdbcUrl("jdbc:mariadb://localhost:3306/" + tableName);
    cpds.setUser("root");
    cpds.setPassword("joy");
    cpds.setMinPoolSize(1);
    cpds.setTestConnectionOnCheckout(true);
    cpds.setAcquireIncrement(5);
    cpds.setMaxPoolSize(20);

    errorLog.info("Connection to MariaDB has been established.");
}

/**
 * Close DB is to close a database instance
 */
public void closeDB()
{
    try
    {
        cpds.close();

        errorLog.info("Connection to SQLite has been closed.");

    } catch (SQLException e)
    {
        errorLog.error(e.getMessage());
    } finally
    {
        try
        {
            if (cpds.getConnection() != null)
            {
                cpds.getConnection().close();
            }
            if (cpds != null)
            {
                cpds.close();
            }
        } catch (SQLException ex)
        {
            errorLog.error(ex.getMessage());
        }
    }
}

}

【问题讨论】:

    标签: java jdbc c3p0


    【解决方案1】:

    JDBC 驱动程序必须是线程安全的,它抽象了实现细节。请注意,尽管驱动程序是线程安全的,但从多个线程同时使用同一个连接对象仍然不是一个好主意。

    至于您的实际问题,您完全错误地使用了 C3P0 的数据源。连接池支持的数据源使用getConnection() 方法为用户提供来自该池的连接。当您关闭该连接时,该连接将返回到池中。

    这意味着您从池中获取连接,完成您的工作,然后将其关闭,以便将其返回到池中以供应用程序的其他部分使用。

    这意味着DBRunnable中的以下代码是错误的:

    if (cpds.getConnection().isValid(DB_TIMEOUT))
    

    您从池中获得一个连接,然后立即泄漏它(它不会返回到池中),因为您没有对它的引用。请注意,大多数连接池(有时是可选的)在返回连接之前都会进行连接验证,因此没有必要对其进行测试。

    对于您的DBExtend 课程也是如此,这是错误的:

    selectMariaDB:

    cpds.getConnection().setCatalog(DBName);
    

    在这里,您从池中获得了一个连接,并且永远不会关闭它,这意味着您已经“泄露”了这个连接。设置目录无效,因为此连接不会被重用。在这种情况下设置目录应该是连接池配置的一部分。

    closeDB:

    cpds.getConnection().close();
    

    这会从池中获得一个连接并立即将其关闭(将其返回到池中)。这没有实际目的。

    【讨论】:

    • 您能否推断一下如何处理您评论中的 JDBC 驱动程序部分?我知道“cpds.setDriverClass(“org.postgresql.Driver”);”打电话,但我应该为 SQLite 或 MariaDB 使用什么驱动程序?有什么地方可以找到上述司机吗?我正在从 Maven 中提取 JDBC 的 .jars。
    • 我修复了你上面提到的问题,删除了你提到的 3 个元素。我通过数据库 url 设置目录,现在唯一的问题是当我调用 SQLite 查询时,我需要使用 ATTACH 命令附加另一个表。以前,该程序使用持久的单一连接。现在,使用 C3P0,每个调用都需要打开/关闭一个连接。如何调整代码以仍然使用 ATTACH?
    • @Tacitus86 提出一个新问题,一次只关注一个问题。
    • 明白。但我相信第一条评论在这个答案下仍然是相关的。我说的对吗?
    • @Tacitus86 不是真的。您需要查看您正在使用的特定 JDBC 驱动程序库的文档。
    猜你喜欢
    • 2021-02-04
    • 1970-01-01
    • 2019-08-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-12-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多