【问题标题】:Python multiprocessing: Start/stop processes on serverPython 多处理:在服务器上启动/停止进程
【发布时间】:2014-02-01 00:12:55
【问题描述】:

我正在使用 Python 构建一个算法交易平台。多种算法在每天 09:30 至 16:00 期间监控市场并相应地执行交易。

我正在寻找的是从客户端任意启动和停止算法。因此,我希望有一个使用multiprocessing 运行的服务器脚本和一个可以在任何给定时间启动/停止/列出算法(应该在单独的process 中运行)的客户端。

有什么可以做到这一点的例子吗?大多数在线示例都是针对队列服务器的,这似乎不适合我的问题。

编辑:

我正在尝试使用包multiprocessing 来解决这个问题。使用队列的想法对我来说似乎是错误的,因为我知道任意数量的进程将运行一整天或至少直到我说停止。我不想运行一个简短的脚本,让工作人员在完成前一个作业后从队列中使用下一个作业。实际上,我正在考虑使用Manager 的服务器脚本,它将永远运行,并在请求时在单独的进程/线程中启动新脚本。但是,我希望能够向进程发送停止信号以杀死它。我确实有一种感觉,我这样做有点倒退:-) 我所拥有的是:

server.py:

import multiprocessing as mp
from multiprocessing import Process
from multiprocessing.managers import BaseManager
from time import strftime


class Server(object):
    def __init__(self, port=50000, authkey=''):
        self.processes = {}
        self._authkey = authkey
        self.port = port
        self.server = None
        self.running = False
        BaseManager.register('get_process', callable=lambda: self)


    def start_server(self):
        manager = BaseManager(address=('', self.port), authkey=self._authkey)
        self.server = manager.get_server()
        try:
            self._logmessage("Server started")
            self.running = True
            self.server.serve_forever()
        except (KeyboardInterrupt, SystemExit):
            self.shutdown()

    def start_process(self, mod, fcn, *args, **kwargs):
        mod = __import__(mod, globals(), locals(), ['object'], -1)
        key = "{0}.{1}".format(mod, fcn)
        assert not key in self.processes, \
            "Process named '%s' already exists" % key
        p = Process(target=getattr(mod, fcn), name=mod, args=(None, ), kwargs=kwargs)
        self._logmessage("Process '%s' started" % key)
        p.start()
        # p.join()
        self.processes[key] = p

    def stop_process(self, key):
        self.processes[key].terminate()
        del self.processes[key]

    def get_processes(self):
        return self.processes.keys()

    def shutdown(self):
        for child in mp.active_children():
            child.terminate()
        self.server.shutdown()
        self.running = False
        print "Shutting down"

    def _logmessage(self, msg):
        print "%s: %s" % (strftime('%Y-%m-%d %H:%M:%S'), msg)


if __name__ == '__main__':
    server = Server(authkey='abc')
    try:
        server.start_server()
    except (KeyboardInterrupt, SystemExit):
        server.shutdown()

client.py:

from multiprocessing.managers import BaseManager
import time


class Client(object):
    def __init__(self, host='', port=50000, authkey=''):
        self.host = host
        self.port = port
        self.manager = None
        self.process = None
        self._type_id = 'get_process'
        self._authkey = authkey
        self.manager = BaseManager(address=(self.host, self.port), authkey=self._authkey)
        BaseManager.register(self._type_id)

    def connect(self):
        try:
            self.manager.connect()
            self._logmessage("Connected to server")
        except:
            self._logmessage("Could not connect to server")
        self.process = getattr(self.manager, self._type_id)()

    def start_process(self, mod, fcn):
        self.process.start_process(mod, fcn)
        self._logmessage("Process '%s' started" % fcn)

    def list_processes(self):
        print self.process.get_processes()

    @property
    def connected(self):
        return self.manager._state.value == self.manager._state.STARTED

    def _logmessage(self, msg):
        print "%s: %s" % (time.strftime('%Y-%m-%d %H:%M:%S'), msg)


def test(data):
    while True:
        print time.time()
        time.sleep(1.)


if __name__ == '__main__':
    from algotrading.server.process_client import Client
    client = Client(authkey='abc')
    client.connect()
    client.start_process("algotrading.server.process_client", "test")
    client.list_processes()

【问题讨论】:

  • 你能分享一个来源吗?

标签: python multiprocessing algorithmic-trading


【解决方案1】:

您可以实现一个套接字服务器,它监听客户端并启动线程来执行算法。

我认为 RPC 将是最简单的解决方案。

一些灵感:What is the current choice for doing RPC in Python?

【讨论】:

  • 抱歉,我的问题可能没有表达清楚。我希望使用包multiprocessing 来做到这一点。我想我可以使用Manager 来处理这个问题。
【解决方案2】:

查看Supervisord,它允许远程管理进程,以及自动启动/重启配置。

根据您的可扩展性和灾难恢复需求,您可能正在考虑将您的“监控/交易流程”分布在运行的多台服务器上。虽然 supervisord 实际上只设计用于管理一台机器,但您可以构建一个管理器应用程序,通过它的 xml-rpc 接口协调多个服务器,每个服务器运行 supervisord。

Cron 或 Celery 可用于您的日常启动/停止计划。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-04-27
    • 2022-09-23
    • 1970-01-01
    • 2011-10-26
    • 1970-01-01
    • 1970-01-01
    • 2015-01-14
    相关资源
    最近更新 更多