【问题标题】:How to define thread safe array?如何定义线程安全数组?
【发布时间】:2015-04-22 19:04:17
【问题描述】:

如何以最少的修改定义一个线程安全的全局数组?

我希望通过使用互斥锁和同步块来完成对它的每次访问。

像“T”这样的东西将是某种类型(请注意,AFAIK 当前未定义“同步”关键字):

sync Array!(T) syncvar;

并且每次访问它都会与此类似:

Mutex __syncvar_mutex;

    //some func scope....
    synchronized(__syncvar_mutex) { /* edits 'syncvar' safely */ }

【问题讨论】:

  • 保护阵列本身很容易。您只需创建一个包装结构,其中所有数组函数和运算符都重载,并且所有函数都同步。问题在于获取数组本身的元素。保护这些也变得更加复杂,因为一旦你从像 opIndex 这样的函数中返回它们,它们就不再受到保护......
  • 我认为这是 shared(T[]) 应该做的,但显然不是......
  • @AdamD.Ruppe All shared 真正做到的是使变量在线程之间共享,而不是线程本地。在某些情况下,如果您尝试使用 shared 做一些保证会出现问题的事情(在某些情况下与原子操作 IIRC 有关),编译器会抱怨,但它不会对互斥锁或同步做任何事情。随你(由你决定。同步类是处理共享对象的推荐方法,但它们没有完全实现(只是同步函数),并且任何逃脱该类的东西都将不再受到保护。
  • 如果你能帮助我的人 - 请发布一个答案,不要让一些人......来获取你的赏金。因为最终(6 天后)它会自动送给不值得的人。
  • 如果我有一个好的解决方案,我早就回答了……但我真的没有。我能想到的最好的就像乔纳森说的,尝试用同步方法做一个包装器。但我真的不知道。

标签: arrays multithreading synchronization d


【解决方案1】:

我天真的尝试是做这样的事情:

import std.typecons : Proxy:

synchronized class Array(T)
{
    static import std.array;
    private std.array.Array!T data;
    mixin Proxy!data;
}

很遗憾,由于https://issues.dlang.org/show_bug.cgi?id=14509,它不起作用

不能说我很惊讶,因为在现代 D 中通过隐藏的互斥锁自动处理多线程是非常单一的,并且同步类的概念主要是 D1 时代的遗留物。

当然,您可以手动实现相同的解决方案,方法是定义自己的 SharedArray 类和所有必要的方法,并在调用内部私有纯 Array 方法之前在方法内添加锁。但我想你想要一些开箱即用的东西。

现在不能发明任何更好的东西(会考虑更多)但值得注意的是,通常在 D 中鼓励创建旨在明确处理共享访问的数据结构,而不是仅仅保护正常的数据结构与互斥锁。当然,最受鼓励的方法是完全不使用消息传递来共享数据。

如果我有更好的想法,我会更新答案。

【讨论】:

  • 不 - 我想要一些有效且不会太长的东西。无论如何,谢谢 - 我会试一试。
【解决方案2】:

在数组周围创建一个包装器是相当容易的,这将使它成为线程安全的。但是,要制作一个没有并发瓶颈的线程安全数组是极其困难的。

想到的最接近的是 Java 的 CopyOnWriteArrayList 类,但即使这样也不理想......

【讨论】:

  • 我知道,但我不在乎,因为这正是我想要的(并发瓶颈)。然而,当我滚动答案时 - 对我来说似乎并不“相当容易”。
【解决方案3】:

您可以将数组包装在一个 struct 中,当线程获取令牌时锁定对数组的访问,直到它释放它为止。

包装器/储物柜:

  • acquire(): 被线程循环调用。因为它返回一个指针,当方法返回一个非空值时,线程就知道它拥有令牌。
  • release():在处理了之前已经获得访问权的数据后被线程调用。

.

shared struct Locker(T)
{
    private:
        T t;
        size_t token;   
    public:  
        shared(T) * acquire() 
        {
            if (token) return null;
            else
            {
                import core.atomic;
                atomicOp!"+="(token, 1);
                return &t;
            }
        }
        void release()
        {
            import core.atomic;
            atomicOp!"-="(token, 1);
        }
}

快速测试:

alias LockedIntArray = Locker!(size_t[]);
shared LockedIntArray intArr;

void arrayTask(size_t cnt)
{
    import core.thread, std.random;

    // ensure the desynchronization of this job.
    Thread.sleep( dur!"msecs"(uniform(4, 20)));

    shared(size_t[])* arr = null;
    // wait for the token
    while(arr == null) {arr = intArr.acquire;}

    *arr ~= cnt;    
    import std.stdio;
    writeln(*arr);

    // release the token for the waiting threads
    intArr.release;
}

void main(string[] args)
{
    import std.parallelism;
    foreach(immutable i; 0..16)
    {
       auto job = task(&arrayTask, i);
       job.executeInNewThread(); 
    } 
}

缺点是对数组的每个操作块都必须用 acquire/release 对包围。

【讨论】:

  • 好吧 - 我可以使用“同步”块来实现相同的效果。这里没有任何真正的好处吗?
  • 每个人都已经告诉你怎么做,没有简单的方法(最小的修改):将数组包装在结构中并重载你感兴趣的运算符。 Михаил Страшун 解决方案会很好,但正如他所注意到的,这不起作用,这是一个非常有趣的 solution 概念 BTW。您是否意识到您可以从几天开始编写它(我的意思是包装器)?也不要忘记您可以在 NG/论坛上获得更好的答案。不是每个人都喜欢/使用 SO。
【解决方案4】:

你的想法是对的。作为一个数组,您需要能够编辑和检索信息。我建议你看看 Phobos 提供的 read-write mutexatomic 实用程序。读取操作相当简单:

  1. synchronize mutex.readLock
  2. 加载(使用atomicLoad
  3. 将项目从synchronize 块中复制出来
  4. 返回复制的项目

写作应该几乎完全相同。只需在mutex.writeLock 上进行syncronize 并执行casatomicOp 操作。

请注意,这仅在您在读取期间复制数组中的元素时才有效。如果要获取引用,每次访问或修改元素时都需要对元素做额外的同步。

【讨论】:

  • 我信任这个系统,但我觉得它会再次让我失望。公平地告诉我,你认为你应该得到这个答案的“+150”声誉吗?
  • 有效吗?这真的很重要不是吗?如果您认为我的回答值得,那么是的。你问了这个问题。我给了你答案。如果你觉得不够,那么我们可以看看如何修改。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-06-10
  • 1970-01-01
  • 2021-12-23
  • 1970-01-01
相关资源
最近更新 更多