【问题标题】:Running Faust Agent Synchronously同步运行 Faust Agent
【发布时间】:2020-10-19 12:15:37
【问题描述】:

检查以下代码

@app.agent()
async def process(stream):
    async for value in stream.take(5000, within=5):
        process(value)

代理在 5 秒内异步获取 5000 条记录并进行处理。我不希望代理在前一个处理完成之前再选择 500 万条记录。基本上我想同步运行代理。有什么办法可以做到吗?

【问题讨论】:

  • 只是想知道,当使用 take 方法时,最后一条记录是在浮士德处理的吗? github.com/robinhood/faust/issues/656
  • @Learnis 我在生产中使用take 方法,我还没有遇到任何此类问题。

标签: python-3.x apache-kafka faust


【解决方案1】:

我认为您可以将代理上的 concurrency 设置为 1,这样可以有效地使其同步。

如果您这样做,您可能还会发现修改 topic partitions 很有用,但我不完全了解这两个设置之间的关系(只是想指出一个可能有用的途径)。

【讨论】:

    【解决方案2】:

    我尝试使用以下代码查看worker是否正在执行第二批记录,而第一批的处理尚未完成

    @app.agent()
    async def process(stream):
        async for value in stream.take(5000, within=5):
            print(1)
            await async.sleep(30)
    

    worker 打印 1 并等待 30 秒打印 2。 await 语句将控制权交还给事件循环,但在这种情况下,它等待,这意味着批处理一个接一个地执行。因此这是同步的。

    PS。提交偏移、重新平衡、监控等都是异步操作,由事件循环处理。

    【讨论】:

    • 当您在这里使用 take 方法从 kafka 获取 5000 条记录后,假设由于某些问题,在您处理完 5k 条记录中的 2k 条记录后,worker 立即停机。在这种情况下,重新启动后您将处理那些 2k 记录两次,对吗?你如何处理这种情况
    猜你喜欢
    • 2022-12-01
    • 2021-02-23
    • 2012-11-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-08-25
    • 1970-01-01
    相关资源
    最近更新 更多