【问题标题】:Does pydispatcher run the handler function in a background thread?pydispatcher 是否在后台线程中运行处理程序函数?
【发布时间】:2019-08-07 21:14:56
【问题描述】:

在查找事件处理程序模块时,我遇到了 pydispatcher,它似乎对初学者很友好。我对该库的用例是,如果我的队列大小超过阈值,我想发送一个信号。然后,处理函数可以开始处理并从队列中删除项目(并随后将批量插入到数据库中)。

我希望处理函数在后台运行。我知道我可以简单地覆盖 queue.append() 方法检查队列大小并异步调用处理程序函数,但我想实现侦听器-调度程序模型以保持逻辑干净和分离。

pydispatcher 是否开箱即用?如果没有,是否有另一个模块可以帮助我做到这一点?我是否需要管理对队列的访问,因为可能有多个线程同时处理和附加到队列?

请注意,在我的用例中,只有一个调度程序和事件处理程序。

【问题讨论】:

    标签: python multithreading python-multiprocessing python-multithreading


    【解决方案1】:

    我最近发布了Akuanduba 模块,它可以帮助您完成这项任务。存储库中有一个示例可以帮助您了解它的工作原理,并且看起来与您想要的相似。

    无论如何,我将尝试在这里解释一种使用 Akuanduba 实现代码的方法:

    • 首先,您可以制作一个数据框来保存您的队列:
    # Mandatory imports
    from Akuanduba.core.messenger.macros import *
    from Akuanduba.core.constants import *
    from Akuanduba.core import NotSet, AkuandubaDataframe
    # Your imports go here:
    from queue import Queue
    
    class MyQueue (AkuandubaDataframe):
    
      def __init__(self, name):
    
        # Mandatory stuff
        AkuandubaDataframe.__init__(self, name)
    
        self.__queue = Queue ()
    
      def getQueue (self):
        return self.__queue
    
      def putQueue (self, val):
        self.__queue.put(val)
    
      def getQueueSize (self):
        return self.__queue.qsize()
    
      #
      # "toRawObj" method is a mandatory method that delivers a dict with the desired data
      # for file saving
      #
      def toRawObj(self):
        d = {
              "Queue" : self.getQueue(),
              }
        return d
    
    • 然后您可以创建一个 TriggerCondition 来检查队列大小:
    from Akuanduba.core import StatusCode, NotSet, StatusTrigger
    from Akuanduba.core.messenger.macros import *
    from Akuanduba.core import TriggerCondition
    import time
    
    class CheckQueueSize (TriggerCondition):
    
      def __init__(self, name, maxSize):
    
        TriggerCondition.__init__(self, name)
        self._name = name
        self._maxSize = maxSize
    
      def initialize(self):
    
        return StatusCode.SUCCESS
    
      def execute (self):
    
        size = self.getContext().getHandler("MyQueue").getQueueSize()
        if (size > SIZE_THRESHOLD):
          return StatusTrigger.TRIGGERED
        else:
          return StatusTrigger.NOT_TRIGGERED
    
      def finalize(self):
    
        return StatusCode.SUCCESS
    
    • 制作一个工具作为你的处理函数:
    # Mandatory imports
    from Akuanduba.core import AkuandubaTool, StatusCode, NotSet, retrieve_kw
    # Your imports go here:
    
    class SampleTool(AkuandubaTool):
    
      def __init__(self, name, **kw):
    
        # Mandatory stuff
        AkuandubaTool.__init__(self, name)
    
    
      def initialize(self):
    
        # Lock the initialization. After that, this tool can not be initialized once again
        self.init_lock()
        return StatusCode.SUCCESS
    
    
      def execute(self,context):
    
        #
        # DO SOMETHING HERE
        #
    
        # Always return SUCCESS
        return StatusCode.SUCCESS
    
      def finalize(self):
        self.fina_lock()
        return StatusCode.SUCCESS
    
    • 最后,制作一个主脚本以使它们一起工作:
    # Akuanduba imports
    from Akuanduba.core import Akuanduba, LoggingLevel, AkuandubaTrigger
    from Akuanduba import ServiceManager, ToolManager, DataframeManager
    
    # This sample's imports
    import MyQueue, CheckQueueSize, SampleTool
    
    # Creating your handler
    your_handler = SampleTool ("Your Handler's name")
    
    # Creating dataframes
    queue = MyQueue ("MyQueue")
    
    # Creating trigger
    trigger  = AkuandubaTrigger("Sample Trigger Name", triggerType = 'or')
    
    # Append conditions and tools to trigger just adding them
    # Tools appended to the trigger will only run when trigger is StatusTrigger.TRIGGERED,
    # and will run in the order they've been appended
    trigger += CheckQueueSize( "CheckQueueSize condition", MAX_QUEUE_SIZE )
    trigger += your_handler
    
    # Creating Akuanduba
    manager = Akuanduba("Akuanduba", level=LoggingLevel.INFO)
    
    # Appending tools
    #
    # ToolManager += TOOL_1
    # ToolManager += TOOL_2
    #
    ToolManager += trigger
    
    # Apprending dataframes
    DataframeManager += sampleDataframe
    
    # Initializing 
    manager.initialize()
    manager.execute()
    manager.finalize()
    

    这样,您将拥有干净且分离的代码。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2013-09-12
      • 1970-01-01
      • 2020-04-22
      • 1970-01-01
      • 1970-01-01
      • 2016-01-13
      • 1970-01-01
      • 2016-02-26
      相关资源
      最近更新 更多