【问题标题】:Python Tornado - How to create background process?Python Tornado - 如何创建后台进程?
【发布时间】:2015-08-21 00:47:42
【问题描述】:

是否可以让 Python Tornado 运行一些较长的后台进程,但同时它也为所有的处理程序提供服务?

我有一个 Tornado Webapp,它为一些网页提供服务。但我也有一个消息队列,我希望 Tornado 作为订阅者轮询消息队列。这可以在 Tornado 中完成吗?

我搜索了用户指南,似乎有一个叫做periodic_call_back 的东西可以在ioloop 中使用。听起来我可以使用读取消息队列的回调函数。但是,有没有办法创建一个永不停止的协程?

感谢任何帮助,谢谢!

【问题讨论】:

  • 当您说进程时,您实际上是指不同的操作系统进程,还是只是在后台运行而不会阻塞事件循环的函数?
  • 查看this question 了解运行后台任务的正确方法。如果您希望任务无限运行,只需将其放入while True: 循环中即可。只需确保当任务等待从消息队列中消费时,您使用非阻塞调用来完成。如果您使用的是第三方队列库,您可能需要找到一个tornado 友好的版本。
  • @user1157751 只要读取消息队列的函数实际上是作为适当的协程实现的(这意味着它不会进行任何阻塞 I/O 调用),那么可以。但是,您不能只使用任何旧功能。
  • @user1157751 你不能只将函数包装在协程中。无论您用于从队列中读取什么库,都必须使用异步 I/O 实际实现。你用的是什么消息队列库?
  • @user1157751 不,没那么简单。不过,instructions on the PyZMQ site 记录了如何使它与龙卷风一起工作。

标签: python tornado


【解决方案1】:

从零 MQ 读取:

  1. 安装 Zero-MQ Python 库
  2. 在 application.listen() 之前安装 IOLoop
  3. 使用执行器(对于python2,您可以从python3安装执行器库)执行消息队列侦听器,该侦听器设置tornado侦听消息队列,然后在收到数据时使用回调。

示例(main.py):

# Import tornado libraries
import tornado.ioloop
import tornado.web
# Import URL mappings
from url import application
# Import zeroMQ libraries
from zmq.eventloop import ioloop
# Import zeroMQ.py functions
from zeroMQ import startListenToMessageQueue
# Import zeroMQ settings
import zeroMQ_settings
# Import our executor
import executors
# Import our db_settings
import db_settings

# main.py is the main access point of the tornado app, to run the application, just run "python main.py"

# What this will do is listen to port 8888, and then we can access the app using
# http://localhost:8888 on any browser, or using python requests library
if __name__ == "__main__":
    # Install PyZMQ's IOLoop
    ioloop.install()

    # Set the application to listen to port 8888
    application.listen(8888)

    # Get the current IOLoop
    currentIOLoop = tornado.ioloop.IOLoop.current()

    # Execute ZeroMQ Subscriber for our topics
    executors.executor.submit(startListenToMessageQueue(zeroMQ_settings.server_subscriber_ports,
                                                        zeroMQ_settings.server_subscriber_IP,
                                                        zeroMQ_settings.server_subscribe_list))

    # Test if the connection to our database is successful before we start the IOLoop
    db_settings.testForDatabase(db_settings.database)

    # Start the IOLoop
    currentIOLoop.start()

示例(zeroMQ.py):

# Import our executor
import executors
# Import zeroMQ libraries
import zmq
from zmq.eventloop import ioloop, zmqstream
# Import db functions to process the message
import db

# zeroMQ.py deals with the communication between a zero message queue

def startListenToMessageQueue(subscribe_ports, subscribe_IP, subscribe_topic):
    # Usage:
    #       This function starts the subscriber for our application that will listen to the
    #       address and ports specified in the zeroMQ_settings.py, it will spawn a callback when we
    #       received anything relevant to our topic.
    # Arguments:
    #       None
    # Return:
    #       None

    # Get zmq context
    context = zmq.Context()

    # Get the context socket
    socket_sub = context.socket(zmq.SUB)

    # Connect to multiple subscriber ports
    for ports in subscribe_ports:
        socket_sub.connect("tcp://"+str(subscribe_IP)+":"+str(ports))

    # Subscribe to our relevant topics
    for topic in subscribe_topic:
        socket_sub.setsockopt(zmq.SUBSCRIBE, topic)

    # Setup ZMQ Stream with our socket
    stream_sub = zmqstream.ZMQStream(socket_sub)

    # When we recieve our data, we will process the data by using a callback
    stream_sub.on_recv(processMessage)

    # Print the Information to Console
    print "Connected to publisher with IP:" + \
          str(subscribe_IP) + ", Port" + str(subscribe_ports) + ", Topic:" + str(subscribe_topic)

def processMessage(message):
    # Usage:
    #       This function processes the data using a callback format. The on_recv will call this function
    #       and populate the message variable with the data that we recieved through the message queue
    # Arguments:
    #       message: a string containing the data that we recieved from the message queue
    # Return:
    #       None

    # Process the message with an executor, and use the addData function in our db to process the message
    executors.executor.submit(db.addData, message)

示例(executors.py):

# Import futures library
from concurrent import futures

# executors.py will create our threadpools, and this can be shared around different python files
# which will not re-create 10 threadpools when we call it.
# we can a handful of executors for running synchronous tasks

# Create a 10 thread threadpool that we can use to call any synchronous/blocking functions
executor = futures.ThreadPoolExecutor(10)

示例(zeroMQ_settings.py):

# zeroMQ_settings.py keep the settings for zeroMQ, for example port, IP, and topics that
# we need to subscribe

# Set the Port to 5558
server_subscriber_ports = ["5556", "5558"]

# Set IP to localhost
server_subscriber_IP = "localhost"

# Set Message to Subscribe: metrics.dat
server_subscriber_topic_metrics = "metrics.dat"

# Set Message to Subscribe: test-010
server_subscribe_topics_test_010 = "test-010"

# List of Subscriptions
server_subscribe_list = [server_subscriber_topic_metrics, server_subscribe_topics_test_010]

非常感谢@dano

【讨论】:

    猜你喜欢
    • 2016-09-12
    • 2019-08-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多