【问题标题】:Python multiprocessing RemoteManager under a multiprocessing.ProcessPython multiprocessing RemoteManager 下的一个 multiprocessing.Process
【发布时间】:2012-07-16 23:22:23
【问题描述】:

我正在尝试在管理进程下启动数据队列服务器(以便以后可以将其转换为服务),虽然数据队列服务器功能在主进程中工作正常,但在使用 multiprocessing.Process 创建的进程。

dataQueueServer 和 dataQueueClient 代码基于多处理模块文档here 中的代码。

单独运行时,dataQueueServer 运行良好。但是,当在 mpqueue 中使用multiprocessing.Processstart() 运行时,它不起作用(在客户端测试时)。我正在使用 dataQueueClient 来测试这两种情况。

代码在这两种情况下都到达serve_forever,所以我认为服务器正在工作,但在 mpqueue 情况下,有些东西阻止它与客户端通信。

我已将运行serve_forever() 部分的循环放在一个线程下,以便它可以停止。

代码如下:

mpqueue # 这是试图在子进程中生成服务器的“管理器”进程

import time
import multiprocessing
import threading
import dataQueueServer

class Printer():
    def __init__(self):
        self.lock = threading.Lock()
    def tsprint(self, text):
        with self.lock:
            print text

class QueueServer(multiprocessing.Process):
    def __init__(self, name = '', printer = None):
        multiprocessing.Process.__init__(self)
        self.name = name
        self.printer = printer
        self.ml = dataQueueServer.MainLoop(name = 'ml', printer = self.printer)

    def run(self):
        self.printer.tsprint(self.ml)
        self.ml.start()

    def stop(self):
        self.ml.stop()

if __name__ == '__main__':
    printer = Printer()
    qs = QueueServer(name = 'QueueServer', printer =  printer)
    printer.tsprint(qs)
    printer.tsprint('starting')
    qs.start()
    printer.tsprint('started.')
    printer.tsprint('Press Ctrl-C to quit')
    try:
        while True:
            time.sleep(60)
    except KeyboardInterrupt:
        printer.tsprint('\nTrying to exit cleanly...')
        qs.stop()

    printer.tsprint('stopped')

数据队列服务器

import time
import threading

from multiprocessing.managers import BaseManager
from multiprocessing import Queue

HOST = ''
PORT = 50010
AUTHKEY = 'authkey'

## Define some helper functions for use by the main process loop
class Printer():
    def __init__(self):
        self.lock = threading.Lock()
    def tsprint(self, text):
        with self.lock:
            print text



class QueueManager(BaseManager): 
    pass


class MainLoop(threading.Thread):
    """A thread based loop manager, allowing termination signals to be sent
    to the thread"""
    def __init__(self, name = '', printer = None):
        threading.Thread.__init__(self)
        self._stopEvent = threading.Event()
        self.daemon = True
        self.name = name

        if printer is None:
            self.printer = Printer()
        else:
            self.printer = printer

        ## create the queue
        self.queue = Queue()
        ## Add a function to the handler to return the queue to clients
        self.QM = QueueManager

        self.QM.register('get_queue', callable=lambda:self.queue)
        self.queue_manager = self.QM(address=(HOST, PORT), authkey=AUTHKEY)
        self.queue_server = self.queue_manager.get_server()

    def __del__(self):
        self.printer.tsprint( 'closing...')


    def run(self):
        self.printer.tsprint( '{}: started serving'.format(self.name))
        self.queue_server.serve_forever()


    def stop(self):
        self.printer.tsprint ('{}: stopping'.format(self.name))
        self._stopEvent.set()

    def stopped(self):
        return self._stopEvent.isSet()

def start():
    printer = Printer() 
    ml = MainLoop(name = 'ml', printer = printer)
    ml.start()
    return ml

def stop(ml):
    ml.stop()

if __name__ == '__main__':
    ml = start()
    raw_input("\nhit return to stop")
    stop(ml)

还有一个客户:

dataQueueClient

import datetime
from multiprocessing.managers import BaseManager


n = 0
N = 10**n

HOST = ''
PORT = 50010
AUTHKEY = 'authkey'


def now():
    return datetime.datetime.now()

def gen(n, func, *args, **kwargs):
    k = 0
    while k < n:
        yield func(*args, **kwargs)
        k += 1

class QueueManager(BaseManager): 
    pass
QueueManager.register('get_queue')
m = QueueManager(address=(HOST, PORT), authkey=AUTHKEY)
m.connect()
queue = m.get_queue()

def load(msg, q):
    return q.put(msg)

def get(q):
    return q.get()

lgen = gen(N, load, msg = 'hello', q = queue)
t0 = now()
while True:
    try:
        lgen.next()
    except StopIteration:
        break
t1 = now()
print 'loaded %d items in ' % N, t1-t0

t0 = now()
while queue.qsize() > 0:
    queue.get()
t1 = now()
print 'got %d items in ' % N, t1-t0

【问题讨论】:

    标签: python queue multiprocessing


    【解决方案1】:

    所以看起来解决方案很简单:不要使用serve_forever(),而是使用manager.start()

    根据Eli BenderskyBaseManager(和它的扩展版本SyncManager)已经在一个新进程中生成了服务器(并且查看 multiprocessing.managers 代码证实了这一点)。我遇到的问题源于示例中使用的表单,其中服务器是在主进程下启动的。

    我仍然不明白为什么当前示例在子进程下运行时不起作用,但这不再是问题。

    这是管理多个队列服务器的工作代码(并且从 OP 大大简化):

    服务器

    from multiprocessing import Queue
    from multiprocessing.managers import SyncManager
    
    HOST = ''
    PORT0 = 5011
    PORT1 = 5012
    PORT2 = 5013
    AUTHKEY = 'authkey'
    
    name0 = 'qm0'
    name1 = 'qm1'
    name2 = 'qm2'
    
    description = 'Queue Server'
    
    def CreateQueueServer(HOST, PORT, AUTHKEY, name = None, description = None):
        name = name
        description = description
        q = Queue()
    
        class QueueManager(SyncManager):
            pass
    
    
        QueueManager.register('get_queue', callable = lambda: q)
        QueueManager.register('get_name', callable = name)
        QueueManager.register('get_description', callable = description)
        manager = QueueManager(address = (HOST, PORT), authkey = AUTHKEY)
        manager.start() # This actually starts the server
    
        return manager
    
    # Start three queue servers
    qm0 = CreateQueueServer(HOST, PORT0, AUTHKEY, name0, description)
    qm1 = CreateQueueServer(HOST, PORT1, AUTHKEY, name1, description)
    qm2 = CreateQueueServer(HOST, PORT2, AUTHKEY, name2, description)
    
    raw_input("return to end")
    

    客户

    from multiprocessing.managers import SyncManager
    
    HOST = ''
    PORT0 = 5011
    PORT1 = 5012
    PORT2 = 5013
    AUTHKEY = 'authkey'
    
    def QueueServerClient(HOST, PORT, AUTHKEY):
        class QueueManager(SyncManager):
            pass
        QueueManager.register('get_queue')
        QueueManager.register('get_name')
        QueueManager.register('get_description')
        manager = QueueManager(address = (HOST, PORT), authkey = AUTHKEY)
        manager.connect() # This starts the connected client
        return manager
    
    # create three connected managers
    qc0 = QueueServerClient(HOST, PORT0, AUTHKEY)
    qc1 = QueueServerClient(HOST, PORT1, AUTHKEY)
    qc2 = QueueServerClient(HOST, PORT2, AUTHKEY)
    # Get the queue objects from the clients
    q0 = qc0.get_queue()
    q1 = qc1.get_queue()
    q2 = qc2.get_queue()
    # put stuff in the queues
    q0.put('some stuff')
    q1.put('other stuff')
    q2.put({1:123, 2:'abc'})
    # check their sizes
    print 'q0 size', q0.qsize()
    print 'q1 size', q1.qsize()
    print 'q2 size', q2.qsize()
    
    # pull some stuff and print it
    print q0.get()
    print q1.get()
    print q2.get()
    

    添加一个额外的服务器来与正在运行的队列服务器的信息共享一个字典,这样消费者就可以很容易地知道什么是可用的,使用该模型很容易。不过需要注意的一点是,共享字典需要的语法与普通字典略有不同:dictionary[0] = something 不起作用。您需要使用 dictionary.update([(key, value), (otherkey, othervalue)])dictionary.get(key) 语法,它们会传播到连接到此字典的所有其他客户端。

    【讨论】:

    • 当我在服务器中将 HOST 修改为 '192.168.aa.bb' 时遇到问题,因为客户端位于 '192.168.aa.bb' 中。它显示错误:[Errno 99] Cannot assign requested addressqm0 = CreateQueueServer(HOST, PORT0, AUTHKEY, name0, description)我不知道为什么,我只是想让服务器向客户端写一些东西
    猜你喜欢
    • 1970-01-01
    • 2017-12-30
    • 1970-01-01
    • 2021-03-30
    • 2012-01-19
    • 1970-01-01
    • 1970-01-01
    • 2023-03-20
    • 1970-01-01
    相关资源
    最近更新 更多