【问题标题】:Concurrency issue with std::map insert/erasestd::map 插入/擦除的并发问题
【发布时间】:2012-04-26 15:21:21
【问题描述】:

我正在编写一个线程应用程序,它将处理资源列表,并且可能会或可能不会将结果项放置在每个资源的容器 (std::map) 中。 资源的处理发生在多个线程中。

结果容器将被遍历,每个项目由一个单独的线程处理,该线程获取一个项目并更新 MySQL 数据库(使用 mysqlcppconn API),然后从容器中删除该项目并继续。

为简单起见,这里是逻辑的概述:

queueWorker() - thread
    getResourcesList() - seeds the global queue

databaseWorker() - thread
    commitProcessedResources() - commits results to a database every n seconds

processResources() - thread x <# of processor cores>
    processResource()
    queueResultItem()

以及显示我在做什么的伪实现。

/* not the actual stucts, but just for simplicities sake */
struct queue_item_t {
    int id;
    string hash;
    string text;
};

struct result_item_t {
    string hash; // hexadecimal sha1 digest
    int state;
}

std::map< string, queue_item_t > queue;
std::map< string, result_item_t > results;

bool processResource (queue_item_t *item)
{
    result_item_t result;

    if (some_stuff_that_doesnt_apply_to_all_resources)
    {
        result.hash = item->hash;
        result.state = 1;

        /* PROBLEM IS HERE */
        queueResultItem(result);
    }
}

void commitProcessedResources ()
{
    pthread_mutex_lock(&resultQueueMutex);

    // this can take a while since there

    for (std::map< string, result_item_t >::iterator it = results.begin; it != results.end();)
    {
        // do mysql stuff that takes a while

        results.erase(it++);
    }

    pthread_mutex_unlock(&resultQueueMutex);
}

void queueResultItem (result_item_t result)
{
    pthread_mutex_lock(&resultQueueMutex);

    results.insert(make_pair(result.hash, result));

    pthread_mutex_unlock(&resultQueueMutex);
}

正如 processResource() 中所指出的,问题就在那里,并且当 commitProcessedResources() 正在运行并且 resultQueueMutex 被锁定时,我们将在这里等待 queueResultItem() 返回,因为它会尝试锁定相同的互斥体并且因此将等到它完成,这可能需要一段时间。

显然,由于运行的线程数量有限,因此一旦所有线程都在等待 queueResultItem() 完成,在互斥锁被释放并可用于 queueResultItem() 之前,将不再进行任何工作。

那么,我的问题是如何最好地实现这一点?是否有一种特定类型的标准容器可以同时插入和删除,或者是否存在我不知道的东西?

严格不必像 std::map 那样每个队列项都可以拥有自己的唯一键,但我更喜欢它,因为多个资源可以产生相同的结果我宁愿只向数据库发送一个唯一的结果,即使它确实使用 INSERT IGNORE 来忽略任何重复。

我对 C++ 还很陌生,所以很遗憾,我不知道在 Google 上要寻找什么。 :(

【问题讨论】:

    标签: c++ concurrency pthreads


    【解决方案1】:

    commitProcessedResources () 的处理过程中,您不必一直为队列持有锁。您可以改为将队列与空队列交换:

    void commitProcessedResources ()
    {
        std::map< string, result_item_t > queue2;
        pthread_mutex_lock(&resultQueueMutex);
        // XXX Do a quick swap.
        queue2.swap (results);
        pthread_mutex_unlock(&resultQueueMutex);
    
        // this can take a while since there
    
        for (std::map< string, result_item_t >::iterator it = queue2.begin();
            it != queue2.end();)
        {
            // do mysql stuff that takes a while
    
            // XXX You do not need this.
            //results.erase(it++);
        }   
    }
    

    【讨论】:

    • 这似乎工作得很好。我之前尝试分配: queue2 = results;当我在某处读到会复制“结果”时,但这没有用。
    • @ThomasDaugaard: queue2=results 会将results 复制到queue2,但不会删除results。就像我展示的那样,将空队列与完整队列交换更容易和更快。
    • 是的,我知道我需要 .clear() 原件,但正如你所说,这要容易得多。
    【解决方案2】:

    您需要使用同步方法(即互斥锁)才能使其正常工作。但是,并行编程的目标是最小化临界区(即在您持有锁时执行的代码量)。

    也就是说,如果您的 MySQL 查询可以在不同步的情况下并行运行(即多个调用不会相互冲突),请将它们从临界区中取出。这将大大减少开销。例如,如下所示的简单重构就可以解决问题

    void commitProcessedResources ()
    {
        // MOVING THIS LOCK
    
        // this can take a while since there
        pthread_mutex_lock(&resultQueueMutex);
        std::map<string, result_item_t>::iterator end = results.end();
        std::map<string, result_item_t>::iterator begin = results.begin();
        pthread_mutex_unlock(&resultQueueMutex);
    
        for (std::map< string, result_item_t >::iterator it = begin; it != end;)
        {
            // do mysql stuff that takes a while
    
            pthread_mutex_lock(&resultQueueMutex); // Is this the only place we need it?
            // This is a MUCH smaller critical section
            results.erase(it++);
            pthread_mutex_unlock(&resultQueueMutex); // Unlock or everything will block until end of loop
        }
    
        // MOVED UNLOCK
    }
    

    这将使您能够跨多个线程同时“实时”访问数据。也就是说,随着每次写入完成,地图都会更新,并且可以使用当前信息在其他地方读取。

    【讨论】:

    • 我唯一的问题是,如果 results.end() 将在每次迭代中重新评估,并且随着 queueResultItem() 添加项目,是否会延长循环直到它完全 空的?
    • 我相信这取决于编译器如何优化您的代码。无论如何,请查看我更新的解决方案,以了解这一点。
    • @RageD:没有锁你仍然无法执行results.begin()
    【解决方案3】:

    在 C++03 之前,该标准根本没有定义任何关于线程或线程安全的内容(而且由于您使用的是 pthreads,我猜这几乎就是您正在使用的)。

    因此,您可以对共享地图进行锁定,以确保在任何给定时间只有一个线程尝试访问该地图。否则,您可能会破坏其内部数据结构,因此该地图根本不再有效。

    或者(我通常更喜欢这个)你可以让你的多个线程将它们的数据放入一个线程安全的队列中,并让一个线程从该队列中获取数据并将其放入映射中。由于它是单线程的,因此您不再需要在使用地图时锁定地图。

    当您将地图刷新到磁盘时,有一些合理的可能性来处理延迟。可能最简单的方法是从队列中读取相同的线程,插入映射,并定期将映射刷新到磁盘。在这种情况下,当映射被刷新到磁盘时,传入的数据只是位于队列中。这使得对地图的访问变得简单——因为只有一个线程直接接触它,它可以在没有任何锁定的情况下使用地图。

    另一个是有两张地图。在任何给定时间,刷新到磁盘的线程都会获得一个映射,而从队列中检索并插入映射的线程会获得另一个。当刷新线程需要做它的事情时,它只是交换两者的角色。就个人而言,我认为我更喜欢第一种——消除地图周围的所有锁定具有很大的吸引力,至少对我来说是这样。

    另一个保持这种简单性的变体是队列->映射线程创建映射,填充它,当它足够满时(即,在适当的时间长度之后)将它填充到另一个队列中,然后重复从一开始(即,创建新地图等) 刷新线程从其传入队列中检索地图,将其刷新到磁盘,然后将其销毁。尽管这会增加一些创建和销毁地图的开销,但您这样做的频率并不足以引起很多关注。您仍然可以随时保持对任何地图的单线程访问,并且仍然保持所有数据库访问与其他一切隔离。

    【讨论】:

      猜你喜欢
      • 2018-12-05
      • 2011-10-24
      • 2021-05-08
      • 2011-10-23
      • 1970-01-01
      • 2023-01-13
      • 2010-09-10
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多