【发布时间】:2018-03-05 18:34:19
【问题描述】:
我现在用谷歌搜索了一段时间,但我无法让它工作。我的方法是拥有一个多线程软件,它在一个线程上收集数据,然后在第一个线程继续收集数据时在第二个线程中执行 Mysql 的批处理查询。目标应该是为数百万次插入使用尽可能少的 RAM。在我的数据收集过程中,我使用 MYSQLInsertThread 的一个对象,并像这样插入数据:
String[] types = {"Long", "Long"};
int[] pos = {1, 1};
Object[] values = {123, 456};
nodeTagsInsertThread.addBatch(types, pos, values);
第一次调用有效,如果我将 batchCount 设置为 100,我的数据库中有 104 个条目,但这是我的班级产生的唯一条目(它应该导入 500 万个条目!
public class MYSQLInsertThread implements Runnable
{
private Thread t;
private String threadName = "MYSQL InsertThread";
private String query;
private PreparedStatement pstmt;
private PreparedStatement pstmt2;
private long batchCount;
private long maxBatchAmount;
private Boolean pstmt1Active = true;
private Boolean isRunning = false;
public MYSQLInsertThread(String name, String query, Connection conn, int maxBatchAmount) throws SQLException
{
threadName = name;
this.pstmt = conn.prepareStatement(query);
this.pstmt2 = conn.prepareStatement(query);
this.query = query;
this.maxBatchCount = maxBatchAmount;
System.out.println("Creating Thread: " + name);
}
public synchronized void addBatch(String[] types, int[] positions, Object[] values) throws SQLException
{
PreparedStatement _pstmt;
if(pstmt1Active)
{
_pstmt = pstmt;
}
else
{
_pstmt = pstmt2;
}
if(_pstmt != null)
{
for(int i=0;i<types.length; i++)
{
switch(types[i])
{
case "string":
case "String":
_pstmt.setString(positions[i], (String)values[i]);
break;
case "long":
case "Long":
_pstmt.setLong(positions[i], (long)values[i]);
break;
case "int":
case "Integer":
_pstmt.setInt(positions[i], (int)values[i]);
break;
case "double":
case "Double":
_pstmt.setDouble(positions[i], (double)values[i]);
break;
default:
break;
}
}
_pstmt.addBatch();
batchCount++;
if(batchCount % maxBatchCount == 0)
{
System.out.println("["+ this.threadName +"]Adding " + batchCount + " Entrys to DB" );
this.executeBatch();
}
}
else
{
System.err.println("[MYSQLInsertThread]Error PreparedStatment is NULL, Parameter could not be added");
}
}
public synchronized void executeBatch()
{
PreparedStatement _pstmt;
if(pstmt1Active)
{
_pstmt = pstmt;
}
else
{
_pstmt = pstmt2;
}
if(_pstmt != null)
{
if(isRunning)System.out.println("Waiting for previous Batch Execution to finish");
while(isRunning)
{
System.out.print(".");
try {
Thread.sleep(100);
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
this.start();
System.out.println("Execution Started Successfully");
}
else
{
System.err.println("[" + this.threadName + "]PSTMT is NULL");
}
}
@Override
public void run()
{
PreparedStatement _pstmt;
if(pstmt1Active)
{
_pstmt = pstmt;
}
else
{
_pstmt = pstmt2;
}
if(_pstmt != null)
{
isRunning = true;
pstmt1Active = !pstmt1Active;
try
{
_pstmt.executeBatch();
}
catch(Exception e)
{
e.printStackTrace();
}
isRunning = false;
}
}
public void start()
{
if(t == null)
{
t = new Thread(this, threadName);
t.start();
}
}
}
【问题讨论】:
-
我建议您不要像这样交换两个
PreparedStatement实例,而是获得一个新的PreparedStatement。或者只使用一个并在其上进行同步。 -
在 PreparedStatement 上不应该有能力,如果准备好的语句正在执行批处理,我不确定在执行期间添加新批处理是否安全,如果我创建一个新的 PreparedStatement,我会覆盖正在运行的执行,我想这应该会杀死正在运行的执行
标签: java mysql multithreading