【问题标题】:How to use asyncio module call nameko service concurrently如何同时使用 asyncio 模块调用 nameko 服务
【发布时间】:2019-06-19 22:04:40
【问题描述】:

我写了一个这样的异步程序。一个永远的运行循环同时启动 4 个事件。每个事件都将运行rpc 服务。在nameko服务中,我用time.sleep(10)实现服务。

我很困惑为什么服务每10 秒完成一次。我认为服务应该同时完成。我怎样才能让工作同时完成?

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def job(x):
    try:
        with ClusterRpcProxy(CONFIG) as rpc:
            res = rpc.helloworldService.helloworld(x)
            print(res)
    except Exception as e:
        print(f"{e}")


async def do_sleep(x, queue):
        try:
             await job(x)
             queue.put("ok")
        except Exception as e:
            print(f"{e}")


def consumer():
    asyncio.run_coroutine_threadsafe(do_sleep('10', queue), new_loop)
    asyncio.run_coroutine_threadsafe(do_sleep('11', queue), new_loop)
    asyncio.run_coroutine_threadsafe(do_sleep('12', queue), new_loop)
    asyncio.run_coroutine_threadsafe(do_sleep('13', queue), new_loop)


if __name__ == '__main__':
    print(time.ctime())
    new_loop = asyncio.new_event_loop()

    loop_thread = Thread(target=start_loop, args=(new_loop,))
    loop_thread.setDaemon(True)
    loop_thread.start()

    CONFIG = {'AMQP_URI': "amqp://guest:guest@localhost"}
    queue = Queue()
    sema = asyncio.Semaphore(2)

    consumer_thread = Thread(target=consumer)
    consumer_thread.setDaemon(True)
    consumer_thread.start()

    while True:
        msg = queue.get()
        print("current:", time.ctime())

namekorpc 服务是:

class HelloWorld:
    name = 'helloworldService'

    @rpc
    def helloworld(self,str):
        time.sleep(10)
        return 'hello_'+str

输出如下:

hello_10
current: Sat Jan 26 13:04:57 2019
hello_11
current: Sat Jan 26 13:05:07 2019
hello_12
current: Sat Jan 26 13:05:17 2019
hello_13
current: Sat Jan 26 13:05:28 2019

【问题讨论】:

  • 您的job 函数不包含await,因此它不能与其他任何东西并行运行。您需要切换到支持异步调用的 RPC 库。

标签: python python-3.x rpc python-asyncio nameko


【解决方案1】:

您必须使用可等待的睡眠而不是不可等待的time.sleep()。所以你的nameko RPC 服务将如下所示:

import asyncio

class HelloWorld:
    name = 'helloworldService'

    @rpc
    async def helloworld(self,str):  # Note
        await asyncio.sleep(10)  # Note
        return 'hello_'+str

还有你的服务器代码:

async def job(x):
    try:
        with ClusterRpcProxy(CONFIG) as rpc:
            res = await rpc.helloworldService.helloworld(x)  # Note
            print(res)
    except Exception as e:
        print(f"{e}")

[注意]

  • 但是你的 RPC 库也应该由asyncio 实现。
  • 这是一个异步 asyncio RPC 库 (aiorpc)。

【讨论】:

  • @user10970428 用aiorpc 代替nameko 怎么样?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-11-25
  • 2020-02-24
  • 2020-07-03
  • 2018-05-29
  • 2019-05-31
  • 2020-04-19
  • 1970-01-01
相关资源
最近更新 更多