【问题标题】:How to work a job queue with kubernetes with scaling如何使用具有扩展性的 Kubernetes 处理作业队列
【发布时间】:2019-09-26 11:28:30
【问题描述】:

我需要一个基于 docker/python worker 的可扩展队列处理。我的想法转向了 Kubernetes。但是,我不确定最好的控制器/服务。

基于 azure 函数,我获得传入的 http 流量,将简单消息添加到存储队列。需要处理这些消息,并将结果反馈到结果队列中。

为了处理这些队列消息,我开发了循环队列并处理这些作业的 Python 代码。每次成功循环后,消息将从源队列中删除,并将结果写入结果队列。一旦队列为空,代码就存在。

所以我创建了一个运行 python 代码的 docker 镜像。如果启动了多个容器,则队列显然会更快地工作。 我还实施了新的 Azure Kubernetes 服务来扩展它。 在刚接触 kubernetes 时,我读到了作业范式,以在作业准备好之前工作队列。我的简单 yaml 模板如下所示:

apiVersion: batch/v1
kind: Job
metadata:
  name: myjob
spec:
  parallelism: 4
  template:
    metadata:
      name: myjob
    spec:
      containers:
      - name: c
        image: repo/image:tag

我现在的问题是,作业无法重新启动。

通常,队列被一些条目填满,然后有一段时间没有任何反应。然后又会出现更大的队列,需要尽快处理。当然,那时我想再次运行这项工作,但这似乎是不可能的。另外,如果队列中没有任何内容,我想将占用空间减少到最低限度。

所以我的问题是,我应该在这种情况下使用什么架构/构造,是否有简单的 yaml 示例?

【问题讨论】:

    标签: python azure docker kubernetes azure-aks


    【解决方案1】:

    这可能是一个“愚蠢/骇人听闻”的答案,但它简单、健壮,而且我已经在生产系统中使用它几个月了。

    我有一个类似的系统,我有一个队列,有时会被清空,有时会被猛烈抨击。我类似地编写了我的队列处理器,它一次处理队列中的一条消息,如果队列为空则终止。它被设置为在 Kubernetes 作业中运行。

    诀窍是这样的:我创建了一个 CronJob 来定期启动一个新的作业实例,并且该作业允许无限并行。如果队列为空,则立即终止(“缩小”)。如果队列被塞满并且最后一个作业还没有完成,另一个实例会启动(“扩展”)。

    无需查询队列和扩展有状态集或任何东西,如果队列为空,则不会消耗任何资源。您可能需要调整 CronJob 间隔以微调它对队列填满的反应速度,但它应该反应良好。

    【讨论】:

    • 你有机会分享示例配置吗? CronJob 如何知道最后一个 Job 是否还没有完成?如果此 CronJob 检查的队列深度太高,如何指定更高的作业并行度?这在概念上似乎比我一直在寻找的其他信息更容易,但我对它的配置方式很感兴趣。
    • 其实很简单。我根本不限制并行性(只是不要在 jobTemplate 规范中设置它),而是在 cronjob 规范中设置concurrencyPolicy: "Allow"。然后设置您想要启动新员工的任何时间表,我将其设置为每 15 分钟一次。没有投票或类似的东西,它不关心最后一个工作是否完成。它只是每 15 分钟启动一个新作业,当队列中没有剩余项目时,它们都会终止。
    • 在您的情况下,一个作业可以处理整个工作量?在我的情况下,我希望有多个工作(最好是扩展他们的数量)并通过从队列中挑选任务来分担工作量。我发现您的回答非常有趣,并且想知道您将如何实现这一目标。我现在拥有的是一个随队列长度扩展的部署,但我想改用 Jobs 和 CronJobs(因为它们是短暂的,但部署会不断重新启动我的工作,即使退出代码为 0)跨度>
    • 通常一个 Job 会在一小时内处理完队列并终止,但如果它被备份并且在下一个 CronJob 计划之前没有完成,CronJob 仍然会创建另一个并行工作的 Job。这样我就不必在上下扩展部署时搞乱了。
    【解决方案2】:

    这是一种常见的模式,有多种方法可以构建解决方案。

    一个常见的解决方案是让一个应用程序有一组工作人员始终轮询您的队列(这可能是您的 python 脚本,但您需要将其设为服务),通常您可能希望使用 Kubernetes Deployment根据您的队列或 CPU 的一些指标,使用 Horizontal Pod Autoscaler

    在您的情况下,您需要将脚本设置为 daemon 并在有任何项目时轮询队列(我假设您已经在处理具有并行性的竞争条件)。然后使用 Kubernetes 部署部署此守护程序,然后您可以根据指标或计划进行纵向扩展和缩减。

    已经有许多不同语言的作业调度程序。一个非常流行的是Airflow,它已经有能力拥有“工人”,但这对于单个 python 脚本来说可能是多余的。

    【讨论】:

    • 假设我们正在使用部署和 HPA 解决方案,其中 HPA 指标是队列长度。你如何防止缩小规模杀死活跃的工人?例如。我们扩大到 10 个工作人员,5 个完成,HPA 正在缩减部署。你如何确保它杀死了 5 名完成工作的工人,而不是那些仍在工作的工人?
    • 通常,您可以使用在容器中定义的 preStop 挂钩和终止宽限期来管理它。更多信息在这里:kubernetes.io/docs/concepts/workloads/pods/pod/…。但是,是的,HPA 没有“终止最旧策略”机制,类似于docs.aws.amazon.com/autoscaling/ec2/userguide/…(AWS 中的 ASG 终止策略)。这可能是一个功能请求:)
    • 有一个功能请求,但它从 2017 年开始开放...github.com/kubernetes/kubernetes/issues/45509
    • 感谢分享,我已经给票加了评论。
    猜你喜欢
    • 1970-01-01
    • 2018-02-23
    • 2018-01-20
    • 2019-01-02
    • 2018-12-29
    • 1970-01-01
    • 2015-10-09
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多