【问题标题】:Multi-threading on a foreach loop?foreach循环上的多线程?
【发布时间】:2010-07-06 13:40:00
【问题描述】:

我想处理一些数据。我在字典中有大约 25k 项。在 foreach 循环中,我查询数据库以获取有关该项目的结果。它们作为值添加到字典中。

foreach (KeyValuePair<string, Type> pair in allPeople)
{
    MySqlCommand comd = new MySqlCommand("SELECT * FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src", con);
    MySqlDataReader reader2 = comd.ExecuteReader();
    Dictionary<string, Dictionary<int, Log>> allViews = new Dictionary<string, Dictionary<int, Log>>();
    while (reader2.Read())
    {
        if (!allViews.ContainsKey(reader2.GetString("src")))
        {
            allViews.Add(reader2.GetString("src"), reader2.GetInt32("time"));
        }
    }
    reader2.Close();
    reader2.Dispose();
    allPeople[pair.Key].View = allViews;
}

我希望能够通过多线程更快地做到这一点。我有 8 个线程可用,CPU 使用率约为 13%。我只是不知道它是否会工作,因为它依赖于 MySQL 服务器。另一方面,也许 8 个线程会打开 8 个数据库连接,因此会更快。

无论如何,如果多线程在我的情况下会有所帮助,如何? o.O 我从来没有使用过(多个)线程,所以任何帮助都会很棒:D

【问题讨论】:

  • 您是否对此进行了分析 - 什么需要时间,多长时间?例如你是把大部分时间花在数据库上,还是在字典上的实际包含/添加上
  • 执行此操作需要 18 分钟,频率为 2.9GHz。问题是,25k 只是部分复制数据库中的测试。真正的东西还有很多。
  • 我建议查看 .NET 4.0 框架中添加的新并行计算类。 msdn.microsoft.com/en-gb/concurrency/bb895950.aspx.
  • @btlog:虽然这是首先想到的,但在这里不合适。

标签: multithreading c#-4.0 foreach


【解决方案1】:

MySqlDataReader 是有状态的——你在它上面调用Read(),它会移动到下一行,所以每个线程都需要自己的阅读器,你需要编写一个查询,以便它们获得不同的值。这可能并不太难,因为您自然会有许多具有不同 pair.Key 值的查询。

您还需要每个线程都有一个临时字典,然后将它们合并,或者使用锁来防止字典的并发修改。

以上假设 MySQL 将允许单个连接执行并发查询;否则您可能还需要多个连接。

首先,如果您只向数据库询问您需要的数据 ("SELECT src,time FROMlogsWHERE IP = '" + pair.Key + "' GROUP BY src") 并使用 GetString(0) 和 GetInt32(1) 而不是使用名称来查找,我会看看会发生什么增加src和时间;也只能从结果中获取一次值。

我也不确定逻辑 - 您没有按时间排序日志事件,所以哪个是第一个返回的(因此存储在字典中)可能是其中任何一个。

类似这样的逻辑 - 每个 N 线程只在第 N 对上运行,每个线程都有自己的阅读器,实际上没有任何变化allPeople,只有allPeople中值的属性:

    private void RunSubQuery(Dictionary<string, Type> allPeople, MySqlConnection con, int threadNumber, int threadCount)
    {
        int hoppity = 0; // used to hop over the keys not processed by this thread

        foreach (var pair in allPeople)
        {
            // each of the (threadCount) threads only processes the (threadCount)th key
            if ((hoppity % threadCount) == threadNumber)
            {
                // you may need con per thread, or it might be that you can share con; I don't know
                MySqlCommand comd = new MySqlCommand("SELECT src,time FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src", con);

                using (MySqlDataReader reader = comd.ExecuteReader())
                {
                    var allViews = new Dictionary<string, Dictionary<int, Log>>();

                    while (reader.Read())
                    {
                        string src = reader.GetString(0);
                        int time = reader.GetInt32(1);

                        // do whatever to allViews with src and time
                    }

                    // no thread will be modifying the same pair.Value, so this is safe
                    pair.Value.View = allViews;
                }
            }

            ++hoppity;
        }
    }

这未经测试 - 我在这台机器上没有 MySQL,也没有您的数据库和您正在使用的其他类型。它也相当程序化(有点像在 Fortran 中使用 OpenMPI 的方式),而不是将所有内容都包装在任务对象中。

您可以像这样为此启动线程:

    void RunQuery(Dictionary<string, Type> allPeople, MySqlConnection connection)
    {
        lock (allPeople)
        {
            const int threadCount = 8; // the number of threads

            // if it takes 18 seconds currently and you're not at .net 4 yet, then you may as well create
            // the threads here as any saving of using a pool will not matter against 18 seconds
            //
            // it could be more efficient to use a pool so that each thread takes a pair off of 
            // a queue, as doing it this way means that each thread has the same number of pairs to process,
            // and some pairs might take longer than others
            Thread[] threads = new Thread[threadCount];

            for (int threadNumber = 0; threadNumber < threadCount; ++threadNumber)
            {
                threads[threadNumber] = new Thread(new ThreadStart(() => RunSubQuery(allPeople, connection, threadNumber, threadCount)));
                threads[threadNumber].Start();
            }

            // wait for all threads to finish
            for (int threadNumber = 0; threadNumber < threadCount; ++threadNumber)
            {
                threads[threadNumber].Join();
            }
        }
    }

allPeople 上持有的额外锁已完成,以便在所有线程返回后存在写屏障;我不太确定是否需要它。任何物体都可以。

这并不能保证任何性能提升——可能是 MySQL 库是单线程的,但服务器当然可以处理多个连接。使用不同数量的线程进行测量。


如果您使用的是 .net 4,那么您不必费力地创建线程或跳过您不处理的项目:

    // this time using .net 4 parallel; assumes that connection is thread safe
    static void RunQuery(Dictionary<string, Type> allPeople, MySqlConnection connection)
    {
        Parallel.ForEach(allPeople, pair => RunPairQuery(pair, connection));
    }

    private static void RunPairQuery(KeyValuePair<string, Type> pair, MySqlConnection connection)
    {
        MySqlCommand comd = new MySqlCommand("SELECT src,time FROM `logs` WHERE IP = '" + pair.Key + "' GROUP BY src", connection);

        using (MySqlDataReader reader = comd.ExecuteReader())
        {
            var allViews = new Dictionary<string, Dictionary<int, Log>>();

            while (reader.Read())
            {
                string src = reader.GetString(0);
                int time = reader.GetInt32(1);

                // do whatever to allViews with src and time
            }

            // no iteration will be modifying the same pair.Value, so this is safe
            pair.Value.View = allViews;
        }
    }

【讨论】:

  • 请记住,分割 sql 语句可能会或可能不会增加额外的性能下降 - 需要进行测试。
  • 我目前正在测试一个只包含我需要的字段的查询。
  • 好吧,当我只查询我需要的字段时,它仍然需要 18 分钟。它只快了几毫秒。我会试试你的代码,我有一个问题虽然 xD 我不知道如何“//在每个线程中运行它,连续的 threadNumber 值;”你能解释一下吗?就像我说的那样,我对多线程完全陌生。
【解决方案2】:

想到的最大问题是您将使用多线程向字典添加值,这不是线程安全的。

您必须做一些事情 like this 才能使其工作,并且您可能不会从实现它中获得太多好处,因为它仍然必须锁定字典对象才能添加值。

【讨论】:

  • 嗯,你看到的字典只是临时的。主字典中的每个项目,在此循环中使用临时字典更改值。不会删除或添加主词典中的条目。只是变了。我认为使它成为多线程会让 8 个线程处理字典的前 8 个线程,当一个线程完成时,它需要条目 9 等
  • allViews 字典是这里的问题。看,如果有两个线程试图同时写入所有视图而没有会产生问题的锁。虽然您可以使用锁来解决这个问题,但我不确定您会从中获得多少收益。也就是说,我必须同意安德烈关于读者的说法——读者2才是真正的罪魁祸首。大多数其他事情都有解决方法,但不是 reader2。
  • 多线程工作,但你告诉它工作。通常,所有事情都同时发生,这可能会导致其自身的一系列问题。没有安全措施,就像没有红绿灯的十字路口。
  • 另外,在多个线程中从同一个 MySqlDataReader 读取是不安全的。
  • 取决于您是在谈论 allView 还是 allPeople 字典,以及如何编写此代码以利用并行性。我假设 foreach 中的所有代码都是处理单元,所以 allView 只是一个线程,而 allPeople 正在更新一个值的属性。根据值是否唯一确定更新属性是否是线程安全的。查看代码 sn-p 似乎是合理的假设是这种情况。
【解决方案3】:

假设:

  1. 您的中有一张表格 People 数据库
  2. 有很多人在 您的数据库

每个数据库查询都会增加您为数据库中的每个人执行一个数据库查询的开销我建议在一个查询中获取所有数据然后进行重复调用会更快

select l.ip,l.time,l.src 
  from logs l, people p 
  where l.ip = p.ip
  group by l.ip, l.src

在单个线程中尝试循环,我相信这会比您现有的代码快得多。

在您现有的代码中,您可以做的另一件事是将 MySqlCommand 的创建从循环中取出,提前准备好并更改参数。这应该会加快 SQL 的执行速度。见http://dev.mysql.com/doc/refman/5.0/es/connector-net-examples-mysqlcommand.html#connector-net-examples-mysqlcommand-prepare

MySqlCommand comd = new MySqlCommand("SELECT * FROM `logs` WHERE IP = ?key GROUP BY src", con);
comd.prepare();
comd.Parameters.Add("?key","example");
foreach (KeyValuePair<string, Type> pair in allPeople)
{
    comd.Parameters[0].Value = pair.Key;

如果你使用多线程,每个线程仍然需要有自己的命令,至少在 MS-SQL 中,即使你每次都重新创建和准备语句,这仍然会更快,因为 SQL 服务器能够能够缓存参数化语句的执行计划。

【讨论】:

  • 不,有一张表,其中包含 IP | 列时间 | src 我实际上正在尝试制作一些可以分析这些日志并在其中找到垃圾邮件/模式的东西。没什么太严重的,我只是想知道如何更快地加载它:P
  • @lordstyx 在您的帖子中 allPeople 变量来自哪里以及其中有多少元素?
  • allPeople 是一个以 IP 为键的全局字典,并在此函数中分配值。 IP 在 diff 函数中获取。
  • 有趣。而不是在每个循环中创建一个新命令,而只是更改参数将时间再缩短 30 秒 o.O @Pete Kirkham:当我使用该 Parallel.ForEach 方法时,我收到一堆关于该连接处的阅读器必须关闭的错误第一的。我可以尝试每次都建立联系,但我不知道这会如何影响时间。
【解决方案4】:

在您做任何其他事情之前,请找出确切的时间花在哪里。检查查询的执行计划。我首先怀疑是 logs.IP 上缺少索引。

18 分钟这样的事情对我来说似乎太长了。即使您可以通过添加更多线程(这不太可能!)将执行时间缩短为 8 分钟,但最终使用的时间仍然超过 2 分钟。您可能可以在不到 5 秒的时间内将整个 25k 行读入内存并在内存中进行必要的处理...

编辑:澄清一下,我并不是提倡在内存中实际执行此操作,只是说看起来这里有一个更大的瓶颈可以消除。

【讨论】:

    【解决方案5】:

    我认为,如果您在多核机器上运行它,您可以从多线程中获益。

    但是,我的方法是首先查看通过进行异步数据库调用来解除阻塞当前正在使用的线程。回调将在后台线程上执行,因此您将在那里获得一些多核优势,并且您不会阻塞等待数据库返回的线程。

    对于像这个示例这样的 IO 密集型应用程序,您可能会看到吞吐量有所提高,具体取决于数据库可以处理的负载。假设数据库可以处理多个并发请求,您应该会很好。

    【讨论】:

      【解决方案6】:

      感谢大家的帮助。目前我正在使用这个

      for (int i = 0; i < 8; i++)
      {
          ThreadPool.QueueUserWorkItem(addDistinctScres, i);
      }
      

      ThreadPool 运行所有线程。我使用 Pete Kirkham 提供的方法,并且我正在为每个线程创建一个新连接。 时间缩短到 4 分钟。

      接下来我做点什么等待线程池的回调?在执行其他功能之前。

      我认为现在的瓶颈是 MySQL 服务器,因为 CPU 使用率下降了。

      @odd parity 我想过这个问题,但实际情况是超过 25k 行。我想知道这是否可行。

      【讨论】:

      • 呵呵,我并不是说您应该实际上将整个表加载到内存中,只是如果您将“理想”方法与当前方法所需的时间进行比较花费的时间很明显,某处存在尚未解决的瓶颈。我要说的是,与优化当前代码相比,花时间寻找瓶颈可能会更好。
      • 哦,好吧。在消除单线程瓶颈(现在使用 8 :D)之后,我认为新的瓶颈是数据库,因为 CPU 使用率在此过程中有峰值下降。我不知道还有什么可能。
      【解决方案7】:

      这听起来像是 map/reduce 的完美工作,我不是 .Net 程序员,但这似乎是一个合理的指南: http://ox.no/posts/minimalistic-mapreduce-in-net-4-0-with-the-new-task-parallel-library-tpl

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2021-06-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2013-11-18
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多