【问题标题】:value based thread lock基于值的线程锁
【发布时间】:2020-06-17 09:13:10
【问题描述】:

如果以前有人问过这个问题,请原谅我。我环顾四周,但我觉得我没有合适的词汇来通过搜索网络找到这个。

我在 python 中有一个多线程应用程序。我希望能够锁定某个代码块,但仅限于具有特定条件的其他线程。我举个例子:有三个线程,thread_athread_bthread_c。每个线程都可以随时运行foo 函数。我不希望任何两个bar 相等的线程能够同时访问Code block ALPHA。但是,我不想阻止 bar 值不同的线程。在这种情况下,假设thread_a 有一个bar == "cat" 并首先点击(3)。在thread_a 命中行(5) 之前,假设thread_bbar == "cat" 命中行(3)。我想让thread_b 等待。但如果thread_cbar == "dog" 一起出现,我希望它能够继续前进。

(1) def foo(bar):
(2)    
(3)     lock(bar)
(4)     # Code block ALPHA (two threads with equivalent bar should not be in here)
(5)     unlock(bar)

另外,bar 的可能值是完全不可预测的,但发生冲突的可能性非常高。

感谢您的帮助。我正在查看的库是python threading library

【问题讨论】:

  • bar 是有限的吗?如果是,那么您可以设置任意数量的互斥锁,然后锁定与您的线程具有的任何bar 值相对应的互斥锁。
  • @numeral,好问题。 Bar 不是有限的,因为您可以为每种可能的情况合理地分配互斥锁。我将更新我的问题以反映这一点。
  • 在这种情况下,我唯一能想到的就是您将不得不跟踪您遇到的bar 并动态锁定。我想到的任何东西都涉及某种共享内存模型。这意味着您将必须拥有一个可以锁定所有动态锁的主锁。换句话说,您只有一个可以锁定所有线程的互斥锁。抱歉帮不上忙。 ://
  • 如果您尝试使用两层锁定系统,请确保您最终不会导致一个线程持有主锁并尝试锁定值锁,而另一个线程持有该值锁并且试图持有主锁。死锁不好玩。
  • @user2357112,您是否知道两层锁定系统的任何好的示例实现。我不想陷入僵局。

标签: python multithreading python-multithreading


【解决方案1】:

更新

好消息:我能够通过我拼凑起来的一个有点粗糙的测试台使用我的原始答案重现您遇到的release_lock 问题,并使用计数机制(如您所建议的那样)解决该问题 - 至少到目前为止我可以用我的测试仪器来判断。

现在使用了两个独立的共享字典,一个像以前一样跟踪与每个锁关联的“名称”或值,另一个跟踪在给定时间有多少线程在使用每个锁。

和以前一样,锁名称必须是可散列的值,以便它们可以用作字典中的键。

import threading

namespace_lock = threading.Lock()
namespace = {}
counters = {}

def aquire_lock(value):
    with namespace_lock:
        if value in namespace:
            counters[value] += 1
        else:
            namespace[value] = threading.Lock()
            counters[value] = 1

    namespace[value].acquire()

def release_lock(value):
    with namespace_lock:
        if counters[value] == 1:
            del counters[value]
            lock = namespace.pop(value)
        else:
            counters[value] -= 1
            lock = namespace[value]

    lock.release()

# sample usage    
def foo(bar):
    aquire_lock(bar)
    # Code block ALPHA (two threads with equivalent bar should not be in here)
    release_lock(bar)

【讨论】:

  • 感谢您为撰写本文付出的努力。我将对其进行测试并报告。看起来很有希望。
  • 在进行任何测试之前,这给我留下了深刻的印象。我发现的唯一问题是可以过早地从namespace 弹出值。当两个线程试图获取相同的值时,当release_lock被第一个线程调用时,它会弹出该值,而第二个线程则通过namespace[value].acquire()。当第二个线程调用release_lock时,值已经不在namespace中,无法弹出。我认为修复所需要的只是添加一个计数机制,让释放线程知道它是否应该弹出。总体来说很棒的答案!
【解决方案2】:

拥有一个锁,在线程尝试进入或退出临界区时获取,并为bar 的每个值使用单独的条件变量。以下内容可能会被优化以创建更少的条件变量,但在这篇文章中这样做感觉像是过早的优化:

import collections
import contextlib
import threading

lock = threading.Lock()

wait_tracker = collections.defaultdict(lambda: (False, 0, threading.Condition(lock)))

@contextlib.contextmanager
def critical(bar):
    with lock:
        busy, waiters, condition = wait_tracker[bar]
        if busy:
            # Someone with the same bar value is in the critical section.

            # Record that we're waiting.
            waiters += 1
            wait_tracker[bar] = busy, waiters, condition

            # Wait for our turn.
            while wait_tracker[bar][0]:
                condition.wait()

            # Record that we're not waiting any more.
            busy, waiters, condition = wait_tracker[bar]
            waiters -= 1

        # Record that we're entering the critical section.
        busy = True
        wait_tracker[bar] = busy, waiters, condition
    try:
        # Critical section runs here.
        yield
    finally:
        with lock:
            # Record that we're out of the critical section.
            busy, waiters, condition = wait_tracker[bar]
            busy = False
            if waiters:
                # Someone was waiting for us. Tell them it's their turn now.
                wait_tracker[bar] = busy, waiters, condition
                condition.notify()
            else:
                # No one was waiting for us. Clean up a bit so the wait_tracker
                # doesn't grow forever.
                del wait_tracker[bar]

那么每个想要进入临界区的线程都会做以下事情:

with critical(bar):
    # Critical section.

此代码未经测试,并行性很困难,尤其是锁和共享内存并行性。我不保证它会起作用。

【讨论】:

  • 有机会我会仔细看看这个。
【解决方案3】:

这是一个面向类的解决方案,适用于需要几组单独的锁的情况。

# A dynamic group of locks, useful for parameter based locking.
class LockGroup(object):

    def __init__(self):
        self.lock_dict = {}
        self.lock = threading.Lock()

    # Returns a lock object, unique for each unique value of param.
    # The first call with a given value of param creates a new lock, subsequent
    # calls return the same lock.
    def get_lock(self, param):
        with self.lock:
            if param not in self.lock_dict:
                self.lock_dict[param] = threading.Lock()
            return self.lock_dict[param]

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-06-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-05-31
    相关资源
    最近更新 更多