【问题标题】:Integration Testing Multiple Celery Workers and a DB Backed Django API集成测试多个 Celery Worker 和数据库支持的 Django API
【发布时间】:2014-06-19 07:51:38
【问题描述】:

我正在使用具有多个 celery 工作人员的面向软件的架构(我们称他们为 worker1worker2worker3)。所有三个工作人员都是独立的实体(即,独立的代码库、独立的仓库、独立的 celery 实例、独立的机器),它们都没有连接到 Django 应用程序。

与这三个工作人员中的每一个进行通信的是一个基于 Django、MySQL 支持的 RESTful API。

在开发过程中,这些服务都在一个 vagrant box 上,每一个都作为一个单独的机器运行在一个单独的端口上。我们为所有 Celery 任务提供了一个 RabbitMQ 代理。

通过这些服务的典型路径可能如下所示:worker1 从设备获取消息,进行一些处理,在worker2 上排队任务,然后进行进一步处理并向@987654326 发出 POST @,写入 MySQL 数据库并在 worker3 上触发一个任务,该任务执行一些其他处理并对 API 进行另一个 POST,从而导致 MySQL 写入。

服务之间的通信很好,但每次我们对任何服务进行更改时都测试此流程非常烦人。我真的很想进行一些完整的集成测试(即,从发送到worker1 的消息开始并遍历整个链),但我不确定从哪里开始。我面临的主要问题是:

如果我在worker1 上排队,我怎么可能知道整个流程何时结束?当我不知道结果是否已经到达时,如何对结果做出合理的断言?

如何处理数据库设置/拆除?我想在每次测试结束时删除测试期间所做的所有条目,但是如果我从 Django 应用程序外部开始测试,我不确定如何有效地清除它。每次测试后手动删除它并重新创建它似乎开销太大。

【问题讨论】:

    标签: python django integration celery


    【解决方案1】:

    Celery 允许同步运行任务,所以第一步是:将整个流程划分为单独的任务、伪造请求和断言结果:

    原流程:

    device --- worker1 --- worker2 --- django --- worker3 --- django
    

    一级集成测试:

    1.      |- worker1 -|
    2.                  |- worker2 -|
    3.                              |- django -|
    4.                                         |- worker3 -|
    5.                                                     |- django -|
    

    为每个测试创建假请求或同步调用并断言结果。将这些测试放在相应的存储库中。例如,在对 worker1 的测试中,您可以模拟 worker2 并测试它是否已使用正确的参数调用。然后,在另一个测试中,您将调用 worker2 和模拟请求来检查它是否调用了正确的 API。以此类推。

    测试整个流程会很困难,因为所有任务都是独立的实体。我现在想出的唯一方法是对 worker1 进行一次虚假调用,设置合理的超时时间并等待数据库中的最终结果。这种测试只告诉你它是否有效。它不会告诉你,问题出在哪里。

    【讨论】:

      【解决方案2】:

      要使用完整设置,您可以设置 Celery 结果后端。 有关基础知识,请参阅 Celery 'next steps' 文档。

      worker1 然后可以将其传递给worker2 的任务句柄报告给worker2worker2 返回的结果将是它传递给 worker3 的任务 ID。 worker3 返回的结果意味着整个序列已经完成,您可以检查结果。 结果还可以立即报告这些结果的有趣部分,以使检查更容易。

      这在 Celery 中可能看起来有点这样:

      worker1_result = mytask.delay(someargs)  # executed by worker1
      worker2_result = worker1_result.get()  # waits for worker1 to finish
      worker3_result = worker2_result.get()  # waits for worker2 to finish
      outcome = worker3_result.get()  # waits for worker3 to finish
      

      (细节可能需要有所不同;我自己还没有使用过。我不确定任务结果是否可序列化,因此它们是否适合作为任务函数返回值。)

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2017-11-26
        • 2011-07-24
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2010-09-21
        • 1970-01-01
        • 2015-07-14
        相关资源
        最近更新 更多