【问题标题】:Java Multithreading for Mysql Inserts用于 Mysql 插入的 Java 多线程
【发布时间】:2018-03-05 18:34:19
【问题描述】:

我现在用谷歌搜索了一段时间,但我无法让它工作。我的方法是拥有一个多线程软件,它在一个线程上收集数据,然后在第一个线程继续收集数据时在第二个线程中执行 Mysql 的批处理查询。目标应该是为数百万次插入使用尽可能少的 RAM。在我的数据收集过程中,我使用 MYSQLInsertThread 的一个对象,并像这样插入数据:

String[] types = {"Long", "Long"};
int[] pos = {1, 1};
Object[] values = {123, 456};

nodeTagsInsertThread.addBatch(types, pos, values);

第一次调用有效,如果我将 batchCount 设置为 100,我的数据库中有 104 个条目,但这是我的班级产生的唯一条目(它应该导入 500 万个条目!

public class MYSQLInsertThread  implements Runnable
{
    private Thread t;
    private String threadName = "MYSQL InsertThread";
    private String query;

    private PreparedStatement pstmt;
    private PreparedStatement pstmt2;

    private long batchCount;
    private long maxBatchAmount;

    private Boolean pstmt1Active = true;
    private Boolean isRunning = false;

    public MYSQLInsertThread(String name, String query, Connection conn, int maxBatchAmount) throws SQLException
    {
        threadName = name;
        this.pstmt = conn.prepareStatement(query);
        this.pstmt2 = conn.prepareStatement(query);
        this.query = query;
        this.maxBatchCount = maxBatchAmount;
        System.out.println("Creating Thread: " + name);
    }

    public synchronized void addBatch(String[] types, int[] positions, Object[] values) throws SQLException
    {
        PreparedStatement _pstmt;
        if(pstmt1Active) 
        {
            _pstmt = pstmt;
        }
        else 
        {
            _pstmt = pstmt2;
        }

        if(_pstmt != null)
        {
            for(int i=0;i<types.length; i++) 
            {
                switch(types[i]) 
                {
                    case "string":
                    case "String":
                        _pstmt.setString(positions[i], (String)values[i]);
                        break;
                    case "long":
                    case "Long":
                        _pstmt.setLong(positions[i], (long)values[i]);
                        break;
                    case "int":
                    case "Integer":
                        _pstmt.setInt(positions[i], (int)values[i]);
                        break;
                    case "double":
                    case "Double":
                        _pstmt.setDouble(positions[i], (double)values[i]);
                        break;
                    default:
                        break;
                }
            }
            _pstmt.addBatch();
            batchCount++;

            if(batchCount % maxBatchCount == 0) 
            {
                System.out.println("["+ this.threadName +"]Adding " + batchCount + " Entrys to DB" );
                this.executeBatch();
            }
        }
        else 
        {
            System.err.println("[MYSQLInsertThread]Error PreparedStatment is NULL, Parameter could not be added");
        }
    }

    public synchronized void executeBatch() 
    {
        PreparedStatement _pstmt;
        if(pstmt1Active) 
        {
            _pstmt = pstmt;
        }
        else 
        {
            _pstmt = pstmt2;
        }

        if(_pstmt != null) 
        {
            if(isRunning)System.out.println("Waiting for previous Batch Execution to finish");
            while(isRunning) 
            {
                System.out.print(".");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) 
                {
                    e.printStackTrace();
                }
            }
            this.start();
            System.out.println("Execution Started Successfully");
        }
        else 
        {
            System.err.println("[" + this.threadName + "]PSTMT is NULL");
        }
    }

    @Override
    public void run() 
    {
        PreparedStatement _pstmt;

        if(pstmt1Active) 
        {
            _pstmt = pstmt;
        }
        else 
        {
            _pstmt = pstmt2;
        }

        if(_pstmt != null) 
        {
            isRunning = true;   

            pstmt1Active = !pstmt1Active;

            try 
            {
                _pstmt.executeBatch();
            }
            catch(Exception e) 
            {
                e.printStackTrace();
            }

            isRunning = false;  
        }
    }

    public void start()
    {
        if(t == null)
        {
            t = new Thread(this, threadName);
            t.start();
        }
    }

}

【问题讨论】:

  • 我建议您不要像这样交换两个 PreparedStatement 实例,而是获得一个新的 PreparedStatement。或者只使用一个并在其上进行同步。
  • 在 PreparedStatement 上不应该有能力,如果准备好的语句正在执行批处理,我不确定在执行期间添加新批处理是否安全,如果我创建一个新的 PreparedStatement,我会覆盖正在运行的执行,我想这应该会杀死正在运行的执行

标签: java mysql multithreading


【解决方案1】:

我现在自己通过 youtube 视频搜索找到了一个解决方案:D

我创建了一个新的类来保存我的 PreparedStatements。如果一个语句正在执行,则预先设置一个标志。如果第二个 Statemet 现在想要执行 Batch,但前一个 Statement 仍在运行,它会使收集线程等待写入过程的通知。编写过程在语句执行完成后通知! 这是我的代码:

public class ThreadPreparedStatement 
{
    private String query;
    private String threadName;

    private PreparedStatement pstmt1;
    private PreparedStatement pstmt2;

    private boolean pstmt1Active;
    private boolean isRunning;

    private long maxBatchAmount;
    private long batchCount;

    public ThreadPreparedStatement(String threadName, String query, Connection conn, long maxBatchAmount) throws SQLException
    {
        this.threadName = threadName;

        this.pstmt1 = conn.prepareStatement(query);
        this.pstmt2 = conn.prepareStatement(query);

        this.maxBatchAmount = maxBatchAmount;
        this.query = query;

        this.batchCount = 0;
    }

    public synchronized void addParameters(String[] types, int[] positions, Object[] values) throws SQLException
    {

        PreparedStatement _pstmt;
        if(pstmt1Active) 
        {
            _pstmt = pstmt1;
        }
        else 
        {
            _pstmt = pstmt2;
        }

        if(_pstmt != null)
        {
            for(int i=0;i<types.length; i++) 
            {
                switch(types[i]) 
                {
                    case "string":
                    case "String":
                        _pstmt.setString(positions[i], (String)values[i]);
                        break;
                    case "long":
                    case "Long":
                        _pstmt.setLong(positions[i], (long)values[i]);
                        break;
                    case "int":
                    case "Integer":
                        _pstmt.setInt(positions[i], (int)values[i]);
                        break;
                    case "double":
                    case "Double":
                        _pstmt.setDouble(positions[i], (double)values[i]);
                        break;
                    default:
                        break;
                }
            }
            _pstmt.addBatch();
            batchCount++;

            if(batchCount % maxBatchAmount == 0) 
            {
                Thread t = new Thread(new Runnable() 
                {

                    @Override
                    public void run() 
                    {
                        try 
                        {
                            executeBatch(_pstmt);
                        }catch(Exception e) 
                        {
                            e.printStackTrace();
                        }
                    }
                });
                t.start();              
            }
        }
        else 
        {
            System.err.print("Error Prepared Statements are both NULL");
        }
    }


    public synchronized void executeAllBatches() 
    {
        try 
        {
            if(isRunning) 
            {
                try 
                {
                    wait(); //wait for finish execution
                }catch(InterruptedException e) 
                {
                    e.printStackTrace();
                }                   
            }
            isRunning = true;

            pstmt1.executeBatch();
            pstmt2.executeBatch();


            isRunning = false;
            notify();
        }catch(Exception e) 
        {

        }
    }


    public synchronized void executeBatch(PreparedStatement _pstmt) throws SQLException 
    {
        System.out.println("["+ this.threadName +"]Adding " + batchCount + " Entrys to DB" );
        if(isRunning) 
        {
            try 
            {
                wait(); //wait for finish execution
            }catch(InterruptedException e) 
            {
                e.printStackTrace();
            }           
        }

        pstmt1Active = !pstmt1Active;
        isRunning = true;
        _pstmt.executeBatch();  //execute Query Batch

        isRunning = false;
        notify();               //notify that Batch is executed
    }

}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-05-23
    • 1970-01-01
    • 1970-01-01
    • 2015-01-04
    • 2011-09-29
    • 1970-01-01
    相关资源
    最近更新 更多