【问题标题】:Java - MySQL concurrency solution neededJava - 需要 MySQL 并发解决方案
【发布时间】:2014-03-21 05:45:03
【问题描述】:

我正在开发一个使用 MySQL 来写入和读取一些数据的 java 应用程序。过了一段时间(很长一段时间)我的 jdbc 连接关闭,我在很多论坛上阅读,但我仍然无法让它永远持续下去。

我想做的是下一个:5、6、24 小时后(仍然不知道多少小时),我将关闭连接并再次连接。问题是,如果另一个线程尝试使用该连接来编写一些东西,它将在异常时失败。因此,如果 jdbc 连接正在重新启动所有需要使用 jdbc 的线程,那么我要做的下一件事就是等到重新连接完成。如果另一个线程使用该连接,我也不想阻塞线程,只有在重新启动时。我也害怕死锁:)。

我在许多论坛上阅读了有关 java 并发以及如何管理它的信息,但仍然不知道该使用什么以及如何使用它。

有用的论坛:

http://www.vogella.com/tutorials/JavaConcurrency/article.html

http://en.wikipedia.org/wiki/Java_concurrency

http://tutorials.jenkov.com/java-concurrency/index.html

编辑:

为了更清楚我想做什么

Java 数据库类:

    public class Database {

        //Locks
        final Object readLock = new Object();
        final Object writeLock = new Object();
        final Object readWriteLock = new Object();
        boolean enableReadLock = false;
        boolean enableWriteLock = false;
        //Database parametars
        String user; //DB username /корисник на базата
        String password; //DB password /лозинка за базата
        String dbname; //DB name / име на базата
        String ip; //DB Server IP address / адреса на серверот каде е базата
        Connection connection; //mysql connection /


        public Database(String user, String password, String dbname, String ip) throws ClassNotFoundException {
            Class.forName("com.mysql.jdbc.Driver");
            this.user = user;
            this.password = password;
            this.dbname = dbname;
            this.ip = ip;
        }


        public Database(String user, String password, String dbname) throws ClassNotFoundException {
            Class.forName("com.mysql.jdbc.Driver");
            this.user = user;
            this.password = password;
            this.dbname = dbname;
            this.ip = "localhost";
        }


        public void setReadLock(boolean enable) {
            enableReadLock = enable;
        }

        /**
         * Enable or disable write lock
         *
         * @param enable true -> enable , false -> disable
         */
        public void setWriteLock(boolean enable) {
            enableWriteLock = enable;
        }

        /**
         * Enable or disable read-write lock
         *
         * @param enable true -> enable , false -> disable
         */
        public void setReadWriteLock(boolean enable) {
            enableReadLock = enable;
            enableWriteLock = enable;
        }

        public void reconnect() throws SQLException {
             connection.close();
             connection = DriverManager.getConnection("jdbc:mysql://" + ip + ":3306/" + dbname, user, password);
    }

        public void connect() throws SQLException {
            connection = DriverManager.getConnection("jdbc:mysql://" + ip + ":3306/" + dbname, user, password);
        }


        public void disconnect() throws SQLException {
            connection.close();
        }



        /**
         * Test connection with mysql server
         *
         * @return (boolean) true -> OK, false -> NOK
         */
        public boolean testConnection() {
            String SQL = "SELECT 1";
            try {
                PreparedStatement statement = connection.prepareStatement(SQL);
                ResultSet rs = statement.executeQuery();
                while (rs.next()) {
                }
            } catch (SQLException ex) {
                return false;
            } catch (NullPointerException ex){
                return false;
            }
            return true;
        }


        public ArrayList<HashMap<String, String>> executeSelect(String SQL) throws SQLException {
            if (enableReadLock && enableWriteLock) { //Доколку има read-write lock
                synchronized (readWriteLock) {
                    return select(SQL);
                }
            } else {
                if (enableReadLock) { //Доколку има read-lock
                    synchronized (readLock) {
                        return select(SQL);
                    }
                } else {
                    return select(SQL);
                }
            }
        }


        private ArrayList<HashMap<String, String>> select(String SQL) throws SQLException {
            ArrayList<HashMap<String, String>> results = new ArrayList<>();
            HashMap<String, String> row;
            PreparedStatement statement = connection.prepareStatement(SQL);
            ResultSet rs = statement.executeQuery();
            ResultSetMetaData rsmd = rs.getMetaData();
            int no_columns = rsmd.getColumnCount();
            while (rs.next()) {
                row = new HashMap<>();
                for (int i = 1; i <= no_columns; i++) {
                    row.put(rsmd.getColumnName(i), rs.getString(i));
                }
                results.add(row);
            }
            statement.close();
            rs.close();
            return results;
        }


        public long executeInsert(String SQL) throws SQLException {
            if (enableReadLock && enableWriteLock) {
                synchronized (readWriteLock) {
                    return insert(SQL);
                }
            } else {
                if (enableWriteLock) {
                    synchronized (writeLock) {
                        return insert(SQL);
                    }
                } else {
                    return insert(SQL);
                }
            }
        }


        private long insert(String SQL) throws SQLException {
            long ID = -1;
            PreparedStatement statement = connection.prepareStatement(SQL, Statement.RETURN_GENERATED_KEYS);
            statement.executeUpdate();
            ResultSet rs = statement.getGeneratedKeys();
            if (rs.next()) {
                ID = rs.getLong(1);
            }
            statement.close();
            return ID;
        }


        public int executeUpdate(String SQL) throws SQLException {
            if (enableReadLock && enableWriteLock) {
                synchronized (readWriteLock) {
                    return update(SQL);
                }
            } else {
                if (enableWriteLock) {
                    synchronized (writeLock) {
                        return update(SQL);
                    }
                } else {
                    return update(SQL);
                }
            }
        }


        private int update(String SQL) throws SQLException {
            PreparedStatement statement = connection.prepareStatement(SQL);
            int rez = statement.executeUpdate(SQL);
            statement.close();
            return rez;
        }


        public int executeDelete(String SQL) throws SQLException {
            if (enableReadLock || enableWriteLock) {
                synchronized (readWriteLock) {
                    synchronized (readLock) {
                        synchronized (writeLock) {
                            return delete(SQL);
                        }
                    }
                }
            } else {
                return delete(SQL);
            }
        }


        private int delete(String SQL) throws SQLException {
            PreparedStatement statement = connection.prepareStatement(SQL);
            int rez = statement.executeUpdate(SQL);
            statement.close();
            return rez;
        }
    }

在重新连接方法中,我需要一些锁或其他东西,这将使每个调用选择、更新、插入或删除方法的人等待(阻塞)直到重新连接完成。

【问题讨论】:

    标签: java mysql multithreading jdbc concurrency


    【解决方案1】:

    我认为最简单的解决方案是使用带有 已同步 标志的资源“连接”来标记所有方法。 http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html

    例如:

    Class YourClass
    {
      public synchronized boolean reconect(...) { ...}
      public synchronized String getData (...) {...}
    }
    

    这会为标记方法锁定 this 对象,因此您应该将您的连接封装在一个像这样的小类中,或者只锁定您的连接对象: http://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html

    Class YourClass
    {
      public boolean reconect(...) {
       synchronized(con) 
       {
         ...
       }
      }
    
      public String getData (...) {
        synchronized(con) 
        {
         ...
        }
      }
      private Connection con;
    }
    

    这些同步区域永远不会同时运行。

    编辑: 考虑到您希望保护 reconnect 免受其他数据库操作的影响,您应该考虑使用信号量: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html

    Class YourClass
    {
      YourClass() {
        sem = new Semaphore(1);
      }
    
      public boolean reconect(...) {
         sem.acquire();
         ...
         sem.release();
      }
    
      public String getData (...) {
        synchronized(sem) 
        {
          if(sem.availablePermits()>0) sem.reducePermits(1);
        }
        ...
        ...
        synchronized(sem) 
        {
          sem.release();
        }
      }
      private Connection con;
      Semaphore sem;
    }
    

    【讨论】:

    • 这很简单,我同意 :) 但是如果一个线程想要使用 getData() 方法(在你的情况下)并且另一个线程正在使用它(不是重新连接方法),它将等到另一个线程释放锁(同步)。如果我理解错了,请纠正我。
    • @AdrianES 你是对的,但我认为这在你的代码中并不是一个糟糕的行为。如果您想要避免重新连接和“其他方法”同时运行,您可以保护整数(计数器)超过增量、减量和获取操作(使用上述第二种机制),仅在此计数器时解锁获取请求者是 0... 就像一个信号量
    • 你能编辑你的代码并告诉我你如何使用这个计数器吗?我也认为我应该结合两种并发机制来获得我需要的东西。
    【解决方案2】:

    试试这个oracle jdbc multithreading tutorial. 或者只是用户Apache connection pool

    多线程会提高你的性能,但有几个 你需要知道的事情:

    每个线程都需要自己的 JDBC 连接。无法共享连接 线程之间,因为每个连接也是一个事务。上传 将数据分块并偶尔提交一次以避免累积 巨大的回滚/撤消表。将任务切割成几个工作单元 每个单位做一项工作。详细说明最后一点:目前,您 有一个读取文件、解析文件、打开 JDBC 连接的任务, 进行一些计算,将数据发送到数据库等。

    你应该做什么:

    一个(!)线程来读取文件并从中创建“作业”。每项工作 应该包含一个小但不太小的“工作单元”。推那些 进入队列下一个线程等待队列中的作业并执行 计算。这可能在步骤 #1 中的线程等待时发生 为慢速硬盘返回新的数据行。的结果 此转换步骤进入下一个队列 一个或多个线程 通过 JDBC 上传数据。第一个和最后一个线程很漂亮 慢,因为它们受 I/O 限制(硬盘慢,网络 连接更糟)。再加上在数据库中插入数据是 非常复杂的任务(分配空间、更新索引、检查 外键)

    使用不同的工作线程有很多好处:

    单独测试每个线程很容易。由于他们不共享数据, 你不需要同步。队列会为你做到这一点 你可以 快速更改要调整的每个步骤的线程数 性能

    【讨论】:

    • 感谢@Rafik991,它会改变我的概念,我希望能有更好的表现。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-03-31
    • 1970-01-01
    • 1970-01-01
    • 2023-01-26
    • 2016-08-25
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多