使用工作队列进行粗粒度并行处理
本例中,你将会运行包含多个并行工作进程的 Kubernetes Job。
本例中,每个 Pod 一旦被创建,会立即从任务队列中取走一个工作单元并完成它,然后将工作单元从队列中删除后再退出。
下面是本次示例的主要步骤:
-
启动一个消息队列服务。 本例中,我们使用 RabbitMQ,你也可以用其他的消息队列服务。 在实际工作环境中,你可以创建一次消息队列服务然后在多个任务中重复使用。
-
创建一个队列,放上消息数据。 每个消息表示一个要执行的任务。本例中,每个消息是一个整数值。 我们将基于这个整数值执行很长的计算操作。
-
启动一个在队列中执行这些任务的 Job。 该 Job 启动多个 Pod。每个 Pod 从消息队列中取走一个任务,处理任务,然后退出。
准备开始
你应当熟悉 Job 的基本用法(非并行的),请参考 Job。
你必须拥有一个 Kubernetes 的集群,且必须配置 kubectl 命令行工具让其与你的集群通信。 建议运行本教程的集群至少有两个节点,且这两个节点不能作为控制平面主机。 如果你还没有集群,你可以通过 Minikube 构建一个你自己的集群,或者你可以使用下面的 Kubernetes 练习环境之一:
你需要一个容器镜像仓库,用来向其中上传镜像以在集群中运行。
此任务示例还假设你已在本地安装了 Docker。
启动消息队列服务
本例使用了 RabbitMQ,但你可以更改该示例,使用其他 AMQP 类型的消息服务。
在实际工作中,在集群中一次性部署某个消息队列服务,之后在很多 Job 中复用,包括需要长期运行的服务。
按下面的方法启动 RabbitMQ:
# 为 StatefulSet 创建一个 Service 来使用
kubectl create -f https://kubernetes.io/examples/application/job/rabbitmq/rabbitmq-service.yaml
service "rabbitmq-service" created
kubectl create -f https://kubernetes.io/examples/application/job/rabbitmq/rabbitmq-statefulset.yaml
statefulset "rabbitmq" created
测试消息队列服务
现在,我们可以试着访问消息队列。我们将会创建一个临时的可交互的 Pod, 在它上面安装一些工具,然后用队列做实验。
首先创建一个临时的可交互的 Pod:
# 创建一个临时的可交互的 Pod
kubectl run -i --tty temp --image ubuntu:22.04
Waiting for pod default/temp-loe07 to be running, status is Pending, pod ready: false
... [ previous line repeats several times .. hit return when it stops ] ...
请注意你的 Pod 名称和命令提示符将会不同。
接下来安装 amqp-tools
,这样你就能用消息队列了。
下面是在该 Pod 的交互式 shell 中需要运行的命令:
apt-get update && apt-get install -y curl ca-certificates amqp-tools python3 dnsutils
后续,你将制作一个包含这些包的容器镜像。
接着,你将要验证可以发现 RabbitMQ 服务:
# 在 Pod 内运行这些命令
# 请注意 rabbitmq-service 拥有一个由 Kubernetes 提供的 DNS 名称:
nslookup rabbitmq-service
Server: 10.0.0.10
Address: 10.0.0.10#53
Name: rabbitmq-service.default.svc.cluster.local
Address: 10.0.147.152
(IP 地址会有所不同)
如果 kube-dns 插件没有正确安装,上一步可能会出错。 你也可以在环境变量中找到该服务的 IP 地址。
# 在 Pod 内运行此检查
env | grep RABBITMQ_SERVICE | grep HOST
RABBITMQ_SERVICE_SERVICE_HOST=10.0.147.152
(IP 地址会有所不同)
接下来,你将验证是否可以创建队列以及发布和使用消息。
# 在 Pod 内运行这些命令
# 下一行,rabbitmq-service 是访问 rabbitmq-service 的主机名。5672是 rabbitmq 的标准端口。
export BROKER_URL=amqp://guest:guest@rabbitmq-service:5672
# 如果上一步中你不能解析 "rabbitmq-service",可以用下面的命令替换:
BROKER_URL=amqp://guest:guest@$RABBITMQ_SERVICE_SERVICE_HOST:5672
# 现在创建队列:
/usr/bin/amqp-declare-queue --url=$BROKER_URL -q foo -d
foo
向队列推送一条消息:
/usr/bin/amqp-publish --url=$BROKER_URL -r foo -p -b Hello
# 然后取回它:
/usr/bin/amqp-consume --url=$BROKER_URL -q foo -c 1 cat && echo
Hello
最后一个命令中,amqp-consume
工具从队列中取走了一个消息,并把该消息传递给了随机命令的标准输出。
在这种情况下,cat
会打印它从标准输入中读取的字符,echo 会添加回车符以便示例可读。
为队列增加任务
现在用一些模拟任务填充队列。在此示例中,任务是多个待打印的字符串。
实践中,消息的内容可以是:
- 待处理的文件名
- 程序额外的参数
- 数据库表的关键字范围
- 模拟任务的配置参数
- 待渲染的场景的帧序列号
如果有大量的数据需要被 Job 的所有 Pod 读取,典型的做法是把它们放在一个共享文件系统中, 如 NFS(Network File System 网络文件系统),并以只读的方式挂载到所有 Pod,或者 Pod 中的程序从类似 HDFS (Hadoop Distributed File System 分布式文件系统)的集群文件系统中读取。
例如,你将创建队列并使用 AMQP 命令行工具向队列中填充消息。实践中,你可以写个程序来利用 AMQP 客户端库来填充这些队列。
# 在你的计算机上运行此命令,而不是在 Pod 中
/usr/bin/amqp-declare-queue --url=$BROKER_URL -q job1 -d
job1
将这几项添加到队列中:
for f in apple banana cherry date fig grape lemon melon
do
/usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f
done
你给队列中填充了 8 个消息。
创建容器镜像
现在你可以创建一个做为 Job 来运行的镜像。
这个 Job 将用 amqp-consume
实用程序从队列中读取消息并进行实际工作。
这里给出一个非常简单的示例程序:
#!/usr/bin/env python
# Just prints standard out and sleeps for 10 seconds.
import sys
import time
print("Processing " + sys.stdin.readlines()[0])
time.sleep(10)
赋予脚本执行权限:
chmod +x worker.py
现在,编译镜像。创建一个临时目录,切换到这个目录。下载 Dockerfile 和 worker.py。 无论哪种情况,都可以用下面的命令编译镜像:
docker build -t job-wq-1 .
对于 Docker Hub, 给你的应用镜像打上标签,
标签为你的用户名,然后用下面的命令推送到 Hub。用你的 Hub 用户名替换 <username>
。
docker tag job-wq-1 <username>/job-wq-1
docker push <username>/job-wq-1
如果你使用替代的镜像仓库,请标记该镜像并将其推送到那里。
定义 Job
这里给出一个 Job 的清单。你需要复制一份 Job 清单的副本(将其命名为 ./job.yaml
),
并编辑容器镜像的名称以匹配使用的名称。
apiVersion: batch/v1
kind: Job
metadata:
name: job-wq-1
spec:
completions: 8
parallelism: 2
template:
metadata:
name: job-wq-1
spec:
containers:
- name: c
image: gcr.io/<project>/job-wq-1
env:
- name: BROKER_URL
value: amqp://guest:guest@rabbitmq-service:5672
- name: QUEUE
value: job1
restartPolicy: OnFailure
本例中,每个 Pod 使用队列中的一个消息然后退出。
这样,Job 的完成计数就代表了完成的工作项的数量。
这就是示例清单将 .spec.completions
设置为 8
的原因。
运行 Job
运行 Job:
这假设你已经下载并编辑了清单
kubectl apply -f ./job.yaml
你可以等待 Job 在某个超时时间后成功:
# 状况名称的检查不区分大小写
kubectl wait --for=condition=complete --timeout=300s job/job-wq-1
接下来查看 Job:
kubectl describe jobs/job-wq-1
Name: job-wq-1
Namespace: default
Selector: controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
Labels: controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
job-name=job-wq-1
Annotations: <none>
Parallelism: 2
Completions: 8
Start Time: Wed, 06 Sep 2022 16:42:02 +0000
Pods Statuses: 0 Running / 8 Succeeded / 0 Failed
Pod Template:
Labels: controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
job-name=job-wq-1
Containers:
c:
Image: container-registry.example/causal-jigsaw-637/job-wq-1
Port:
Environment:
BROKER_URL: amqp://guest:guest@rabbitmq-service:5672
QUEUE: job1
Mounts: <none>
Volumes: <none>
Events:
FirstSeen LastSeen Count From SubobjectPath Type Reason Message
───────── ──────── ───── ──── ───────────── ────── ────── ───────
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-hcobb
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-weytj
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-qaam5
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-b67sr
26s 26s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-xe5hj
15s 15s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-w2zqe
14s 14s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-d6ppa
14s 14s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-p17e0
该 Job 的所有 Pod 都已成功!你完成了。
替代方案
本文所讲述的处理方法的好处是你不需要修改你的 "worker" 程序使其知道工作队列的存在。 你可以将未修改的工作程序包含在容器镜像中。
使用此方法需要你运行消息队列服务。如果不方便运行消息队列服务, 你也许会考虑另外一种任务模式。
本文所述的方法为每个工作项创建了一个 Pod。 如果你的工作项仅需数秒钟,为每个工作项创建 Pod 会增加很多的常规消耗。 考虑另一种设计,例如精细并行工作队列示例, 这种方案可以实现每个 Pod 执行多个工作项。
示例中,你使用了 amqp-consume
从消息队列读取消息并执行真正的程序。
这样的好处是你不需要修改你的程序使其知道队列的存在。
要了解怎样使用客户端库和工作队列通信,
请参考精细并行工作队列示例。
友情提醒
如果设置的完成数量小于队列中的消息数量,会导致一部分消息项不会被执行。
如果设置的完成数量大于队列中的消息数量,当队列中所有的消息都处理完成后, Job 也会显示为未完成。Job 将创建 Pod 并阻塞等待消息输入。 你需要建立自己的机制来发现何时有工作要做,并测量队列的大小,设置匹配的完成数量。
当发生下面两种情况时,即使队列中所有的消息都处理完了,Job 也不会显示为完成状态:
- 在
amqp-consume
命令拿到消息和容器成功退出之间的时间段内,执行杀死容器操作; - 在 kubelet 向 API 服务器传回 Pod 成功运行之前,发生节点崩溃。
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.