【发布时间】:2012-10-23 12:08:30
【问题描述】:
我浏览了几十篇文章,但找不到解决方案。
这是逻辑:
我有一个 Winform (VS2010) 应用程序,它需要从 SQL Server 2008 R2 Express 表 A 读取数据,处理一些计算并存储在另一个表 B 中。
我想使用并行ForEach来缩短执行时间(否则计算+SQL过程需要几天......)
我必须从 SQL 中读取,因为数据库有超过 500 万行,每次读取返回数百行。
列表定义为:
BindingList<ItemsClass> etqM = new BindingList<ItemsClass>();
BindingList<ItemsClass> etqC = new BindingList<ItemsClass>();
并行执行:
Parallel.ForEach(etqC, cv => {
readData(ref etqM, "tableA", "WHERE ID LIKE '" + cv.Name + "%'");
IList<ItemsClass> eResults = etqM.OrderBy(f => f.ID).ToList();
foreach (ItemsClass R in eResults)
{
//calculations comes here
etqM[rID] = R;
}
Parallel.ForEach(etqM, r => {
// part 2 of calculations comes here
}
});
exportList(etqM, "tableB", true);
});
SQL 读取函数: 该函数获取 SQL 的列表、表名 + 条件 从 SQL 中读取记录,并将它们转换为 List 格式。
public void readData<T>(ref BindingList<T> etqList, string tableName, string conditions)
{
SqlConnection myConnection = new SqlConnection();
SqlCommand myCommand = new SqlCommand();
myCommand.CommandTimeout = 0;
myCommand.Connection = myConnection;
etqList.Clear();
openConn(myConnection);
SqlDataReader myReader = null;
try
{
int totalResults;
myCommand.CommandText = "SELECT COUNT (*) FROM " + tableName + " " + conditions;
totalResults = (int)myCommand.ExecuteScalar();
if (totalResults > 0)
{
myCommand.CommandText = "SELECT * FROM " + tableName + " " + conditions;
myReader = myCommand.ExecuteReader();
etqList = ConvertTo<T>(convertReaderToDT(myReader));
}
}
catch (Exception ex) { }
finally
{
try { myReader.Close(); }
catch { }
}
closeConn(myConnection);
}
SQL 导出函数:该函数将给定的列表导出到表名。
private void exportListToSql<T>(IEnumerable<T> etqList, string tableName)
{
SqlConnection myConnection = new SqlConnection();
SqlCommand myCommand = new SqlCommand();
myCommand.CommandTimeout = 0;
myCommand.Connection = myConnection;
openConn(myConnection);
try
{
actionTotalCount++;
DataTable dt = new DataTable(tableName);
dt = ToDataTable(etqList);//List Name
var bulkCopy = new SqlBulkCopy(myConnection,
SqlBulkCopyOptions.TableLock |
SqlBulkCopyOptions.FireTriggers |
SqlBulkCopyOptions.UseInternalTransaction,
null
);
bulkCopy.DestinationTableName = tableName;
bulkCopy.BatchSize = BATCH_SIZE;
bulkCopy.WriteToServer(dt);
}
catch (Exception ex) { }
finally { closeConn(myConnection); }
}
SQL openConn 和 closeConn :
void openConn(SqlConnection myConnection)
{
if (myConnection.State == ConnectionState.Open) return;
myConnection.ConnectionString = "Data Source=" + DB_NAME + ";Initial Catalog=APPDB;Integrated Security=True;Connect Timeout=120;Asynchronous Processing=true;";
try { myConnection.Open(); actionTotalCount++; }
catch (System.Exception ex) { MessageBox.Show(ex.Message); }
}
void closeConn(SqlConnection myConnection)
{
if (myConnection.State == ConnectionState.Fetching || myConnection.State == ConnectionState.Executing || myConnection.State == ConnectionState.Connecting) return;
try { myConnection.Dispose(); }
catch (System.Exception ex) { MessageBox.Show(ex.Message); }
}
问题是:一旦我执行,我就会收到这条消息:
ExecuteScalar 需要一个开放且可用的连接。连接的当前状态为关闭。
此消息到达所有线程,第一个线程除外。
有什么想法吗?
【问题讨论】:
-
Offtopic:在
readData你应该关闭finally{}块中的连接 -
尝试创建一次连接并作为
readData()方法的输入参数在所有工作人员之间共享 -
检查
openConn中的连接状态的目的是什么?openConn似乎在创建new SqlConnection后立即被调用。怎么可能已经打开了? -
我曾尝试创建一次连接,但随后 - 在“Parallel.ForEach”方法中失败并显示以下错误消息:“已经有一个打开的 DataReader 与此命令关联,必须关闭第一个'。
-
@MikeTwo - 是的,我在以前的版本中添加了它,因为我似乎试图打开以前打开的连接。在这个版本中没有必要。
标签: c# sql-server multithreading parallel-processing