【问题标题】:FAUST Asynchronous kafka message processing concurrency is not workingFAUST 异步kafka消息处理并发不起作用
【发布时间】:2020-09-16 16:56:46
【问题描述】:

目前,我正在尝试从 kafka 主题中读取数据,并使用从 kafka 主题中获取的数据异步调用 rest-API。如果 msg 是 Meher,这里 rest-api 会立即给出响应,否则响应将需要 5 秒

卡夫卡数据

Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher

下面是代码:

app = faust.App(
    'faustApp',
    broker="kafka://localhost:9092",
    value_serializer='raw',
)

app_topic = app.topic('topic_base')
@app.agent(app_topic,concurrency=1)
async def imports_news(articles):
    async for article in articles:
        val = article.decode('utf-8')
        url = 'http://0.0.0.0:5050/' + val
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
            print(data)
if __name__ == '__main__':
    app.main()

电流输出:

Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!

预期输出:

Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!

预计会在第一时间获得即时响应的所有其余调用的响应,之后应该会出现延迟响应,但目前它正在按顺序工作。

如果我将并发增加到 5,它会给出预期的输出,但在并发 1 的情况下应该使用相同的输出。 不确定,如果我遗漏了什么……有什么帮助吗?

更新1:

我已经用普通的 python asyncIO 尝试过同样的事情。它按预期工作

import asyncio
import aiofiles
import aiohttp

async def get_player(player_name):
    url = 'http://0.0.0.0:5050/' + player_name
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            data = await resp.text()
    print(data)


loop = asyncio.get_event_loop()
player_args = ["Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo",
               "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo",
               "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo"]
loop.run_until_complete(
    asyncio.gather(
        *(get_player(args) for args in player_args)
    )
)

【问题讨论】:

    标签: python-3.x apache-kafka python-asyncio faust


    【解决方案1】:

    来自浮士德文档https://faust.readthedocs.io/en/latest/userguide/agents.html#id5 似乎每个代理实例一次处理流的一个元素。 流迭代不会在其可用元素上并行化,但单个代理实例将按顺序一个一个地处理流元素。

    如果您在处理流的元素时等待某事,代理实例将不会移动到下一个元素(如果可用),直到该元素的处理完成。等待操作不会“解锁”代理,将其移动到下一个流元素,然后在第一次等待完成后恢复对第一个元素的处理。

    另一方面,如果您设置 concurrency=5,则您有 5 个实例可以从流中获取项目并同时并行处理它们。

    Asyncio.gather 之所以有效,是因为协程被包装到任务中并一起并发运行,等待它们的结果。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2023-03-24
      • 2021-09-08
      • 2019-01-24
      • 1970-01-01
      • 1970-01-01
      • 2018-07-30
      • 1970-01-01
      相关资源
      最近更新 更多