【发布时间】:2020-10-19 12:15:37
【问题描述】:
检查以下代码
@app.agent()
async def process(stream):
async for value in stream.take(5000, within=5):
process(value)
代理在 5 秒内异步获取 5000 条记录并进行处理。我不希望代理在前一个处理完成之前再选择 500 万条记录。基本上我想同步运行代理。有什么办法可以做到吗?
【问题讨论】:
-
只是想知道,当使用 take 方法时,最后一条记录是在浮士德处理的吗? github.com/robinhood/faust/issues/656
-
@Learnis 我在生产中使用
take方法,我还没有遇到任何此类问题。
标签: python-3.x apache-kafka faust