【问题标题】:Kubernetes - processing an unlimited number of work-itemsKubernetes - 处理无限数量的工作项
【发布时间】:2018-09-09 03:15:27
【问题描述】:

我需要从工作队列中获取一个工作项,然后依次运行一系列容器来处理每个工作项。这可以使用 initContainers (https://stackoverflow.com/a/46880653/94078) 来完成

重新启动流程以获取下一个工作项的推荐方法是什么?

  • 作业看起来很理想,但似乎不支持无限/无限数量的完成。
  • 使用单个 Pod 不起作用,因为 initContainers 没有重新启动 (https://github.com/kubernetes/kubernetes/issues/52345)。
  • 我希望避免像 argo 或 brigade 这样的系统的维护/学习开销。

谢谢!

【问题讨论】:

    标签: kubernetes


    【解决方案1】:

    作业应该用于处理工作队列。使用工作队列时,您应该不设置.spec.comletions(或将其设置为null)。在这种情况下,Pod 将继续创建,直到其中一个 Pod 成功退出。故意以失败状态从(主)容器退出有点尴尬,但这是规范。无论此设置如何,您都可以根据自己的喜好设置.spec.parallelism;我已将其设置为 1,因为您似乎不需要任何并行性。

    在您的问题中,如果工作队列为空,您没有指定要做什么,因此我将提供两种解决方案,一种是您想等待新项目(无限),另一种是如果您想结束工作,如果工作队列变空(有限但不确定的项目数)。

    这两个示例都使用了 redis,但您可以将此模式应用到您最喜欢的队列中。请注意,从队列中弹出项目的部分是不安全的;如果您的 Pod 在弹出项目后由于某种原因死亡,则该项目将保持未处理或未完全处理。请参阅reliable-queue pattern 以获得正确的解决方案。

    为了在每个工作项上执行顺序步骤,我使用了init containers。请注意,这确实是一个原始解决方案,但如果您不想使用某些框架来实现适当的管道,则选择有限。

    有一个asciinema 如果有人想在不部署 redis 等的情况下看到它的工作。

    Redis

    要对此进行测试,您至少需要创建一个 redis Pod 和一个服务。我正在使用来自fine parallel processing work queue 的示例。你可以部署它:

    kubectl apply -f https://rawgit.com/kubernetes/website/master/docs/tasks/job/fine-parallel-processing-work-queue/redis-pod.yaml
    kubectl apply -f https://rawgit.com/kubernetes/website/master/docs/tasks/job/fine-parallel-processing-work-queue/redis-service.yaml
    

    此解决方案的其余部分要求您在与 Job 相同的命名空间中拥有一个服务名称 redis,并且它不需要身份验证和一个名为 redis-master 的 Pod。

    插入项目

    要在工作队列中插入一些项目,请使用此命令(您需要 bash 才能工作):

    echo -ne "rpush job "{1..10}"\n" | kubectl exec -it redis-master -- redis-cli
    

    无限版

    如果队列为空,此版本会等待,因此它永远不会完成。

    apiVersion: batch/v1
    kind: Job
    metadata:
      name: primitive-pipeline-infinite
    spec:
      parallelism: 1
      completions: null
      template:
        metadata:
          name: primitive-pipeline-infinite
        spec:
          volumes: [{name: shared, emptyDir: {}}]
          initContainers:
          - name: pop-from-queue-unsafe
            image: redis
            command: ["sh","-c","redis-cli -h redis blpop job 0 >/shared/item.txt"]
            volumeMounts: [{name: shared, mountPath: /shared}]
          - name: step-1
            image: busybox
            command: ["sh","-c","echo step-1 working on `cat /shared/item.txt` ...; sleep 5"]
            volumeMounts: [{name: shared, mountPath: /shared}]
          - name: step-2
            image: busybox
            command: ["sh","-c","echo step-2 working on `cat /shared/item.txt` ...; sleep 5"]
            volumeMounts: [{name: shared, mountPath: /shared}]
          - name: step-3
            image: busybox
            command: ["sh","-c","echo step-3 working on `cat /shared/item.txt` ...; sleep 5"]
            volumeMounts: [{name: shared, mountPath: /shared}]
          containers:
          - name: done
            image: busybox
            command: ["sh","-c","echo all done with `cat /shared/item.txt`; sleep 1; exit 1"]
            volumeMounts: [{name: shared, mountPath: /shared}]
          restartPolicy: Never
    

    有限版本

    如果队列为空,此版本会停止作业。请注意,pop init 容器检查队列是否为空,并且所有后续的 init 容器如果确实为空,则主容器立即退出 - 这是向 Kubernetes 发出 Job 的信号的机制已完成,无需为其创建新的 Pod。

    apiVersion: batch/v1
    kind: Job
    metadata:
      name: primitive-pipeline-finite
    spec:
      parallelism: 1
      completions: null
      template:
        metadata:
          name: primitive-pipeline-finite
        spec:
          volumes: [{name: shared, emptyDir: {}}]
          initContainers:
          - name: pop-from-queue-unsafe
            image: redis
            command: ["sh","-c","redis-cli -h redis lpop job >/shared/item.txt; grep -q . /shared/item.txt || :>/shared/done.txt"]
            volumeMounts: [{name: shared, mountPath: /shared}]
          - name: step-1
            image: busybox
            command: ["sh","-c","[ -f /shared/done.txt ] && exit 0; echo step-1 working on `cat /shared/item.txt` ...; sleep 5"]
            volumeMounts: [{name: shared, mountPath: /shared}]
          - name: step-2
            image: busybox
            command: ["sh","-c","[ -f /shared/done.txt ] && exit 0; echo step-2 working on `cat /shared/item.txt` ...; sleep 5"]
            volumeMounts: [{name: shared, mountPath: /shared}]
          - name: step-3
            image: busybox
            command: ["sh","-c","[ -f /shared/done.txt ] && exit 0; echo step-3 working on `cat /shared/item.txt` ...; sleep 5"]
            volumeMounts: [{name: shared, mountPath: /shared}]
          containers:
          - name: done
            image: busybox
            command: ["sh","-c","[ -f /shared/done.txt ] && exit 0; echo all done with `cat /shared/item.txt`; sleep 1; exit 1"]
            volumeMounts: [{name: shared, mountPath: /shared}]
          restartPolicy: Never
    

    【讨论】:

    • 感谢您的回答。我在等待新项目的无限版本之后 - 这种将 restartPolicy 设置为 Never 然后使 pod 失败的方法对我来说是新的 - 但由此产生的错误状态并不是很好,因为它们会干扰监控并使其更难查看实际错误..
    • 是的,我不喜欢这种行为。您希望一分钟处理多少个项目?我问的原因是,因为如果它只说每分钟 10 项,您可以设置 completions: 1000000000(10 亿)这将足够大约 190 年(1000000000/10/60/24/365)。然后你就可以从'done'容器中成功退出了。
    • 实际上这些作业是手动生成的,所以它更像是每分钟最多 1 个。这基本上就是我最终使用的 - 只是一个普通的工作。我已将完成数设置为 10,000,但很好的是我可以走得更高——它是一个 int32。这将是足够的。
    • 这不支持基于消息队列指标的工作 pod 的自动水平扩展,对吗?
    • @connor.brinton,正确;这专门用于顺序处理(请参阅问题)
    【解决方案2】:

    在这种情况下,最简单的方法是使用CronJobCronJob 按照计划运行 Jobs。欲了解更多信息,请通过documentation

    这是一个例子(我从here拿来并修改它)

    apiVersion: batch/v1beta1
    kind: CronJob 
    metadata:
      name: sequential-jobs
    spec:
      schedule: "*/1 * * * *" #Here is the schedule in Linux-cron format
      jobTemplate:
        spec:
          template:
            metadata:
              name: sequential-job
            spec:
              initContainers:
              - name: job-1
                image: busybox
                command: ['sh', '-c', 'for i in 1 2 3; do echo "job-1 `date`" && sleep 5s; done;']
              - name: job-2
                image: busybox
                command: ['sh', '-c', 'for i in 1 2 3; do echo "job-2 `date`" && sleep 5s; done;']
              containers:
              - name: job-done
                image: busybox
                command: ['sh', '-c', 'echo "job-1 and job-2 completed"']
              restartPolicy: Never
    

    然而,他的解决方案有一些限制:

    • 运行频率不能超过 1 分钟
    • 如果您需要一个接一个地处理您的工作项,您需要在InitContainer 中为正在运行的作业创建额外的检查
    • CronJobs 仅在 Kubernetes 1.8 及更高版本中可用

    【讨论】:

    • 谢谢,这还不错,一旦可以将 .spec.concurrencyPolicy 设置为 Forbid。不过 1 分钟的限制并不理想..
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-02-13
    • 2023-03-23
    • 1970-01-01
    • 2017-08-11
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多