【问题标题】:How can i lock a MUTEX for an element in the array, not for the complete array我如何为数组中的一个元素锁定一个 MUTEX,而不是整个数组
【发布时间】:2015-09-17 16:03:23
【问题描述】:

问题的简短版本:我有 2 个函数共享同一个数组,当一个正在编辑它时,另一个正在读取它。但是,向量很长(5000 个样本)并且很少发生并发访问。但是MUTEX1 上的 Mutex 争用正在减慢程序的速度。 '

如何锁定内存的某些位置而不是整个块以减少争用?

编辑:注意:我必须尽可能使用更新的 G 值。

EDIT2:例如,我有长度为 5000 的数组 G。foo1 锁定 mutex1 以编辑索引 124。虽然 foo2 想要编辑索引 2349,但直到 foo1 发布 mutex1 才能编辑。

有没有办法可以将锁定互斥锁的争用转移到元素级别?含义:我希望foo2foo1 只在同一个互斥体上竞争,只有当他们想要编辑同一个索引时。例如:foo1 想要编辑索引 3156,foo2 想要编辑索引 3156。

带代码说明的长版: 我正在为一个复杂的数学函数编写代码,并且我正在使用 pthreads 来并行代码并提高性能。代码非常复杂,我可以发布它,但我可以将模型发布到代码中。

基本上我有 2 个数组,我想使用 2 个并行运行的线程来编辑它们。一个线程运行foo1,另一个运行foo2。但是,它们应该以特定的顺序运行,我使用 mutexes(_B,_A1, 和 _A2) 来授予该序列。它是这样的:

foo1 (first half)
foo2 (first half) and foo1 (second half) (in parallel)
foo1 (first half) and foo2 (second half) (in parallel)
...
foo2(second half)

然后我会检索我的结果。 在foo1 的前半部分,我将使用G1 中的结果,这可能会同时被foo2 编辑。因此我使用Mutex1 来保护它。对于Gfoo2 也会发生同样的情况。但是,将完整向量锁定为 1 个值非常有效,它们几乎从不同时编辑相同的内存位置。当我比较结果时,几乎总是一样的。我想要一种方法一次锁定一个元素,这样它们就只会竞争同一个元素。

我将为有兴趣了解其工作原理的人描述代码:

#include <pthread.h>
#include <iostream>

using namespace std;

#define numThreads 2
#define Length 10000

pthread_t threads[numThreads];

pthread_mutex_t mutex1   = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t Mutex_B  = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t Mutex_A1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t Mutex_A2 = PTHREAD_MUTEX_INITIALIZER;

struct data_pointers
{
    double  *A;
    double  *B;
    double  *G;
    double  *L;
    int idxThread;
};

void foo1   (data_pointers &data);
void foo2   (data_pointers &data);

void *thread_func(void *arg){
    data_pointers data = *((data_pointers *) arg);
    if (data.idxThread==0)
        foo1 (data);
    else
        foo2 (data);
}

到这里是定义和线程调用函数,记住我定义了Length 10000numThreads 2

void foo1 ( data_pointers &data)
{
    double *A           = data.A;
    double *L           = data.L; 
    double *G           = data.G; 
    double U;

    for (int ijk =0;ijk<5;ijk++){
        /* here goes some definitions*/

        pthread_mutex_lock(&Mutex_A1);

        for (int k =0;k<Length;k++){
            pthread_mutex_lock(&mutex1); 
            U = G[k];
            pthread_mutex_unlock(&mutex1);
            /*U undergoes a lot of mathematical operations here


            */
        }

        pthread_mutex_lock(&Mutex_B);
        pthread_mutex_unlock(&Mutex_A2);
        for (int k =0;k<Length;k++){
            /*U another mathematical operations here


            */
            pthread_mutex_lock(&mutex1);
            L[k] = U;
            pthread_mutex_unlock(&mutex1);
            pthread_mutex_unlock(&Mutex_B);
        }
    }
}

在 foo1 中我锁定 mutexA1 并完成我的工作,然后我锁定 MutexB 并解锁 MutexA2 以便 foo2 可以开始工作。请注意,main 从锁定 MutexA2 开始。这样我保证foo1mutexB锁定的情况下开始后半部分,这样foo2不能进入函数的后半部分,直到foo1解锁mutexB

void foo2 (data_pointers &data)
{
    double *A           = data.A;
    double *L           = data.L; 
    double *G           = data.G; 
    double U;

    for (int ijk =0;ijk<5;ijk++){
        /* here goes some definitions*/

        pthread_mutex_lock(&Mutex_A1);

        for (int k =0;k<Length;k++){
            pthread_mutex_lock(&mutex1); 
            U = G[k];
            pthread_mutex_unlock(&mutex1);
            /*U undergoes a lot of mathematical operations here


            */
        }

        pthread_mutex_lock(&Mutex_B);
        pthread_mutex_unlock(&Mutex_A2);
        for (int k =0;k<Length;k++){        
            /*U another mathematical operations here


            */
            pthread_mutex_lock(&mutex1);
            L[k] = U;
            pthread_mutex_unlock(&mutex1);
            pthread_mutex_unlock(&Mutex_B);

        }
    }
}

现在,当foo1 解锁mutexB 时,它必须等待foo2 解锁mutexA1 才能工作,foo2 只会在mutexA2 已经解锁mutexB 时解锁。

这样持续了 5 次。

int main(){
    double G1[Length];
    double G2[Length];
    double B1[Length];
    double B2[Length];
    double A2[Length];
    double A1[Length];
    data_pointers data[numThreads];

    data[0].L           = G2;
    data[0].G           = G1;   
    data[0].A           = A1;
    data[0].B           = B1;
    data[0].idxThread   = 0;

    data[1].L           = G1;
    data[1].G           = G2;   
    data[1].A           = A2;
    data[1].B           = B2;
    data[1].idxThread   = 1;

    pthread_mutex_lock(&Mutex_A2);

    pthread_create(&(threads[0]), NULL, thread_func, (void *) &(data[0]));
    pthread_create(&(threads[1]), NULL, thread_func, (void *) &(data[1]));
    pthread_join(threads[1], NULL);
    pthread_join(threads[0], NULL);

    pthread_mutex_unlock(&Mutex_A1);
    pthread_mutex_unlock(&Mutex_A2);

    return 0;
}

请注意,这只是一个示例代码。按预期编译和工作,但没有输出。

最后编辑:谢谢大家的好主意,我有很多经验,并且很高兴遵循这些建议。我将对所有有用的答案进行投票,并选择最接近原始问题(原子性)的答案

【问题讨论】:

  • 双精度和互斥体对的向量?
  • @Joe 你能详细说明一下吗?我不明白。但请注意,必须使用更新后的值。
  • 我想我的意思是,如果你只想锁定一个特定的元素,你需要为每个元素加一个锁。所以你可以有成对的锁和替身,让你单独锁定每个替身。虽然真正的胜利是提供一种算法,保证两个线程永远不需要同时访问同一个元素。
  • 实际上在我的代码中,如果我正在编辑一个奇怪的索引,只有其他的索引有并发编辑的危险,所以我可以创建 2 个互斥锁。但问题是 if 语句将比互斥体情况慢得多。
  • 您可以尝试对编辑器线程正在编辑的任何元素使用原子指针,在您的处理(阅读器)线程中,检查该指针以查看它是否正在使用。这使您可以处理/读取当前未编辑的任何元素。存在一个问题,如果您的处理完成得比您的编辑更快,并且您从当前正在编辑的元素之前的元素开始处理,那么您的处理线程将阻塞当前正在编辑的元素之后的所有元素。回家后我会尝试想出一个解决方案。

标签: c++ multithreading pthreads mutex


【解决方案1】:

使用原子指针“锁定”内存中某些位置的示例代码:

#include <vector>
#include <atomic>
#include <thread>

using container = std::vector<std::atomic<double>>;
using container_size_type = container::size_type;

container c(300);

std::atomic<container::pointer> p_busy_elem{ nullptr };

void editor()
{
    for (container_size_type i{ 0 }, sz{ c.size() }; i < sz; ++i)
    {
        p_busy_elem.exchange(&c[i]); // c[i] is busy
        // ... edit c[i] ... // E: calculate a value and assign it to c[i]
        p_busy_elem.exchange(nullptr); // c[i] is no longer busy
    }
}

void reader()
{
    for (container_size_type i{ 0 }, sz{ c.size() }; i < sz; ++i)
    {
        // A1: wait for editor thread to finish editing value
        while (p_busy_elem == &c[i])
        {
            // A2: room a better algorithm to prevent blocking/yielding
            std::this_thread::yield();
        }

        // B: if c[i] is updated in between A and B, this will load the latest value
        auto value = c[i].load();

        // C: c[i] might have changed by this time, but we had the most up to date value we could get without checking again
        // ... use value ...
    }
}

int main()
{
    std::thread t_editor{ editor };
    std::thread t_reader{ reader };
    t_editor.join();
    t_reader.join();
}

在编辑器线程中,busy 指针设置为指示当前正在编辑该内存位置 (E)。如果线程 B 在设置忙碌指针后尝试读取该值,它将等到编辑完成后再继续(A1)。

A2 的注意事项:这里可以放置一个更好的系统。可以保留尝试读取时繁忙的节点列表,然后我们会将i 添加到该列表并尝试稍后处理该列表。好处:可以告诉循环执行continue,并读取当前正在编辑的i 之后的索引。

复制要读取的值 (B) 以便在需要时使用它 (C)。这是我们最后一次在c[i] 上检查最新值。

【讨论】:

  • 这个很快,可能需要编辑它。任何注意到问题的人都非常欢迎指出它们!
【解决方案2】:

如果您不调整数组大小,则不需要对单个元素或整个数组使用任何互斥锁。

以原子方式阅读您的价值观,以原子方式书写您的价值观并保持冷静。

【讨论】:

  • 是的,我不调整数组的大小。你能详细说明一下吗?为什么我不需要互斥锁?如果两个线程试图同时编辑同一个索引怎么办?我是否使用原子指针来确保原子性?
  • 如果两个线程尝试同时编辑一个原子值,那么其中一个线程要么先获得它,要么先获得它,这样就不会有数据竞争(同时获得它),因为这就是原子性保证。至于我在原始帖子中的评论,您还可以有一个列表来跟踪无法处理/写入的值,因为它们正被线程使用并在某个时候处理该列表(我不得不考虑更多关于最佳时间的信息)。
  • @user304584,读者将获得“有效”值 - 意思是,它不会被脏写,它将是更新之前或之后的那个。无论如何,这个值应该对你有好处。
  • 补充@SergeyA所说的,使用互斥锁已经给你一个类似的行为:你要么得到更新之前的那个,要么得到更新之后的那个,除了你阻止了整个访问大批。原子存储和加载解决了这个问题。
【解决方案3】:

如果您想在不使用互斥锁的情况下对类似数组的数据结构进行高性能多线程访问,您可以研究比较和交换。也许您可以设计一个无锁数据结构来解决您的特定问题。 https://en.wikipedia.org/wiki/Compare-and-swap

关于发布的代码,您似乎使事情复杂化了。如果你想达到:

foo1 (first half)
foo2 (first half) and foo1 (second half) (in parallel)
foo1 (first half) and foo2 (second half) (in parallel)
...
foo2(second half)

两个互斥体应该可以。

也许这可以做到。下面是一些伪代码:

// These global variables controls which thread is allowed to
// execute first and second half.
// 1 --> Foo1 may run
// 2 --> Foo2 may run
int accessFirstHalf = 1;
int accessSecondHalf = 1;

void foo1 ( data_pointers &data)
{
    while(YOU_LIKE_TO_GO_ON)
    {
        while (true)
        {
            TAKE_MUTEX_FIRST_HALF;
            if (accessFirstHalf == 1)
            {
                RELEASE_MUTEX_FIRST_HALF;
                break;
            }
            RELEASE_MUTEX_FIRST_HALF;
            pthread_yield();
        }

        // Do the first half

        TAKE_MUTEX_FIRST_HALF;
        // Allow Foo2 to do first half
        accessFirstHalf == 2;
        RELEASE_MUTEX_FIRST_HALF;

        while (true)
        {
            TAKE_MUTEX_SECOND_HALF;
            if (accessSecondHalf == 1)
            {
                RELEASE_MUTEX_SECOND_HALF;
                break;
            }
            RELEASE_MUTEX_SECOND_HALF;
            pthread_yield();
        }

        // Do the second half

        TAKE_MUTEX_SECOND_HALF;
        // Allow Foo2 to do second half
        accessSecondHalf == 2;
        RELEASE_MUTEX_SECOND_HALF;
    }
}


void foo2 ( data_pointers &data)
{
    while(YOU_LIKE_TO_GO_ON)
    {
        while (true)
        {
            TAKE_MUTEX_FIRST_HALF;
            if (accessFirstHalf == 2)
            {
                RELEASE_MUTEX_FIRST_HALF;
                break;
            }
            RELEASE_MUTEX_FIRST_HALF;
            pthread_yield();
        }

        // Do the first half

        TAKE_MUTEX_FIRST_HALF;
        // Allow Foo1 to do first half
        accessFirstHalf == 1;
        RELEASE_MUTEX_FIRST_HALF;

        while (true)
        {
            TAKE_MUTEX_SECOND_HALF;
            if (accessSecondHalf == 2)
            {
                RELEASE_MUTEX_SECOND_HALF;
                break;
            }
            RELEASE_MUTEX_SECOND_HALF;
            pthread_yield();
        }

        // Do the second half

        TAKE_MUTEX_SECOND_HALF;
        // Allow Foo1 to do second half
        accessSecondHalf == 1;
        RELEASE_MUTEX_SECOND_HALF;
    }
}


int main()
{
    // start the threads with foo1 and foo2
}

【讨论】:

  • 那是我最初的设计。但是我发现线程 1 可能会锁定和解锁前半部分和后半部分互斥锁,然后,互斥锁将开始工作,这是不同步的。这就是我想出 3 mutex 策略的原因。另外,我这样做是为了避免使用线程池,或者不得不多次调用 pthread_create。
  • 我意识到我的第一个答案是错误的。希望这会更好。添加了 yield 以便等待线程允许其他线程访问 CPU。
【解决方案4】:

这似乎是您要求的核心:

foo1 (first half)
foo2 (first half) and foo1 (second half) (in parallel)
foo1 (first half) and foo2 (second half) (in parallel)
...
foo2(second half)

使用 pthread 实现这种交错的最简单方法是使用屏障。

使用count 2 初始化带有pthread_barrier_init() 的屏障。foo1() 然后执行:

first half
pthread_barrier_wait()
second half
pthread_barrier_wait()
...
first half
pthread_barrier_wait()
second half
pthread_barrier_wait()

foo2() 执行的顺序略有不同:

pthread_barrier_wait()
first half
pthread_barrier_wait()
second half
....
pthread_barrier_wait()
first half
pthread_barrier_wait()
second half

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-10-04
    • 2011-10-06
    • 1970-01-01
    • 1970-01-01
    • 2019-09-19
    • 1970-01-01
    • 1970-01-01
    • 2021-08-18
    相关资源
    最近更新 更多