【问题标题】:Set behavior to be executed when a thread would otherwise finish设置要在线程完成时执行的行为
【发布时间】:2019-07-09 19:55:38
【问题描述】:

我的模块中有两个函数:do_something()change_behavior()

函数do_something() 默认执行Thing A。在调用change_behavior() 之后,do_something() 会改为执行Thing B

我希望这种转换是特定于线程的。也就是说,任何新线程在调用do_something() 时都会发生Thing A,但如果该线程调用change_behavior(),那么当它继续时Thing B 会发生拨打do_something()

每个线程应该是独立的,这样一个线程调用change_behavior()不会影响do_something()对其他线程的行为。


对此我本能的解决方案是将行为绑定到线程的 ID(通过 threading.get_ident() 评估)。函数do_something() 检查本地表中是否存在线程ID,并相应地调整其行为。同时,函数change_behavior() 只是将当前线程添加到该注册表中。这在任何给定时间都有效,因为永远不会有两个具有相同 ID 的并发线程。

当当前线程集加入时,问题就出现了,随着时间的流逝,父线程产生了更多的线程。其中一个新线程与以前的线程之一具有相同的 ID,因为有时会重用线程 ID。该线程调用do_something(),因为它已经在注册表中,所以它执行Thing B 而不是Thing A

要解决此问题,我需要以某种方式从注册表中删除线程 ID,在具有该 ID 的第一个线程结束和具有该 ID 的第二个线程开始之间。我提出的一些假设性想法:

  • 定期检查每个线程 ID 是否仍处于活动状态。这是有问题的,因为它既浪费 CPU 资源,又会在线程被销毁然后在滴答之间重新创建时丢失
  • 附加一个方法挂钩,以便在线程加入时调用。除了下一个想法,我不知道该怎么做
  • 作为change_behavior() 的一部分,劫持/替换当前线程的._quit() 方法,使用首先从注册表中删除线程ID 的方法。这似乎是一种不好的做法,并且可能会破坏。

我的用例的另一个方面是,如果可能的话,我希望新线程继承其父线程的当前行为,这样用户就不必手动设置他们创建的每个标志 - 但这是与我如何存储有关胎面状态的信息的相关性比与线程完成时的相关性更高,这使得它与这个特定问题的相关性略微降低。

我正在寻找有关这些特定解决方案是否是理想、标准或惯用的指导,以及在此用例中是否有预期的事情要做。


@TarunLalwani 在 cmets 中建议使用 threading.local()。我已经对其进行了调查,它很有用,但它没有考虑到我想要处理的其他用例 - 当父线程创建新的子线程时,我希望它们继承父线程的状态.我正在考虑通过替换 Thread.__init__() 来实现这一点,但使用 local() 通常与此用例不兼容,因为我无法将变量从父线程传递给子线程。


我也一直在尝试,更成功的是,只需将我的属性保存到线程本身:

current_thread = threading.current_thread()
setattr(current_thread, my_reference, new_value)

this 的问题在于,出于一个让我完全困惑的原因,模块命名空间中当前值为current_thread.my_reference 的任何其他变量也被设置为new_value。我不知道为什么,而且我一直无法在 MVE 中复制该问题(尽管它在我的 IDE 中始终如一地发生,即使在重新启动它之后也是如此)。正如my other currently-active question 暗示的那样,我在此处设置的对象是对输出流的引用(我在该答案中描述的对中间 IO 流实例的每个引用都将被调用此方法的文件描述符所取代),如果这与它有关,但我无法想象为什么在这种情况下对象的类型会影响引用的工作方式。

【问题讨论】:

  • 根据下面的链接,threading.get_ident() 没有返回一个真实的ID。您可以使用 ctypes 获取操作系统 ID。 blog.devork.be/2010/09/finding-linux-thread-id-from-within.html
  • @postoronnim 知道这很有帮助,谢谢,但我认为它不能解决问题。操作系统线程 ID 也可以重复,所以问题仍然存在。我想我将threading.get_ident() 用于其预期目的-“作为一种神奇的cookie,例如用于索引线程特定数据的字典。”操作系统确实努力避免太快地复制线程 ID,所以在这里结合使用它和“定期检查”策略是否是理想的?
  • 听起来“定期检查”可能会给您带来更多麻烦。我倾向于寻找一种制作唯一线程标识符的方法。也许通过将 threading.get_ident() 和堆栈指针组合成一个整数? stackoverflow.com/questions/34164854/threads-memory-layout
  • 你能发布你正在使用的线程方法吗?使用类、函数或分叉?它可能有助于建议一种特定于您的设计的方法
  • @TarunLalwani 我的代码本身不使用多线程,但假设调用它的代码会使用,并且它希望每个线程的行为都是唯一的。我希望这是通过threading 模块(multiprocessing 因为非共享内存而更容易处理,而fork/subprocess 无论如何都会超出现有程序的影响范围)

标签: python multithreading thread-safety


【解决方案1】:

我的回答是对您问题的一个非常简单的回答,因此我想知道我是否遗漏了什么。基本上,我认为您应该避免在模块中存储外部对象的当前状态。

您需要在某处存储状态(如果调用了change_behavior 并且可能还有其他一些数据)。您有两个主要选择:将状态存储在模块中或将状态存储在线程本身中。除了在模块中存储状态时遇到的问题,人们希望模块(主要)是无状态的,因此我认为您应该坚持后者并将数据存储在线程中。

版本 1

如果您将状态存储在字段中,您创建的属性名称和现有属性的名称之间会有一点冲突的风险,但是如果文档清晰并且您选择了一个好的名称,那应该不是问题。

一个简单的概念证明,没有setattrhasattr(我没有检查CPython的源代码,但可能奇怪的行为来自setattr):

module1.py

​​>
import threading
import random
import time

_lock = threading.Lock()

def do_something():
    with _lock:
        t = threading.current_thread()
        try:
            if t._my_module_s:
                print(f"DoB ({t})")
            else:
                print(f"DoA ({t})")
        except AttributeError:
            t._my_module_s = 0
            print(f"DoA ({t})")

    time.sleep(random.random()*2)

def change_behavior():
    with _lock:
        t = threading.current_thread()
        print(f"Change behavior of: {t}")
        t._my_module_s = 1

test1.py

​​>
import random
import threading
from module1 import *

class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        n = random.randint(1, 10)
        for i in range(n):
            do_something()
        change_behavior()
        for i in range(10-n):
            do_something()

thread_1 = MyThread()
thread_2 = MyThread()
thread_1.start()
thread_2.start()
thread_1.join()
thread_2.join()

输出 1

DoA (<MyThread(Thread-1, started 140155115792128)>)
DoA (<MyThread(Thread-2, started 140155107399424)>)
DoA (<MyThread(Thread-1, started 140155115792128)>)
DoA (<MyThread(Thread-1, started 140155115792128)>)
Change behavior of: <MyThread(Thread-1, started 140155115792128)>
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoA (<MyThread(Thread-2, started 140155107399424)>)
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoA (<MyThread(Thread-2, started 140155107399424)>)
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoA (<MyThread(Thread-2, started 140155107399424)>)
DoA (<MyThread(Thread-2, started 140155107399424)>)
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoA (<MyThread(Thread-2, started 140155107399424)>)
Change behavior of: <MyThread(Thread-2, started 140155107399424)>
DoB (<MyThread(Thread-2, started 140155107399424)>)
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoB (<MyThread(Thread-2, started 140155107399424)>)
DoB (<MyThread(Thread-2, started 140155107399424)>)
DoB (<MyThread(Thread-2, started 140155107399424)>)

版本 2

如果您确定最终用户会在线程中使用您的模块,您可以为他/她提供一种方便的方式来执行此操作。这个想法是自己处理线程。只需将用户函数包装在一个线程中,并将线程的状态存储在该线程中,如上。不同之处在于您是Thread 子类的所有者,您可以避免名称冲突的风险。另外,在我看来,代码变得更干净了:

module2.py

​​>
import threading
import random
import time

_lock = threading.Lock()

def do_something():
    with _lock:
        t = threading.current_thread()
        t.do_something() # t must be a _UserFunctionWrapper
    time.sleep(random.random()*2)

def change_behavior():
    with _lock:
        t = threading.current_thread()
        t.change_behavior() # t must be a _UserFunctionWrapper

def wrap_in_thread(f):
    return _UserFunctionWrapper(f)

class _UserFunctionWrapper(threading.Thread):
    def __init__(self, user_function):
        threading.Thread.__init__(self)
        self._user_function = user_function
        self._s = 0

    def change_behavior(self):
        print(f"Change behavior of: {self}")
        self._s = 1

    def do_something(self):
        if self._s:
            print(f"DoB ({self})")
        else:
            print(f"DoA ({self})")

    def run(self):
        self._user_function()

test2.py

​​>
import random
from module2 import *

def user_function():
    n = random.randint(1, 10)
    for i in range(n):
        do_something() # won't work if the function is not wrapped
    change_behavior()
    for i in range(10-n):
        do_something()

thread_1 = wrap_in_thread(user_function)
thread_2 = wrap_in_thread(user_function)
thread_1.start()
thread_2.start()
thread_1.join()
thread_2.join()

输出 2

DoA (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
Change behavior of: <_UserFunctionWrapper(Thread-1, started 140193896072960)>
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
Change behavior of: <_UserFunctionWrapper(Thread-2, started 140193887680256)>
DoB (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)

缺点是即使不需要也必须使用线程。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2014-03-18
    • 1970-01-01
    • 2014-08-21
    • 2022-01-10
    • 1970-01-01
    • 2016-08-28
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多