【问题标题】:Stop Locust When Specified Number Of User Tasks Complete完成指定数量的用户任务时停止 Locust
【发布时间】:2020-05-06 23:36:04
【问题描述】:

在我的场景中,我在没有 Web UI 的情况下运行 Locust。我使用的命令是

locust -f my_locust_file --no_web -c 20 -r 4     # as a hack I add -t 10s

这相当于每秒孵化 4 个用户,总共孵化 20 个用户。

我的目标是让 20 个 locust 用户中的每一个执行一个任务,我希望 locust 运行在最后一个用户(第 20 个用户)的任务完成时完成并退出。收集的统计信息应仅包括与每个任务相关的响应时间。

在这个场景中,已经确定了 5 个可以与用户随机关联的任务(用户场景):

class UserScenarios(TaskSet):
    tasks = [Sequence_One, ServerSequence, Components_Sequence, Embedded_Sequence, Connectivity_Sequence]

class MyLocust(HttpLocust):
    def __init__(self):
        super().__init__()
        MyLocust.counter += 1
        print(f"Counter = {MyLocust.counter}")

    counter = 0
    task_set = UserScenarios
    wait_time = between(1, 1)
    host = 'https://*****.com'

每个任务(用户场景)对应于应按顺序加载的 3 或 4 个页面的不同序列。由 2 页组成的经过清理和简化的序列示例是:

class Sequence_One(TaskSequence):

    @seq_task(1)
    def get_task1(self):
        response = self.client.get(url='https://****',
                                   name='https://****',
                                   timeout=30,
                                   allow_redirects=False,
                                   headers={...})   

    @seq_task(2)
    def get_task2(self):
        response = self.client.get(url='https://****',
                                   name='https://****',
                                   timeout=30,
                                   allow_redirects=False,
                                   headers={...})  

有没有办法在第 20 个(第 n 个)用户任务完成后停止测试?例如,如果每个任务访问 4 个页面,我希望在发出 20*4 = 80 个页面请求后终止测试。事实上,作为此测试的一部分,总共只应发出 80 个页面请求。

我对此测试的经验是,在最后一个用户任务完成后,页面请求会继续发出,直到我手动停止测试或使用比任务实际需要完成的时间长一点的时间限制。

【问题讨论】:

    标签: locust


    【解决方案1】:

    这实际上是可能的!虽然没有记录,但至少在我能找到的任何地方,停止蝗虫可以通过调用来实现 提高 StopLocust()

    在提供的场景中,您的每个任务都是一个 TaskSequence 类。为了在执行单个任务后退出 Locust,您需要向调用 StopLocust() 的整个类添加另一个步骤(函数)。我在下面提供了一个示例 -

    class Scenario(TaskSequence):
    
        @seq_task(1)
        def task_1(self):
            self.client.get("/going_somewhere")
    
        @seq_task(2)
        def task_2(self):
            self.client.get("/going_somewhere_else")
    
        @seq_task(3)
        def done(self):
            raise StopLocust()
    

    默认情况下,Locust 被设置为永不结束,并且会不断地从提供给它的可用任务集中挑选一个任务。提供 raise StopLocust() 将告诉当前用户 (Locust) 在到达当前任务结束时停止并且不再选择任务。

    【讨论】:

    • 效果很好......我完全错过了添加序列任务来调用 StopLocust 的概念......实际上我现在也没有关于 StopLocust 的事情。了解这些任务的工作原理可以解决我有疑问的几个场景。感谢您的回复和回答!
    • self.interrupt() 不也做同样的事情吗?
    • Locust 目前正在重命名很多东西,包括StopLocust into StopUserTaskSequence into SequentialTaskSet
    • @SohamDongargaonkar interrupt 与我的理解类似,因为它在许多情况下会停止执行。然而,它留下了可以为给定用户安排更多任务的可能性,如果提出StopUser,这似乎是不可能的。
    • 看起来 StopLocust 已在 locust 1.2 中删除,但您可以使用 locust.exception.StopUser
    【解决方案2】:

    我使用 locust 1.3+ StopLocust() 已弃用

    我的框架为每个用户创建了一个包含凭据和支持变量的元组列表。我已将我所有的用户凭据、令牌、支持文件名等存储在这些元组中作为列表的一部分。 (其实是在locust启动前自动完成的)

    我在 locustfile 中导入该列表

    # creds is created before running locust file and can be stored outside or part of locust # file
    creds = [('demo_user1', 'pass1', 'lnla'),
             ('demo_user2', 'pass2', 'taam9'),
             ('demo_user3', 'pass3', 'wevee'),
             ('demo_user4', 'pass4', 'avwew')]
    
    class RegisteredUser(SequentialTaskSet)
    
        def on_start(self):
            self.credentials = creds.pop()
    
        @task
        def task_one_name(self):
            task_one_commands
    
        @task
        def task_two_name(self):
            task_two_commands
    
        @task
        def stop(self):
            if len(creds) == 0:
                self.user.environment.reached_end = True
                self.user.environment.runner.quit()
    
    
    class ApiUser(HttpUser):
        tasks = [RegisteredUser]
        host = 'hosturl'
    

    我在任务中使用 self.credentials。 我在课堂上创建了 stop 函数。

    另外,观察 RegisteredUser 继承自 SequentialTask​​Set 以按顺序运行所有任务。

    【讨论】:

      【解决方案3】:

      使用

      提高退出()

      如果您想确定命中,请创建一个顺序任务集并 raise exit() raise exit 将停止该用户

      #这是一种解决方法,因为没有其他方法对我有用。

      【讨论】:

      • 嗨,这将阻止整个蝗虫。有没有办法只停止当前用户?
      【解决方案4】:

      headless模式下,如果想停止整个locust服务,可以使用os.kill

      class UserTasks(TaskSet):
          @task
          def test_api(self):
              gobal FINISHED_USER, FINISHED_REQUEST
              with self.user.client.post(f"/{api_type}", json=<my data>, headers=<my headers>, catch_response=True) as response:
                  # do something
                  FINISHED_REQUEST += 1
              if FINISHED_REQUEST == schedule_num:
                  # this user finish all its task, will go to on_stop
                  self.user._state = "stopping"
                  FINISHED_USER += 1
          
          def on_stop(self):
              global FINISHED_USER
              # do something
              # make sure its the last user who finishes its task
              if FINISHED_USER == <your user num>:
                  os.kill(<your pid>, signal.SIGTERM)
      
      class WebsiteUser(HttpUser):
          """
          User class that does requests to the locust web server running on localhost
          """
          host = "http://{}".format(<host>)
          wait_time = constant_throughput(1)
          tasks = [UserTasks]
      
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2014-08-20
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2013-12-12
        相关资源
        最近更新 更多