【问题标题】:Multiple Async HTTP connections to Tornado ServerTornado 服务器的多个异步 HTTP 连接
【发布时间】:2018-11-07 08:32:18
【问题描述】:

我有一个要同步的龙卷风服务器。我有一个客户端,它同时向服务器发出异步请求。它每 5 秒用一次心跳 ping 服务器,其次,它会尽可能地对作业发出 GET 请求。

在服务器端,有一个包含作业的线程安全队列。如果队列为空,它将阻塞 20 秒。我希望它保持连接并阻塞 20 秒,当它返回时,它会向客户端写入“No job”。一旦有作业可用,它应该立即将其写入客户端,因为 queue.get() 会返回。我希望在此请求被阻止时,心跳继续在后台发生。这里我从同一个客户端向服务器发出两个异步请求。

这是我构建的一个示例项目,它模拟了我的问题。

服务器:

import tornado.ioloop
import tornado.web
from queue import Queue
from tornado import gen

q = Queue()


class HeartBeatHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def post(self):
        print("Heart beat")


class JobHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        print("Job")
        try:
            job = yield q.get(block=True, timeout=20)
            self.write(job)
        except Exception as e:
            self.write("No job")


def make_app():
    return tornado.web.Application([
        (r"/heartbeat", HeartBeatHandler),
        (r"/job", JobHandler),
    ])


if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    try:
        tornado.ioloop.IOLoop.current().start()
    except KeyboardInterrupt:
        tornado.ioloop.IOLoop.current().stop()

客户:

import asyncio
from tornado import httpclient, gen


@gen.coroutine
def heartbeat_routine():
    while True:
        http_client = httpclient.AsyncHTTPClient()
        heartbeat_request = httpclient.HTTPRequest("http://{}/heartbeat".format("127.0.0.1:8888"), method="POST",
                                                   body="")
        try:
            yield http_client.fetch(heartbeat_request)
            yield asyncio.sleep(5)
        except httpclient.HTTPError as e:
            print("Heartbeat failed!\nError: {}".format(str(e)))

        http_client.close()


@gen.coroutine
def worker_routine():
    while True:
        http_client = httpclient.AsyncHTTPClient(defaults=dict(request_timeout=180))
        job_request = httpclient.HTTPRequest("http://{}/job".format("127.0.0.1:8888"), method="GET")
        try:
            response = yield http_client.fetch(job_request)
            print(response.body)
        except httpclient.HTTPError as e:
            print("Heartbeat failed!\nError: {}".format(str(e)))

        http_client.close()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(heartbeat_routine())
    asyncio.ensure_future(worker_routine())
    loop.run_forever()

问题:

  1. 问题是心跳也会阻塞 20 秒 而 queue.get() 阻塞。这是我不想要的。
  2. 正如您在我的客户端中看到的,我将请求超时设置为 180 秒。但是那个 似乎从来没有与龙卷风一起工作。如果将 queue.get() 超时时间增加到 20 以上 秒,它返回错误代码说请求超时。

【问题讨论】:

    标签: python client-server tornado python-asyncio sendasynchronousrequest


    【解决方案1】:
    1. 如果使用线程安全队列,则必须使用不使用来自 IOLoop 线程的阻塞操作。相反,在线程池中运行它们:

      job = yield IOLoop.current().run_in_executor(None, lambda: q.get(block=True, timeout=20))
      

      或者,您可以使用 Tornado 的异步(但线程不安全)队列,并在需要与来自另一个线程的队列交互时使用 IOLoop.add_callback

    2. AsyncHTTPClient 构造函数有一些魔力,它会尽可能地尝试共享现有实例,但这意味着构造函数参数仅在第一次有效。 worker_routine 正在获取heartbeat_routine 创建的默认实例。添加force_instance=True 以确保您在worker_routine 中获得新客户(完成后致电.close()

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-06-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-08-07
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多