【问题标题】:C++ Concurrent Queue: Slower with > 1 threadsC ++并发队列:> 1个线程较慢
【发布时间】:2017-03-10 03:23:33
【问题描述】:

队列与两个不同的锁并发:一个用于 enqueue() 以保护同时入队的多个线程,另一个用于 dequeue() 以达到相同的效果。

Add(enqueue) 只是在队列已满时跳过(返回)插入。 如果队列为空,Remove(dequeue) 会跳过删除。

我使用 doRandom() 生成 0 到 1 之间的随机数。我使用这些数字来决定是否添加/删除。

性能:我尝试使用静态/动态线程分配测试队列。对于两种分配算法,>1 个线程的执行时间要慢得多。

   //g++ -std=c++0x -pthread -o block blocking.cpp;./block

    #include <iostream>
    #include <thread>
    #include <mutex>
    #include <atomic>
    #include <random>
    #include <stdio.h>  
    #include <time.h>
    #include <stdlib.h>
    #include <ctime>
    #include <condition_variable>
    using namespace std;

    #define NUMBER_OF_THREADS 1
    #define NUMBER_OF_OPERATIONS 10000000
    #define QUEUE_CAPACITY 1000000

    std::vector<double> getRandom();


    template <class T> class BoundedQueue {

        private:
            T * array;  
            int head, tail, capacity;
            std::mutex enqLock, deqLock;
            std::atomic<long> sharedCounter;
            std::condition_variable notEmptyCondition, notFullCondition;    

        public:
            void staticAllocation(double randomNumbers[], int threadID);
            void dynamicAllocation(double randomNumbers[]);
            void add (T x);
            BoundedQueue ();
            void remove ();
    };

    template <class T> BoundedQueue<T>::BoundedQueue () {
        capacity = QUEUE_CAPACITY;
        array = new T[capacity];
        head = 0;
        tail = 0;
        sharedCounter = 0;
    }

    template <class T> void BoundedQueue<T>::add (T x) {

            enqLock.lock();

            if (tail - head == capacity) {
                enqLock.unlock();
                return;
            }
            array[tail % capacity] = x;
            tail++;

            enqLock.unlock();
    }   

    template <class T> void BoundedQueue<T>::remove() {

            deqLock.lock();
            if (tail - head == 0) {
                deqLock.unlock();
                return;
            }
            T result = array [head % capacity];
            head++;

            deqLock.unlock();
    }   

    template <class T> void BoundedQueue<T>::dynamicAllocation(double randomNumbers[]) {

        long i = 0;
        while (i < QUEUE_CAPACITY) {
            i = sharedCounter.fetch_add(1, std::memory_order_relaxed);
            if(randomNumbers[i] <= 0.5) add(0);
            else remove();
        }   
    }

    template <class T> void BoundedQueue<T>::staticAllocation (double randomNumbers[], int threadID) {

        int split = NUMBER_OF_OPERATIONS / NUMBER_OF_THREADS;   
        for (int i = threadID * split; i < (threadID * split) + split; i++) {
            if(randomNumbers[i] <= 0.5) add(0);
            else remove();
        }
    }

    std::vector<double> getRandom() {

        std::vector<double> numbers;
        std::random_device rd;
        std::mt19937 gen(rd());
        std::uniform_real_distribution<> dis(0,1);

        for(int i = 0; i < NUMBER_OF_OPERATIONS; i++) numbers.push_back(dis(gen));

        return numbers;
    }   

    int main () {

        BoundedQueue<int> bQ;
        std::vector<double> temp = getRandom();
        double* randomNumbers = &temp[0];

        std::thread myThreads[NUMBER_OF_THREADS];

        clock_t begin = clock();

        for(int i = 0; i < NUMBER_OF_THREADS; i++) {
                    myThreads[i] = std::thread ( [&] {bQ.dynamicAllocation(randomNumbers); });
        }

        for(int i = 0; i < NUMBER_OF_THREADS; i++) {
            if(myThreads[i].joinable()) myThreads[i].join();
        }   

        clock_t end = clock();

        cout << double(end-begin) * 1000 / CLOCKS_PER_SEC;
        return 0;
    }

【问题讨论】:

    标签: c++ multithreading performance queue fifo


    【解决方案1】:

    这里没有并行性。请注意,您的所有线程几乎都已序列化,因为您在函数的开头获取锁并在最后释放它。

    虽然由于缺乏并行性,工作在线程之间进行了划分,但锁定/解锁的开销占主导地位,并且与单线程相比,总体显示执行时间较长。

    所以有并发,但没有没有并行性,我们只是为同步付出了代价,却没有性能优势。

    【讨论】:

    • 我使用了两种不同的锁:一种用于入队,一种用于出队。所以应该有可能线程 A 正在执行 add() 而线程 B 执行 remove()。
    • 我使用 2 个线程而不是 1 个线程来实现 staticAllocation() 将导致工作在两个线程之间进行拆分。: int split = NUM​​BER_OF_OPERATIONS / NUMBER_OF_THREADS; for (int i = threadID * split; i
    • 如果您使用 1 个线程与 n 个线程,我们是否仍然执行相同数量的锁定/解锁?我不明白您所说的“我们只是为同步而付出代价而没有性能优势。”
    • 多线程高频锁定/解锁意味着:每个人都需要等待持有者释放它。但是在一个线程中,没有等待。一般来说,在多线程中,有一个隐含的等待概念,这是这段代码中的主要内容。因此,当您有多个线程并且您以高频率调用它们并且所有代码都由锁保护时,锁定/解锁变得昂贵(您在函数内所做的工作量非常小)
    • 在 1 个线程中没有停止/等待。但是有超过 1 个线程停止/等待。所以锁定/解锁的开销是不一样的。第一个要快得多。
    猜你喜欢
    • 2017-05-18
    • 1970-01-01
    • 2019-08-15
    • 1970-01-01
    • 1970-01-01
    • 2023-04-09
    • 2010-10-28
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多