【问题标题】:Implementation of a lock free vector无锁向量的实现
【发布时间】:2021-04-13 07:51:51
【问题描述】:

经过多次搜索,我找不到无锁向量实现。 有一个文件谈到它,但没有具体内容(无论如何我都没有找到它)。 http://pirkelbauer.com/papers/opodis06.pdf

目前有2个线程处理数组,可能会更多。

一个线程更新不同的向量,另一个线程访问向量进行计算等。每个线程每秒访问不同的数组很多次。

我在不同的向量上实现了一个带互斥锁的锁,但是当读取或写入线程解锁时间过长时,所有进一步的更新都会延迟。 然后我想一直复制数组以加快速度,但是每秒复制数千次包含数千个元素的数组对我来说似乎并不好。

所以我想在每个表中为每个值使用 1 个互斥锁来仅锁定我正在处理的值。

无锁可能会更好,但我找不到解决方案,我想知道性能是否真的会更好。

编辑:

我有一个线程接收向量中的数据和范围。 当我实例化结构时,我使用固定大小。

我必须为更新做两件事:

-更新向量元素。 (模拟二维向量的一维向量)

-在向量末尾添加一行并删除第一行。数组始终保持排序。添加元素比更新少得多

只读线程遍历数组并执行计算。 为了限制花在数组上的时间并尽可能少地进行计算,我使用数组来存储我的计算结果。尽管如此,我还是经常需要对表进行足够的扫描以进行新的计算或只是更新它们。 (应用程序是实时的,因此要进行的计算会根据请求而有所不同)

当一个新元素添加到向量中时,读取线程必须直接使用它来更新计算。

当我说计算时,它不一定只是算术,它更多的是一种处理。

【问题讨论】:

  • 你有生产者/消费者模式吗?您可能希望为此使用循环缓冲区。如果不是,您的访问模式到底是什么(“一个线程更新不同的向量”有点含糊。它是如何更新的?更改了索引?通过push_back添加了一个元素?这些与访问的地方不同吗?其他线程?)
  • @Artyer 我更新了我的问题
  • 向量中有多少行?由于您正在向矢量添加和删除行(“模拟二维矢量的一维矢量”)不要模拟二维矢量。相反,对行集进行操作。这样,您可以在不锁定 2D 数组的情况下创建新行,并将其锁定足够长的时间以删除第一行并添加新行,而不是锁定整个 2D 数组。这应该是 fast 因为它们只是指向行的指针。
  • 可能没有适合您的问题的无锁方法。这是因为听起来您需要进行范围比无锁原子可以容纳的范围更广的原子更新——例如,删除一行并添加另一行。还有一个问题是,在阅读器处理向量时执行更新——即使是原子的——是否会导致阅读器误解数据。例如,您可能会得到与整个向量曾经拥有的任何聚合状态都不对应的计算结果。
  • 2020 CppCon 挑战赛视频:youtu.be/Pi243hGxDyA。 Concurrency in Action 一书也讨论了挑战,但我没有看到无锁向量示例。

标签: c++ multithreading performance pthreads atomic


【解决方案1】:

运行并发没有完美的实现,每个任务都有自己的优点。我找到一个体面的实现的 goto 方法是只允许需要的东西,然后检查我将来是否需要更多东西。 您描述了一个非常简单的场景,一个线程对一个共享向量执行一个操作,然后该向量需要判断该操作是否被允许,所以 std::atomic_flag 很好。

这个例子应该让你了解它是如何工作的以及要花费什么。主要是我只是在eatch数组上附加了一个标志并在之前检查它是否可以安全地做某事,有些人喜欢在标志上添加一个警卫,以防万一。

#include <iostream>
#include <thread>
#include <atomic>
#include <chrono>

const int vector_size = 1024;

struct Element {
    void some_yield(){
        std::this_thread::yield();
    };
    void some_wait(){
        std::this_thread::sleep_for(
            std::chrono::microseconds(1)
        );
    };
};

Element ** data;
std::atomic_flag * vector_safe;


bool alive = true;
uint32_t c_down_time = 0;
uint32_t p_down_time = 0;
uint32_t c_intinerations = 0;
uint32_t p_intinerations = 0;
std::chrono::high_resolution_clock::time_point c_time_point;
std::chrono::high_resolution_clock::time_point p_time_point;

int simple_consumer_work(){
    Element a_read;
    uint16_t i, e;
    while (alive){
        // Loops thru the vectors
        for (i=0; i < vector_size; i++){
            // locks the thread untin the vector 
            // at index i is free to read
            while (!vector_safe[i].test_and_set()){} 
                // Do the watherver
                for (e=0; e < vector_size; e++){
                    a_read = data[i][e];
                } 
            // And signal that this vector is done
            vector_safe[i].clear();
        }
    }
    return 0;
};
int simple_producer_work(){
    uint16_t i;
    while (alive){
        for (i=0; i < vector_size; i++){
            while (!vector_safe[i].test_and_set()){} 
            data[i][i].some_wait();
            vector_safe[i].clear();
        }
        p_intinerations++;
    }
    return 0;
};

int consumer_work(){
    Element a_read;
    uint16_t i, e;
    bool waiting;
    while (alive){

        for (i=0; i < vector_size; i++){
            waiting = false;
            c_time_point = std::chrono::high_resolution_clock::now();
            while (!vector_safe[i].test_and_set(std::memory_order_acquire)){
                waiting = true;
            } 
            if (waiting){
                c_down_time += (uint32_t)std::chrono::duration_cast<std::chrono::nanoseconds> 
                (std::chrono::high_resolution_clock::now() - c_time_point).count();
            }  
            for (e=0; e < vector_size; e++){
                a_read = data[i][e];
            } 
            vector_safe[i].clear(std::memory_order_release);
        }
        c_intinerations++;
    }
    return 0;
};
int producer_work(){
    bool waiting;
    uint16_t i;
    while (alive){
        for (i=0; i < vector_size; i++){
            waiting = false;
            p_time_point = std::chrono::high_resolution_clock::now();
            while (!vector_safe[i].test_and_set(std::memory_order_acquire)){
                waiting = true;
            } 
            if (waiting){
                p_down_time += (uint32_t)std::chrono::duration_cast<std::chrono::nanoseconds> 
                (std::chrono::high_resolution_clock::now() - p_time_point).count();
            } 
            data[i][i].some_wait();
            vector_safe[i].clear(std::memory_order_release);
        }
        p_intinerations++;
    }
    return 0;
};

void print_time(uint32_t down_time){
    if ( down_time <= 1000) {
        std::cout << down_time << " [nanosecods] \n";

    } else if (down_time <= 1000000) {
        std::cout << down_time / 1000 << " [microseconds] \n";
    
    } else if (down_time <= 1000000000) {
        std::cout << down_time / 1000000 << " [miliseconds] \n";
    
    } else {
        std::cout << down_time / 1000000000 << " [seconds] \n";
    }
};

int main(){

    std::uint16_t i;
    std::thread consumer;
    std::thread producer;

    vector_safe = new std::atomic_flag [vector_size] {ATOMIC_FLAG_INIT};
    data = new Element * [vector_size];
    for(i=0; i < vector_size; i++){
        data[i] = new Element;
    }

    consumer = std::thread(consumer_work);
    producer = std::thread(producer_work);

    std::this_thread::sleep_for(
        std::chrono::seconds(10)
    );

    alive = false;
    producer.join();
    consumer.join();

    std::cout << " Consumer loops > " << c_intinerations << std::endl;
    std::cout << " Consumer time lost > "; print_time(c_down_time);
    std::cout << " Producer loops > " << p_intinerations << std::endl;
    std::cout << " Producer time lost > "; print_time(p_down_time);

    for(i=0; i < vector_size; i++){
        delete data[i];
    }
    delete [] vector_safe;
    delete [] data;

    return 0;
}

别忘了编译器可以并且将会改变部分代码,spagueti 代码在多线程中确实存在缺陷。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-03-01
    • 1970-01-01
    • 2012-06-17
    • 2021-06-18
    相关资源
    最近更新 更多