MySqlDataReader 是有状态的——你在它上面调用Read(),它会移动到下一行,所以每个线程都需要自己的阅读器,你需要编写一个查询,以便它们获得不同的值。这可能并不太难,因为您自然会有许多具有不同 pair.Key 值的查询。
您还需要每个线程都有一个临时字典,然后将它们合并,或者使用锁来防止字典的并发修改。
以上假设 MySQL 将允许单个连接执行并发查询;否则您可能还需要多个连接。
首先,如果您只向数据库询问您需要的数据 ("SELECT src,time FROMlogsWHERE IP = '" + pair.Key + "' GROUP BY src") 并使用 GetString(0) 和 GetInt32(1) 而不是使用名称来查找,我会看看会发生什么增加src和时间;也只能从结果中获取一次值。
我也不确定逻辑 - 您没有按时间排序日志事件,所以哪个是第一个返回的(因此存储在字典中)可能是其中任何一个。
类似这样的逻辑 - 每个 N 线程只在第 N 对上运行,每个线程都有自己的阅读器,实际上没有任何变化allPeople,只有allPeople中值的属性:
private void RunSubQuery(Dictionary<string, Type> allPeople, MySqlConnection con, int threadNumber, int threadCount)
{
int hoppity = 0; // used to hop over the keys not processed by this thread
foreach (var pair in allPeople)
{
// each of the (threadCount) threads only processes the (threadCount)th key
if ((hoppity % threadCount) == threadNumber)
{
// you may need con per thread, or it might be that you can share con; I don't know
MySqlCommand comd = new MySqlCommand("SELECT src,time FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src", con);
using (MySqlDataReader reader = comd.ExecuteReader())
{
var allViews = new Dictionary<string, Dictionary<int, Log>>();
while (reader.Read())
{
string src = reader.GetString(0);
int time = reader.GetInt32(1);
// do whatever to allViews with src and time
}
// no thread will be modifying the same pair.Value, so this is safe
pair.Value.View = allViews;
}
}
++hoppity;
}
}
这未经测试 - 我在这台机器上没有 MySQL,也没有您的数据库和您正在使用的其他类型。它也相当程序化(有点像在 Fortran 中使用 OpenMPI 的方式),而不是将所有内容都包装在任务对象中。
您可以像这样为此启动线程:
void RunQuery(Dictionary<string, Type> allPeople, MySqlConnection connection)
{
lock (allPeople)
{
const int threadCount = 8; // the number of threads
// if it takes 18 seconds currently and you're not at .net 4 yet, then you may as well create
// the threads here as any saving of using a pool will not matter against 18 seconds
//
// it could be more efficient to use a pool so that each thread takes a pair off of
// a queue, as doing it this way means that each thread has the same number of pairs to process,
// and some pairs might take longer than others
Thread[] threads = new Thread[threadCount];
for (int threadNumber = 0; threadNumber < threadCount; ++threadNumber)
{
threads[threadNumber] = new Thread(new ThreadStart(() => RunSubQuery(allPeople, connection, threadNumber, threadCount)));
threads[threadNumber].Start();
}
// wait for all threads to finish
for (int threadNumber = 0; threadNumber < threadCount; ++threadNumber)
{
threads[threadNumber].Join();
}
}
}
allPeople 上持有的额外锁已完成,以便在所有线程返回后存在写屏障;我不太确定是否需要它。任何物体都可以。
这并不能保证任何性能提升——可能是 MySQL 库是单线程的,但服务器当然可以处理多个连接。使用不同数量的线程进行测量。
如果您使用的是 .net 4,那么您不必费力地创建线程或跳过您不处理的项目:
// this time using .net 4 parallel; assumes that connection is thread safe
static void RunQuery(Dictionary<string, Type> allPeople, MySqlConnection connection)
{
Parallel.ForEach(allPeople, pair => RunPairQuery(pair, connection));
}
private static void RunPairQuery(KeyValuePair<string, Type> pair, MySqlConnection connection)
{
MySqlCommand comd = new MySqlCommand("SELECT src,time FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src", connection);
using (MySqlDataReader reader = comd.ExecuteReader())
{
var allViews = new Dictionary<string, Dictionary<int, Log>>();
while (reader.Read())
{
string src = reader.GetString(0);
int time = reader.GetInt32(1);
// do whatever to allViews with src and time
}
// no iteration will be modifying the same pair.Value, so this is safe
pair.Value.View = allViews;
}
}