【问题标题】:what is the best way to synchronize container access between multiple threads in real-time application在实时应用程序中同步多个线程之间的容器访问的最佳方法是什么
【发布时间】:2010-01-16 11:13:14
【问题描述】:

我的应用程序中有std::list<Info> infoList,它在两个线程之间共享。这 2 个线程正在访问此列表,如下所示:

线程1:在列表中使用push_back()pop_front()clear()(视情况而定) 线程 2:使用 iterator 遍历列表中的项目并执行一些操作。

线程 2 正在迭代列表,如下所示:

for(std::list<Info>::iterator i = infoList.begin(); i != infoList.end(); ++i)
{
  DoAction(i);
}

代码使用 GCC 4.4.2 编译。

有时 ++i 会导致段错误并使应用程序崩溃。该错误是在 std_list.h 第 143 行的以下行中引起的:

_M_node = _M_node->_M_next;

我想这是一个赛车条件。当线程 2 对其进行迭代时,该列表可能已被线程 1 更改甚至清除。

我使用 Mutex 来同步对该列表的访问,并且在我的初始测试期间一切正常。但是系统只是在压力测试下冻结,使得这个解决方案完全不可接受。这个应用程序是一个实时应用程序,我需要找到一个解决方案,以便两个线程都可以尽可能快地运行,而不会影响应用程序的总吞吐量。

我的问题是: 线程 1 和线程 2 需要尽可能快地执行,因为这是一个实时应用程序。我可以做些什么来防止这个问题并仍然保持应用程序性能?是否有任何无锁算法可用于此类问题?

如果我在线程 2 的迭代中错过了一些新添加的 Info 对象,那没关系,但我能做些什么来防止迭代器变成悬空指针?

谢谢

【问题讨论】:

标签: c++ multithreading concurrency access-synchronization


【解决方案1】:

您的 for() 循环可能会在较长时间内保持锁定,具体取决于它迭代的元素数量。如果它“轮询”队列,不断检查是否有新元素可用,您可能会遇到真正的麻烦。这使得线程拥有互斥锁的时间过长,生产者线程很少有机会闯入并添加元素。并在此过程中消耗大量不必要的 CPU 周期。

您需要一个“有界阻塞队列”。不要自己写,锁的设计不是小菜一碟。很难找到好的示例,其中大部分是 .NET 代码。 This article 看起来很有希望。

【讨论】:

    【解决方案2】:

    通常以这种方式使用 STL 容器是不安全的。您将必须实现特定的方法以使您的代码线程安全。您选择的解决方案取决于您的需求。我可能会通过维护两个列表来解决这个问题,每个线程中一个。并通过lock free queue(在此问题的 cmets 中提到)传达更改。您还可以通过将 Info 对象包装在 boost::shared_ptr 中来限制它们的生命周期,例如

    typedef boost::shared_ptr<Info> InfoReference; 
    typedef std::list<InfoReference> InfoList;
    
    enum CommandValue
    {
        Insert,
        Delete
    }
    
    struct Command
    {
        CommandValue operation;
        InfoReference reference;
    }
    
    typedef LockFreeQueue<Command> CommandQueue;
    
    class Thread1
    {
        Thread1(CommandQueue queue) : m_commands(queue) {}
        void run()
        {
            while (!finished)
            {
                //Process Items and use 
                // deleteInfo() or addInfo()
            };
    
        }
    
        void deleteInfo(InfoReference reference)
        {
            Command command;
            command.operation = Delete;
            command.reference = reference;
            m_commands.produce(command);
        }
    
        void addInfo(InfoReference reference)
        {
            Command command;
            command.operation = Insert;
            command.reference = reference;
            m_commands.produce(command);
        }
    }
    
    private:
        CommandQueue& m_commands;
        InfoList m_infoList;
    }   
    
    class Thread2
    {
        Thread2(CommandQueue queue) : m_commands(queue) {}
    
        void run()
        {
            while(!finished)
            {
                processQueue();
                processList();
            }   
        }
    
        void processQueue()
        {
            Command command;
            while (m_commands.consume(command))
            {
                switch(command.operation)
                {
                    case Insert:
                        m_infoList.push_back(command.reference);
                        break;
                    case Delete:
                        m_infoList.remove(command.reference);
                        break;
                }
            }
        }
    
        void processList()
        {
            // Iterate over m_infoList
        }
    
    private:
        CommandQueue& m_commands;
        InfoList m_infoList;
    }   
    
    
    void main()
    {
    CommandQueue commands;
    
    Thread1 thread1(commands);
    Thread2 thread2(commands);
    
    thread1.start();
    thread2.start();
    
    waitforTermination();
    
    }
    

    这还没有编译。您仍然需要确保对 Info 对象的访问是线程安全的。

    【讨论】:

      【解决方案3】:

      我想知道这个列表的目的是什么,那样回答问题会更容易。

      正如 Hoare 所说,尝试共享数据以在两个线程之间进行通信通常是一个坏主意,而应该通过通信来共享数据:即消息传递。

      如果此列表正在建模队列,例如,您可以简单地使用多种方式之一在两个线程之间进行通信(例如套接字)。消费者/生产者是一个标准且众所周知的问题。

      如果你的物品很贵,那么只在通信过程中传递指针,你将避免复制物品本身。

      一般来说,共享数据非常困难,但不幸的是,这是我们在学校听到的唯一一种编程方式。通常只有通信“通道”的低级实现才应该担心同步问题,您应该学会使用通道进行通信,而不是试图模仿它们。

      【讨论】:

      • +1 - 仅用于 Hoare 报价,仅此而已。多线程编程并不是所有问题的唯一答案。
      【解决方案4】:

      为了防止您的迭代器失效,您必须锁定整个 for 循环。现在我猜第一个线程可能难以更新列表。我会尝试给它一个机会在每次(或每 N 次迭代)上完成它的工作。

      伪代码如下:

      mutex_lock();
      for(...){
        doAction();
        mutex_unlock();
        thread_yield();  // give first thread a chance
        mutex_lock();
        if(iterator_invalidated_flag) // set by first thread
          reset_iterator();
      }
      mutex_unlock();
      

      【讨论】:

      • 我是否需要每次在 reset_iterator() 上将迭代器重置到列表的开头; ?
      • @O. Askari:取决于操作:clear - 当然可以,push_back - 不需要重置。至于pop_front——你可能需要重置(比如当你在列表的开头时),在这种情况下,如果你想避免不必要的重置,你需要在线程之间共享迭代器。
      【解决方案5】:

      您必须决定哪个线程更重要。如果是更新线程,那么它必须通知迭代器线程停止、等待并重新开始。如果它是迭代器线程,它可以简单地锁定列表,直到迭代完成。

      【讨论】:

        【解决方案6】:

        最好的方法是使用内部同步的容器。 TBB 和微软的 concurrent_queue 做到了这一点。 Anthony Williams 在他的博客here

        上也有一个很好的实现

        【讨论】:

          【解决方案7】:

          其他人已经提出了无锁替代方案,所以我会像您使用锁一样回答...

          当您修改列表时,现有迭代器可能会变得无效,因为它们不再指向有效内存(列表在需要增长时会自动重新分配更多内存)。为了防止无效的迭代器,您可以在您的消费者遍历列表时使生产者阻塞在互斥体上,但这对于生产者来说是不必要的等待

          如果您使用队列而不是列表,并且让您的消费者使用同步的queue&lt;Info&gt;::pop_front() 调用而不是可以在您背后失效的迭代器,那么生活会更轻松。如果您的消费者真的需要一次吞下大量的Info,那么请使用condition variable,这将使您的消费者阻塞直到queue.size() &gt;= minimum

          Boost 库具有良好的条件变量可移植实现(甚至适用于旧版本的 Windows),以及 usual threading library stuff

          对于使用(老式)锁定的生产者-消费者队列,请查看 ZThreads 库的 BlockingQueue 模板类。我自己没有使用过 ZThreads,担心缺少最近的更新,并且因为它似乎没有被广泛使用。但是,我将它作为创建自己的线程安全生产者-消费者队列的灵感(在我了解 lock-free 队列和 TBB 之前)。

          无锁队列/堆栈库似乎在 Boost 审查队列中。让我们希望我们在不久的将来看到一个新的 Boost.Lockfree! :)

          如果有兴趣,我可以写一个使用 std::queue 和 Boost 线程锁定的阻塞队列示例。

          编辑

          Rick 的答案引用的博客已经有一个使用 std::queue 和 Boost condvars 的阻塞队列示例。如果您的消费者需要吞噬块,您可以扩展示例如下:

          void wait_for_data(size_t how_many)
              {
                  boost::mutex::scoped_lock lock(the_mutex);
                  while(the_queue.size() < how_many)
                  {
                      the_condition_variable.wait(lock);
                  }
              }
          

          您可能还想调整它以允许超时和取消。

          您提到速度是一个问题。如果你的Infos 是重量级的,你应该考虑通过shared_ptr 传递它们。您也可以尝试将Infos 设置为固定大小并使用memory pool(这可能比堆快得多)。

          【讨论】:

            【解决方案8】:

            正如您提到的,您不在乎您的迭代消费者是否错过了一些新添加的条目,您可以在下面使用 copy-on-write 列表。这允许迭代消费者在列表首次启动时对其一致的快照进行操作,而在其他线程中,对列表的更新会产生新的但一致的副本,而不会干扰任何现存的快照。

            这里的交易是,对列表的每次更新都需要锁定足够长的独占访问权限以复制整个列表。这种技术偏向于拥有许多并发阅读器和不太频繁的更新。

            尝试向容器添加内在锁定首先需要您考虑哪些操作需要在原子组中运行。例如,在尝试弹出第一个元素之前检查列表是否为空需要原子 pop-if-not-empty 操作;否则,列表为空的答案可能会在调用者收到答案并尝试对其采取行动之间发生变化。

            在上面的示例中不清楚迭代必须遵守什么保证。迭代线程必须最终访问列表中的每个元素吗?它可以多次通过吗?当另一个线程运行DoAction() 时,一个线程从列表中删除一个元素是什么意思?我怀疑解决这些问题会导致重大的设计更改。


            您正在使用 C++,并且您提到需要一个带有 pop-if-not-empty 操作的队列。很多年前,我使用ACE Library 的并发原语写了一个two-lock queue,因为Boost thread library 还没有准备好用于生产,而包含这些设施的C++ 标准库的机会是一个遥远的梦想。将它移植到更现代的东西会很容易。

            我的这个队列——称为concurrent::two_lock_queue——只允许通过 RAII 访问队列的头部。这确保了获取锁以读取头部将始终与锁的释放配合。消费者构造一个const_front(对头元素的常量访问)、一个front(对头元素的非常量访问)或一个renewable_front(对头和后继元素的非常量访问)对象来表示独占访问队列头元素的权限。这样的“正面”对象是不能复制的。

            two_lock_queue 类还提供了一个 pop_front() 函数,该函数等待至少一个元素可以被删除,但是,为了保持 std::queuestd::stack 不混合容器突变和值复制,pop_front() 返回 void。

            在伴随文件中,有一个名为 concurrent::unconditional_pop 的类型,它允许通过 RAII 确保队列的头元素将在退出当前范围时弹出。

            伴随文件error.hh 定义了使用函数two_lock_queue::interrupt() 产生的异常,用于解除阻塞等待访问队列头的线程。

            查看代码,如果您需要更多关于如何使用它的说明,请告诉我。

            【讨论】:

            • 列表中的每个元素都需要由 DoAction() 处理,但是另一个线程可能会在执行过程中删除一个元素。这不会造成任何伤害,因为它可以检查 Info 对象的有效性。该列表在循环之后不能被清空,因为它将在其他地方使用。我已经为迭代器使用了一个快照,但我仍然有问题如何做一个原子 pop-if-not-empty。谢谢
            【解决方案9】:

            如果您使用 C++0x,您可以通过这种方式在内部同步列表迭代:

            假设该类有一个名为 objects_ 的模板列表和一个名为 mutex_ 的 boost::mutex

            toAll 方法是列表包装器的成员方法

             void toAll(std::function<void (T*)> lambda)
             {
             boost::mutex::scoped_lock(this->mutex_);
             for(auto it = this->objects_.begin(); it != this->objects_.end(); it++)
             {
                  T* object = it->second;
                  if(object != nullptr)
                  {
                            lambda(object);
                       }
                  }
             }
            

            调用:

            synchronizedList1->toAll(
                  [&](T* object)->void // Or the class that your list holds
                  {
                       for(auto it = this->knownEntities->begin(); it != this->knownEntities->end(); it++)
                       {
                            // Do something
                       }
                  }
             );
            

            【讨论】:

              【解决方案10】:

              您必须使用一些线程库。如果您使用的是 Intel TBB,则可以使用 concurrent_vector 或 concurrent_queue。见this

              【讨论】:

                【解决方案11】:

                如果您想在多线程环境中继续使用std::list,我建议将其包装在一个带有互斥锁的类中,以提供对其的锁定访问。根据具体的使用情况,切换到事件驱动的队列模型可能是有意义的,其中消息在多个工作线程正在使用的队列上传递(提示:生产者-消费者)。

                我会认真考虑Matthieu's thought。使用多线程编程解决的许多问题可以更好地使用线程或进程之间的消息传递来解决。如果您需要高吞吐量并且不绝对要求处理共享相同的内存空间,请考虑使用Message-Passing Interface (MPI) 之类的东西,而不是滚动您自己的多线程解决方案。有很多可用的 C++ 实现 - OpenMPIBoost.MPIMicrosoft MPI 等等。

                【讨论】:

                  【解决方案12】:

                  我认为在这种情况下你根本无法在没有任何同步的情况下逃脱,因为某些操作使你正在使用的迭代器失效。使用列表,这是相当有限的(基本上,如果两个线程都试图同时将迭代器操作到同一个元素),但仍然存在在尝试的同时删除元素的危险追加一个。

                  你有没有机会锁定DoAction(i)?您显然只想在可以逃脱的绝对最短时间内保持锁定,以最大限度地提高性能。从上面的代码中,我认为您需要对循环进行一些分解,以加快双方的操作速度。

                  类似的东西:

                  while (processItems) {
                    Info item;
                    lock(mutex);
                    if (!infoList.empty()) {
                       item = infoList.front();
                       infoList.pop_front();
                    }
                    unlock(mutex);
                    DoAction(item);
                    delayALittle();
                  }
                  

                  插入函数仍然必须是这样的:

                  lock(mutex);
                  infoList.push_back(item);
                  unlock(mutex);
                  

                  除非队列可能很大,否则我很想使用std::vector&lt;Info&gt; 甚至std::vector&lt;boost::shared_ptr&lt;Info&gt; &gt; 之类的东西来最小化 Info 对象的复制(假设这些复制比较昂贵到 boost::shared_ptr。通常,对向量的操作往往比对列表的操作要快一些,尤其是当存储在向量中的对象很小且复制成本低时。

                  【讨论】:

                  • 谢谢,这个解决方案看起来很有希望。该列表不包含很多项目,最多 20 个,每个 Info 项目占用大约 128 个字节。但是,如果我需要在 infoList 中保留已处理的项目而不是删除它们怎么办?
                  猜你喜欢
                  • 2016-12-24
                  • 1970-01-01
                  • 1970-01-01
                  • 2013-02-14
                  • 1970-01-01
                  • 1970-01-01
                  • 2016-12-21
                  • 1970-01-01
                  • 2021-07-06
                  相关资源
                  最近更新 更多