【发布时间】:2020-09-16 16:56:46
【问题描述】:
目前,我正在尝试从 kafka 主题中读取数据,并使用从 kafka 主题中获取的数据异步调用 rest-API。如果 msg 是 Meher,这里 rest-api 会立即给出响应,否则响应将需要 5 秒
卡夫卡数据
Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher
下面是代码:
app = faust.App(
'faustApp',
broker="kafka://localhost:9092",
value_serializer='raw',
)
app_topic = app.topic('topic_base')
@app.agent(app_topic,concurrency=1)
async def imports_news(articles):
async for article in articles:
val = article.decode('utf-8')
url = 'http://0.0.0.0:5050/' + val
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(data)
if __name__ == '__main__':
app.main()
电流输出:
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
预期输出:
Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!
预计会在第一时间获得即时响应的所有其余调用的响应,之后应该会出现延迟响应,但目前它正在按顺序工作。
如果我将并发增加到 5,它会给出预期的输出,但在并发 1 的情况下应该使用相同的输出。 不确定,如果我遗漏了什么……有什么帮助吗?
更新1:
我已经用普通的 python asyncIO 尝试过同样的事情。它按预期工作
import asyncio
import aiofiles
import aiohttp
async def get_player(player_name):
url = 'http://0.0.0.0:5050/' + player_name
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
data = await resp.text()
print(data)
loop = asyncio.get_event_loop()
player_args = ["Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo",
"Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo",
"Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo"]
loop.run_until_complete(
asyncio.gather(
*(get_player(args) for args in player_args)
)
)
【问题讨论】:
标签: python-3.x apache-kafka python-asyncio faust