Scaling Celery workers with RabbitMQ on Kubernetes
March 2021
TLDR: In this article, you will explore how to use Kubernetes and KEDA to scale Celery workers based on the number of messages in a RabbitMQ queue.
Using a job queuing system to offload time-consuming tasks is a typical pattern in web applications.
When a user requests a time-consuming task such as transcoding videos, the web application can:
- Make a note of the request.
- Add it to a queue.
- Return a response to the user immediately.
Another app (usually referred to as consumer or worker process) consumes those messages from the queue and executes the tasks asynchronously.
This pattern keeps the main application from blocking on slow tasks, so it can keep serving multiple concurrent users.
When working with Python, Celery is a popular option for a job queuing system.
Celery can be paired with a message broker such as RabbitMQ to connect the app that adds the tasks (producer) and the workers processing the jobs (consumers).
- 1/4
When the application receives requests, it creates a description of the job that has to be completed.
- 2/4
The user immediately receives a response that the job has been accepted (but not completed).
- 3/4
The tasks are dispatched to the message broker.
- 4/4
Each worker can retrieve a message from the queue and start executing the task.
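The four steps above can be illustrated without Celery or RabbitMQ at all. The following is only an in-process sketch, assuming Python's standard queue.Queue as a stand-in for the broker and a thread as a stand-in for the worker; the names submit_report and worker are hypothetical and are not part of the demo app.

```python
import queue
import threading
import uuid

jobs = queue.Queue()   # stands in for the message broker
results = {}           # stands in for the result backend

def submit_report():
    """Producer: record the job, enqueue it, and return immediately."""
    job_id = str(uuid.uuid4())
    results[job_id] = None          # job accepted, not completed
    jobs.put(job_id)
    return job_id

def worker():
    """Consumer: take jobs off the queue and execute them."""
    while True:
        job_id = jobs.get()
        if job_id is None:          # sentinel: stop the worker
            jobs.task_done()
            break
        results[job_id] = {"state": "completed"}
        jobs.task_done()

job_id = submit_report()            # returns before any work is done
pending = results[job_id]           # still None: accepted but not completed

thread = threading.Thread(target=worker)
thread.start()
jobs.join()                         # wait until the queue is drained
jobs.put(None)                      # ask the worker to stop
thread.join()
```

The producer never waits for the worker: it only needs the queue to be available. That is the property that lets you scale the two sides independently.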
As you can imagine, you can process tasks quicker if you have several worker processes running simultaneously.
However, workers might consume resources, even when idle.
Is there a mechanism to add workers when the queue is full and reduce them when the queue is empty?
In Kubernetes, you can scale your app's instances with the Horizontal Pod Autoscaler (HPA).
The Horizontal Pod Autoscaler (HPA) can be configured to increase and decrease the number of replicas based on metrics such as CPU and memory.
What if you could use the number of messages in the queue to trigger the autoscaling instead?
You could increase the number of Pods and consume messages quicker when the queue is full.
If the queue is empty, you can scale your workers down to zero and save on resources.
But isn't Kubernetes only scaling Pods on metrics such as CPU and memory?
How does it know about the length of the queue?
Kubernetes does not understand custom metrics out of the box.
However, you can use an event-driven autoscaler such as KEDA to collect and expose metrics to Kubernetes from databases (MySQL, Postgres), message queues (RabbitMQ, AWS SQS), telemetry systems (AWS CloudWatch, Azure Monitor), etc.
The data can be used in combination with the Horizontal Pod Autoscaler to create more Pods when the queue is full.
Let's see this in action.
The next steps are executed locally in minikube. If you're on Windows, you can follow our handy guide on installing Minikube on Windows.
Generating long-running reports
You can find all the code at github.com/yolossn/flask-celery-microservice.
Consider the following web application with two endpoints:
- A /report endpoint that triggers creating a report. The app takes 60 seconds to complete a report.
- A /report/<id> endpoint to fetch the status of the report. The status can either be in progress or completed.
Every time a user requests a report, the task is dispatched to Celery.
Celery orchestrates and distributes the task using two components:
- RabbitMQ acts as a message broker. This is used to distribute the messages to the workers.
- PostgreSQL stores the state of the tasks. This is necessary to keep track of what jobs are completed or in progress.
So the full journey for a single request is:
- The user requests a report to the Flask app.
- The Flask app submits the task to the queue. The job is recorded in the database.
- The app also replies to the user with the id of the task.
- A worker picks up the job and runs it to completion.
- The job is marked as completed in the database.
- The user retrieves the report. The report is ready.
Let's deploy the app into a Kubernetes cluster and test how it works.
bash
kubectl apply -f postgres.yaml
namespace/postgres created
secret/postgres-secret created
deployment.apps/postgresql created
service/postgresql created
kubectl apply -f rabbitmq.yaml
namespace/rabbitmq created
secret/rabbitmq-secret created
deployment.apps/rabbitmq created
service/rabbitmq created
kubectl apply -f flask_server.yaml
namespace/flask-backend created
secret/flask-secret created
deployment.apps/flask-server created
service/flask-server created
kubectl apply -f celery_workers.yaml
namespace/celery-workers created
secret/celery-workers-secret created
deployment.apps/celery-worker created
Both the application and worker deployments share the same secrets, connecting to the same queue and database.
Let's try to make a request to the app by setting up a port forward:
bash
kubectl port-forward --namespace flask-backend service/flask-server 8000:80
Forwarding from 127.0.0.1:8000 -> 5000
Forwarding from [::1]:8000 -> 5000
Open a new terminal session, and initiate a report with:
bash
curl -X POST http://localhost:8000/report
{"report_id":"5ed51ec5-f0c8-43cb-bec5-3074e429d29b"}
You can use the report_id value in the response to check the status of the job:
bash
curl http://localhost:8000/report/5ed51ec5-f0c8-43cb-bec5-3074e429d29b
{"id":"5ed51ec5-f0c8-43cb-bec5-3074e429d29b","result":{"state":"completed"}}
If the job is still in progress, you might see the following response:
{"id":"5ed51ec5-f0c8-43cb-bec5-3074e429d29b","result":null}
As you can imagine, the time that it takes to process a report depends on:
- The number of messages in the queue and
- The number of workers.
If the queue is almost empty, it should take 60 seconds for the reports to be ready.
If there are several messages in the queue, you might have to wait anywhere from minutes to hours for the report.
Let's test that further.
Load testing the app with Locust
There are many tools available for load testing apps, such as ab, k6, BlazeMeter, etc.
In this tutorial, you will use Locust — an open-source load testing tool.
Locust is convenient for two reasons:
- It has a dashboard where you can inspect the traffic generated.
- You can generate load using a script.
The following script is meant to create (and check) reports on the app:
complete_flow_load_test.py
import time

import gevent
import requests
from locust import HttpUser, task, constant

# async_success() and async_failure() report the timings to Locust.
# They are defined in the full script in the GitHub repository.

class reportService(HttpUser):
    wait_time = constant(1)

    def _do_async_handler(self, timeout=600):
        # Request for a report
        post_resp = requests.post(self.host + 'report')
        if post_resp.status_code != 200:
            return
        id = post_resp.json()['report_id']

        # Poll for completed report
        start_time = time.monotonic()
        end_time = start_time + timeout
        while time.monotonic() < end_time:
            r = requests.get(self.host + 'report/' + id)
            if r.status_code == 200 and r.json()['result'] is not None:
                async_success('POST /report/ID - async', start_time, post_resp)
                return
            time.sleep(1)
        async_failure('POST /report/ID - async', start_time, post_resp,
                      'Failed - timed out after %s seconds' % timeout)

    @task
    def async_test(self):
        gevent.spawn(self._do_async_handler)
Please notice that the script was truncated for readability, but you can find the full script in the GitHub repository.
If you save the file locally, you can start Locust as a container with:
bash
docker run -p 8089:8089 \
-v $PWD:/mnt/locust \
locustio/locust -f /mnt/locust/complete_flow_load_test.py
When it starts, the container binds on port 8089 on your computer.
You can open your browser on http://localhost:8089 to access the web interface.
You should simulate one user submitting a report every second.
As the URL of the app, you should use the same URL that you used to issue curl requests.
Start the test; what do you observe?
The response time for a report is increasing, but all responses are successful.
Of course, with only a few workers, your request may end up sitting in the queue for a long time before it's picked up.
Let's work out the math:
- Every second, a user creates a report.
- A report takes 60 seconds to complete.
- There are only two workers.
During a minute, 60 reports are requested, and only two are completed.
The queue fills up rather quickly.
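The arithmetic above can be written down as a rough model. This is only a back-of-the-envelope sketch: it assumes that each worker processes one report at a time and that load is perfectly steady, and the function names are made up for this illustration.

```python
import math

def backlog_after(seconds, arrivals_per_second=1, workers=2, task_seconds=60):
    """Approximate number of reports not yet completed after `seconds` of load."""
    requested = arrivals_per_second * seconds
    # Each worker finishes one report every `task_seconds`.
    completed = workers * (seconds // task_seconds)
    return max(requested - completed, 0)

def drain_seconds(backlog, workers=2, task_seconds=60):
    """Rough time needed to work through `backlog` reports once the load stops."""
    return math.ceil(backlog / workers) * task_seconds

one_minute = backlog_after(60)      # 60 requested, 2 completed
ten_minutes = backlog_after(600)    # 600 requested, 20 completed
print(one_minute, ten_minutes, drain_seconds(ten_minutes))
```

With the numbers from the test, ten minutes of load leave 580 unfinished reports, and two workers would need 17400 seconds (almost five hours) to clear them.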
What about the website, is it responsive?
You can run a quick test with the following Locust script:
generate_flow_load_test.py
from locust import HttpUser, task, constant

class reportService(HttpUser):
    wait_time = constant(1)

    @task
    def generate_report(self):
        resp = self.client.post("/report")
        if resp.status_code != 200:
            print("Error generating report")
Start Locust as a container with:
bash
docker run -p 8089:8089 \
-v $PWD:/mnt/locust \
locustio/locust -f /mnt/locust/generate_flow_load_test.py
If you simulate 200 users with a hatch rate of 10, you will notice that the response is linear.
No matter how many users you throw at the website, all requests are immediately returned.
That's great, but how could you fix the reports taking ages to be returned?
You could increase the number of workers manually with:
bash
kubectl scale --replicas=4 deployment/celery-worker --namespace celery-workers
deployment.apps/celery-worker scaled
Now you're twice as fast as before.
However, when the queue is empty, you should probably scale the workers down to two (or zero).
bash
kubectl scale --replicas=2 deployment/celery-worker --namespace celery-workers
deployment.apps/celery-worker scaled
If you don't like to manually scale the number of replicas, you might be tempted to automate the process with an autoscaler.
In the next part, you will explore KEDA — the Kubernetes event-driven autoscaler.
The Kubernetes Metrics API
The Horizontal Pod Autoscaler (HPA) in Kubernetes does not work out of the box.
It has to make decisions on when to add or remove replicas based on real data.
Unfortunately, Kubernetes does not collect and aggregate metrics.
Instead, Kubernetes defines a Metrics API and leaves it to other software for the actual implementation.
The API is divided into three categories:
- Resource Metrics.
- Custom Metrics.
- External Metrics.
These APIs are intended to serve different types of metrics:
- The Resource Metrics API is designed for predefined resource usage metrics of Pods and Nodes, such as CPU and memory usage.
- The Custom Metrics API is intended for custom application-level metrics associated with a Kubernetes object.
- The External Metrics API is designed for custom application-level metrics that are not associated with a Kubernetes object.
- 1/4
Metrics exposed by the Metrics API are consumed by the Horizontal Pod Autoscaler (HPA), Vertical Pod Autoscaler (VPA), kubectl top and more. There are three categories of metrics.
- 2/4
The Resource Metrics API is intended for predefined resource usage metrics of Pods and Nodes, such as CPU and memory usage.
- 3/4
The Custom Metrics API is intended for custom application-level metrics that are associated with a Kubernetes object.
- 4/4
The External Metrics API is intended for custom application-level metrics that are not associated with a Kubernetes object.
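Each category is served under its own API group in the Kubernetes API. The small sketch below prints commands you could use to check which of them are registered in your cluster. The group names are the standard ones; the v1beta1 version is an assumption that matches the APIService KEDA registers later in this article, and the helper name discovery_path is made up for this example.

```python
# API group for each category of the Metrics API
METRICS_APIS = {
    "resource": "metrics.k8s.io",
    "custom": "custom.metrics.k8s.io",
    "external": "external.metrics.k8s.io",
}

def discovery_path(category, version="v1beta1"):
    """Build the discovery path for one category of the Metrics API."""
    return f"/apis/{METRICS_APIS[category]}/{version}"

for category in METRICS_APIS:
    # An error from kubectl means no component serves that API yet
    print(f"kubectl get --raw {discovery_path(category)}")
```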
You should install a component that collects and exposes Resource Metrics if you want to use them.
Similarly, you need to install a component that exposes custom metrics to scale on those.
Instead, External Metrics are collected by an external system.
The official Metrics Server is an excellent choice to collect Resource Metrics.
However, it doesn't collect custom metrics from RabbitMQ or Celery — so you won't be able to use them in your Horizontal Pod Autoscaler (HPA).
How do you work around that?
KEDA is a Kubernetes event-driven autoscaler that acts as a metrics server for external metrics and integrates nicely with databases (MySQL, Postgres), message queues (RabbitMQ, AWS SQS), telemetry systems (AWS CloudWatch, Azure Monitor), etc.
You could use KEDA to collect the queue length from RabbitMQ, expose it through the External Metrics API and use it in your Horizontal Pod Autoscaler.
Let's have a look at how you could do that.
You can install KEDA in your cluster with:
bash
kubectl apply -f https://github.com/kedacore/keda/releases/download/v2.0.0/keda-2.0.0.yaml
namespace/keda created
customresourcedefinition.apiextensions.k8s.io/scaledjobs.keda.sh created
customresourcedefinition.apiextensions.k8s.io/scaledobjects.keda.sh created
customresourcedefinition.apiextensions.k8s.io/triggerauthentications.keda.sh created
serviceaccount/keda-operator created
clusterrole.rbac.authorization.k8s.io/keda-external-metrics-reader created
clusterrole.rbac.authorization.k8s.io/keda-operator created
rolebinding.rbac.authorization.k8s.io/keda-auth-reader created
clusterrolebinding.rbac.authorization.k8s.io/keda-hpa-controller-external-metrics created
clusterrolebinding.rbac.authorization.k8s.io/keda-operator created
clusterrolebinding.rbac.authorization.k8s.io/keda:system:auth-delegator created
service/keda-metrics-apiserver created
deployment.apps/keda-metrics-apiserver created
deployment.apps/keda-operator created
apiservice.apiregistration.k8s.io/v1beta1.external.metrics.k8s.io created
The command creates a new namespace and installs all the required resources and Custom Resource Definitions (CRDs).
The resources will make more sense as soon as you use them.
Now, let's configure KEDA to recognise the worker and scale the Pods based on the number of messages in the queue.
Scaling on queue size with KEDA
KEDA can collect metrics from RabbitMQ because it has a specific adapter designed to integrate with it, called a trigger.
A trigger is part of a ScaledObject, a definition of how a Deployment should be autoscaled.
Let's have a look at an example.
If you wish to scale your Pods based on the length of the queue, you could use the following ScaledObject definition:
keda.yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: celery-worker-scaler
  namespace: celery-workers
spec:
  scaleTargetRef:
    name: celery-worker
  pollingInterval: 3
  minReplicaCount: 2
  maxReplicaCount: 30
  triggers:
    - type: rabbitmq
      metadata:
        queueName: celery
        queueLength: '10'
      authenticationRef:
        name: rabbitmq-worker-trigger
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: rabbitmq-worker-trigger
  namespace: celery-workers
spec:
  secretTargetRef:
    - parameter: host
      name: celery-workers-secret
      key: CELERY_BROKER_URL
The above YAML contains a ScaledObject as well as a TriggerAuthentication. The latter is used to authenticate to RabbitMQ using the credential stored in a Secret.
You can submit the definition to the cluster with:
bash
kubectl apply -f keda.yaml
scaledobject.keda.sh/celery-worker-scaler created
triggerauthentication.keda.sh/rabbitmq-worker-trigger created
Please note that the ScaledObject resource must be in the same namespace as the target deployment.
You can list the ScaledObject resources with the following command:
bash
kubectl get scaledobjects --namespace celery-workers
NAME SCALETARGETKIND SCALETARGETNAME TRIGGERS READY ACTIVE
celery-worker-scaler apps/v1.Deployment celery-worker rabbitmq True False
Notice how the ScaledObject created a Horizontal Pod Autoscaler for you:
bash
kubectl get hpa --namespace celery-workers
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS
keda-hpa-celery-worker-scaler Deployment/celery-worker 0/10 (avg) 2 30 4
Let's review the ScaledObject:
keda.yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: celery-worker-scaler
  namespace: celery-workers
spec:
  scaleTargetRef:
    name: celery-worker
  pollingInterval: 3
  minReplicaCount: 2
  maxReplicaCount: 30
  triggers:
    - type: rabbitmq
      metadata:
        queueName: celery
        queueLength: '10'
      authenticationRef:
        name: rabbitmq-worker-trigger
The ScaledObject is a custom resource, defined by a Custom Resource Definition (CRD), that creates a Horizontal Pod Autoscaler (HPA).
Since the Horizontal Pod Autoscaler (HPA) needs to know the min and max replicas as well as the reference to the deployment, those values are defined in the ScaledObject too (i.e. spec.minReplicaCount, spec.maxReplicaCount and spec.scaleTargetRef.name).
As you can guess, the autoscaler should create one more worker for every 10 messages in the queue, up to a maximum of 30 workers.
The metrics are defined in the spec.triggers section.
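To get a feel for the numbers, here is a simplified sketch of how a replica count could be derived from the queue length with the values used in this ScaledObject. It is an approximation, not the autoscaler's actual code: it ignores the tolerance and the scale-up rate limits that the Horizontal Pod Autoscaler applies, and the function name desired_replicas is made up for this example.

```python
import math

def desired_replicas(queue_length, target_per_replica=10,
                     min_replicas=2, max_replicas=30):
    """One replica per `target_per_replica` messages, clamped to the limits."""
    wanted = math.ceil(queue_length / target_per_replica)
    return max(min_replicas, min(wanted, max_replicas))

for length in (0, 45, 1000):
    print(length, "messages ->", desired_replicas(length), "replicas")
```

An empty queue still keeps two workers because of minReplicaCount, and a very long queue is capped at 30 because of maxReplicaCount.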
Now, let's repeat the experiment with Locust and analyse the result.
Profiling autoscaling with Locust
You can expose the app with:
bash
kubectl port-forward --namespace flask-backend service/flask-server 8000:80
Forwarding from 127.0.0.1:8000 -> 5000
Forwarding from [::1]:8000 -> 5000
You should execute the same load test as before:
complete_flow_load_test.py
import time

import gevent
import requests
from locust import HttpUser, task, constant

# async_success() and async_failure() report the timings to Locust.
# They are defined in the full script in the GitHub repository.

class reportService(HttpUser):
    wait_time = constant(1)

    def _do_async_handler(self, timeout=600):
        # Request for a report
        post_resp = requests.post(self.host + 'report')
        if post_resp.status_code != 200:
            return
        id = post_resp.json()['report_id']

        # Poll for completed report
        start_time = time.monotonic()
        end_time = start_time + timeout
        while time.monotonic() < end_time:
            r = requests.get(self.host + 'report/' + id)
            if r.status_code == 200 and r.json()['result'] is not None:
                async_success('POST /report/ID - async', start_time, post_resp)
                return
            time.sleep(1)
        async_failure('POST /report/ID - async', start_time, post_resp,
                      'Failed - timed out after %s seconds' % timeout)

    @task
    def async_test(self):
        gevent.spawn(self._do_async_handler)
Please notice that the script was truncated for readability, but you can find the full script in the GitHub repository.
And you can finally start Locust as a container with:
bash
docker run -p 8089:8089 \
-v $PWD:/mnt/locust \
locustio/locust -f /mnt/locust/complete_flow_load_test.py
Visit your browser on http://localhost:8089 to access the web interface.
You should simulate one user submitting a report every second.
Click on start and switch over to the graph section.
The real-time graph shows the requests per second received by the app, as well as failure rate, response codes, etc.
What do you observe?
Compared to the last test, the average waiting time for a report has dropped.
It makes sense since the autoscaler creates more workers to process the tasks.
Let's work out the math again.
- Every second, a user creates a report.
- A report takes 60 seconds to complete.
- The autoscaler (eventually) increases the replicas to 30.
During a minute, 60 reports are requested, and 30 are completed.
The queue still keeps getting bigger but at a slower pace.
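The same reasoning tells you how many workers it would take for the queue to stop growing. The sketch below assumes that each worker handles one report at a time and that the load is steady; both function names are hypothetical.

```python
import math

def workers_to_keep_up(arrivals_per_second, task_seconds):
    """Workers needed so that reports complete as fast as they arrive."""
    return math.ceil(arrivals_per_second * task_seconds)

def backlog_growth_per_minute(arrivals_per_second, task_seconds, workers):
    """How many reports the queue gains (or loses) every minute."""
    requested = arrivals_per_second * 60
    completed = workers * 60 / task_seconds
    return requested - completed

print(workers_to_keep_up(1, 60))             # workers needed for this test
print(backlog_growth_per_minute(1, 60, 30))  # growth with maxReplicaCount: 30
```

For this load test, you would need 60 workers to keep up, so with a maximum of 30 replicas the queue still gains about 30 reports every minute.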
Let's inspect the Horizontal Pod Autoscaler (HPA).
bash
kubectl get hpa --namespace celery-workers --watch
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS
keda-hpa Deployment/celery-worker 0/10 (avg) 2 30 2
keda-hpa Deployment/celery-worker 11/10 (avg) 2 30 2
keda-hpa Deployment/celery-worker 98/10 (avg) 2 30 3
keda-hpa Deployment/celery-worker 124500m/10 (avg) 2 30 6
keda-hpa Deployment/celery-worker 121667m/10 (avg) 2 30 12
keda-hpa Deployment/celery-worker 94250m/10 (avg) 2 30 24
keda-hpa Deployment/celery-worker 112467m/10 (avg) 2 30 30
Please notice that the --watch flag will keep streaming the updates of the Horizontal Pod Autoscaler until you stop it.
What about when the queue is empty?
The Horizontal Pod Autoscaler (HPA) reduces the replicas to the minimum (2).
It's important to pay attention to the timings, though.
The Horizontal Pod Autoscaler (HPA) waits for a fixed interval before it triggers the downscaling.
By default, it considers the recommendations from the last 5 minutes and selects the highest replica count in that window (but you can tweak that with the --horizontal-pod-autoscaler-downscale-stabilization flag of the kube-controller-manager).
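The effect of that window can be sketched in a few lines. This is a simplified model rather than the controller's real implementation: it keeps the recent recommendations and always applies the highest one, which is why a short dip in the queue length does not remove workers straight away. The class name DownscaleStabilizer is made up for this example.

```python
from collections import deque

class DownscaleStabilizer:
    def __init__(self, window_seconds=300):
        self.window_seconds = window_seconds
        self.history = deque()   # (timestamp, recommended replicas)

    def stabilize(self, now, recommendation):
        """Record a recommendation and return the replica count to apply."""
        self.history.append((now, recommendation))
        # Drop recommendations older than the window.
        while self.history and self.history[0][0] < now - self.window_seconds:
            self.history.popleft()
        # Apply the highest recommendation still inside the window.
        return max(replicas for _, replicas in self.history)

stabilizer = DownscaleStabilizer()
busy = stabilizer.stabilize(now=0, recommendation=30)
just_emptied = stabilizer.stabilize(now=60, recommendation=2)
five_minutes_later = stabilizer.stabilize(now=301, recommendation=2)
print(busy, just_emptied, five_minutes_later)
```

In this run, the replicas stay at 30 one minute after the queue empties and drop to 2 only once the busy recommendation has left the 5-minute window.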
How is it all working?
Architecting the app
Now that you have seen how KEDA can help you scale your worker processes based on queue size, let's dive into how the application was architected.
app/app.py
from flask import Flask
from celery import Celery
import os

app = Flask(__name__)
# The connection strings are injected through environment variables
app.config['CELERY_BROKER_URL'] = os.getenv("CELERY_BROKER_URL")
app.config['RESULT_BACKEND'] = os.getenv("CELERY_RESULT_BACKEND")
app.config['SECRET_KEY'] = os.getenv("SECRET_KEY")

celery = Celery(app.import_name,
                backend=app.config['RESULT_BACKEND'],
                broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
This is probably the least exciting part.
In this snippet, you are configuring RabbitMQ as a messaging broker and Postgres as a backend.
Celery needs RabbitMQ to distribute the messages and Postgres to store the status of the jobs.
How do you define a job?
Celery makes it straightforward to convert Python functions into tasks that workers can run.
You can decorate your function with @celery.task to turn it into a task:
app/tasks.py
from app.app import celery
import time
import random

# Register the function as a Celery task named "report"
@celery.task(name="report")
def report():
    print("Generating report")
    # Simulate a long-running job
    time.sleep(60)
    return {"state":"completed"}
Since this is a dummy report, the function just sleeps for 60 seconds.
But in a real-world scenario, this is where you would code your long-running tasks.
Once the function completes the task, Celery stores the result in Postgres — the backend.
The last piece of the puzzle is the API endpoint that triggers the task.
app/routes.py
from app.app import app
from flask import request, render_template, jsonify
from celery.result import AsyncResult
from app.tasks import *

@app.route('/')
def default():
    return "Welcome to Report Service"

@app.route('/health')
def health():
    return jsonify({"state":"healthy"})

@app.route('/report', methods=['POST'])
def generate_report():
    # Enqueue the task and return its id without waiting for the result
    async_result = report.delay()
    return jsonify({"report_id":async_result.id})

@app.route('/report/<report_id>')
def get_report(report_id):
    # Look up the task by id in the result backend
    res = AsyncResult(report_id,app=celery)
    return jsonify({"id":res.id,"result":res.result})
As you can see, Celery is doing a lot of heavy lifting.
You can schedule a task with <functionname>.delay(), and Celery serialises the arguments, stores them in the backend, and dispatches the message to RabbitMQ.
Since you might need to retrieve the job later, the function returns the id of the task.
You can use the same task id to retrieve the results too.
Collecting metrics with Prometheus
It's common practice to set up the following stack to ingest and autoscale on custom metrics such as the length of the queue:
- RabbitMQ Prometheus metric exporter to expose the metrics from RabbitMQ.
- Prometheus to scrape, aggregate and query metrics.
- Prometheus adapter to expose the metrics collected by Prometheus according to the metrics API.
At this point, you can consume the metrics with the Horizontal Pod Autoscaler (HPA).
- 1/6
When you create a Horizontal Pod Autoscaler (HPA) for your deployments, nothing happens unless you are collecting metrics.
- 2/6
The Metrics API is just a specification. You need to install a component that collects and aggregates metrics.
- 3/6
A popular option to collect metrics is Prometheus.
- 4/6
However, Prometheus doesn't know how to scrape metrics from RabbitMQ. Fortunately, there is an exporter for that.
- 5/6
Prometheus doesn't know how to expose metrics according to the Kubernetes metrics API either. Fortunately, there is an adapter for that too.
- 6/6
Once the metrics are collected, aggregated and exposed, the Horizontal Pod Autoscaler (HPA) controller can use those to increase or decrease the replicas.
Can you spot any issue with this setup?
- The Horizontal Pod Autoscaler is a Kubernetes resource managed by a controller that periodically checks the metrics, calculates the required replica count, and updates the replicas of your deployment.
- Prometheus scrapes metrics at a regular interval from its sources.
- The Prometheus adapter for the Metrics API discovers metrics from Prometheus on a regular interval.
Metrics have to pass three components before they reach the Horizontal Pod Autoscaler (HPA) controller.
Also, by default, the Horizontal Pod Autoscaler checks the metrics every 15 seconds (this is controlled by the --horizontal-pod-autoscaler-sync-period flag).
Prometheus is often configured to scrape metrics every 30 seconds (the interval is configurable, and its built-in default is one minute).
The Prometheus Adapter exposes metrics discovered in the last 1 minute (if you add a new metric, it will take about 1 minute to be found).
So, with a 30-second scrape interval and a 15-second sync period, it could take up to 45 seconds before the Horizontal Pod Autoscaler triggers the autoscaling.
What about KEDA?
Collecting metrics with KEDA
KEDA comprises three components:
- Scaler
- Metrics Adapter
- Controller
Scalers are like adapters that can collect metrics from databases, message brokers, telemetry systems etc.
For example, the RabbitMQ Scaler is an adapter that knows how to collect metrics specific to RabbitMQ.
This is similar to the RabbitMQ Prometheus metric exporter.
The Metrics Adapter is responsible for exposing the metrics collected by the scalers in a format that the Kubernetes metrics pipeline can consume.
This is similar to the Prometheus adapter.
Finally, the controller glues all the components together:
- It collects the metrics using the adapter and exposes them to the metrics API.
- It registers and manages the KEDA-specific Custom Resource Definitions (CRDs) — i.e. ScaledObject, TriggerAuthentication, etc.
- It creates and manages the Horizontal Pod Autoscaler on your behalf.
- 1/4
The Metrics API is just a specification. Unless you install a component that collects the metrics, you can't trigger the autoscaling.
- 2/4
KEDA has three components. The first component is the metrics server that integrates with the Kubernetes Metrics API.
- 3/4
The second component is the controller. The controller manages the Custom Resource Definitions (CRDs) and creates the Horizontal Pod Autoscaler (HPA) for the deployment.
- 4/4
The third component in KEDA is the Scaler. Scalers are like plugins that can extract metrics from various tools such as RabbitMQ.
The controller watches for ScaledObject resources, which are similar to Horizontal Pod Autoscaler (HPA) definitions but more detailed.
KEDA Custom Resource Definitions (CRDs)
ScaledObjects define how the Horizontal Pod Autoscalers (HPAs) are created (e.g. min and max replicas, frequency, etc.) and how to fetch the data.
Let's look at the YAML used to configure KEDA to scale the Celery workers.
keda.yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: celery-worker-scaler
  namespace: celery-workers
spec:
  scaleTargetRef:
    name: celery-worker
  pollingInterval: 3
  minReplicaCount: 2
  maxReplicaCount: 30
  triggers:
    - type: rabbitmq
      metadata:
        queueName: celery
        queueLength: '10'
      authenticationRef:
        name: rabbitmq-worker-trigger
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: rabbitmq-worker-trigger
  namespace: celery-workers
spec:
  secretTargetRef:
    - parameter: host
      name: celery-workers-secret
      key: CELERY_BROKER_URL
ScaledObject resources define the mapping between the event-source and Kubernetes Deployment or StatefulSet.
It takes the following configurations and creates the HPA for the celery-worker deployment.
- pollingInterval, the interval to check queueLength to scale the deployment.
- minReplicaCount, the minimum number of replicas for the deployment.
- maxReplicaCount, the maximum number of replicas for the deployment.
- triggers.metadata.queueName, the name of the queue to check the length.
- triggers.metadata.queueLength, the target length for HPA to kick in.
The TriggerAuthentication resource defines the authentication parameters needed to connect to RabbitMQ.
It's similar to Kubernetes Secrets, but it is more powerful.
TriggerAuthentication resources can use Kubernetes secrets or leverage existing authentication mechanisms such as IAM (AWS), Azure identities or Cloud Identities (GCP).
In this case, the TriggerAuthentication reuses the celery-workers-secret Secret, which has the RabbitMQ URL used by the Celery workers.
Things to keep in mind while auto-scaling
You have seen how easily you can scale your Celery worker using the queue length.
However, you will likely see a particular problem associated with auto-scaling Pods.
Since the Horizontal Pod Autoscaler (HPA) scales the replicas up and down based on the metrics, there is a chance that the HPA can kill a pod when it is processing a task.
Imagine having a long-running job that takes two hours to complete, such as training a machine learning model.
You might not want to scale down your Pod if it has been running for an hour and a half.
If the Pod is killed at that point, the job is lost and won't be retried.
That's because Celery workers, by default, acknowledge a message as soon as it's received.
One way to solve this is to change the Celery settings so that the job is acknowledged and marked as "done" only when it's completed.
In this way, even if the worker Pod is killed while processing a task, the message is still available in the queue and could be picked up later by other workers.
You can amend the following line in your code to make it happen:
app/tasks.py
from app.app import celery
import time
import random

# acks_late=True acknowledges the message after the task has finished
@celery.task(name="report", acks_late=True)
def report():
    print("Generating report")
    time.sleep(60)
    return {"state":"completed"}
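If the difference between the two acknowledgement modes is not obvious, the following toy model might help. It is not how Celery or RabbitMQ are implemented: a deque stands in for the queue, and the hypothetical run_worker function simulates a worker that may be killed halfway through a task.

```python
from collections import deque

def run_worker(queue, acks_late, crash_during_task):
    """Process one message; return the result, or None if the worker died."""
    message = queue[0]              # delivered but still unacknowledged
    if not acks_late:
        queue.popleft()             # early ack: the broker forgets the message
    if crash_during_task:
        return None                 # the worker Pod is killed mid-task
    if acks_late:
        queue.popleft()             # late ack: only after the task finished
    return f"{message} completed"

# Early acknowledgement: the crash loses the message.
early = deque(["report-1"])
run_worker(early, acks_late=False, crash_during_task=True)
lost = len(early) == 0

# Late acknowledgement: the message survives and another worker retries it.
late = deque(["report-1"])
run_worker(late, acks_late=True, crash_during_task=True)
retried = run_worker(late, acks_late=True, crash_during_task=False)
print(lost, retried)
```

Keep in mind that with late acknowledgement a task can run more than once, so it should be safe to repeat.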
What if your task is really long and you don't want to stop it at all?
Unfortunately, the autoscaler doesn't know what your app is doing.
It doesn't know that stopping a worker at 99% of its work is not a good idea.
And it doesn't know that the progress is at 99% completion either.
All it knows is how to increase and decrease the number of replicas.
If you think about it, it makes sense: Deployments (or better, ReplicaSets) create and destroy the Pods.
The autoscaler is only increasing and decreasing the replica count.
So is there a way to wait for the task to complete before deleting the Pod?
The short answer is maybe — you have to compromise, though.
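Kubernetes provides Pod lifecycle hooks that can be leveraged to delay a Pod being terminated.
The preStop hook is one of them.
pod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: my-pod
spec:
  # Must be at least as long as the preStop hook, otherwise the
  # default 30-second grace period cuts the hook short.
  terminationGracePeriodSeconds: 3660
  containers:
    - name: web
      image: nginx
      ports:
        - name: web
          containerPort: 80
      lifecycle:
        preStop:
          exec:
            command: ['sleep', '3600']
In the example above, the Pod will wait up to one hour from when the deletion is triggered.
This is because the preStop hook is executed as soon as the deletion is triggered, before the container receives the termination signal.
Please notice that the hook is bounded by terminationGracePeriodSeconds (30 seconds by default), so the grace period has to be at least as long as the hook.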
What if you don't want to wait an hour?
You should not use a Deployment (or ReplicaSet, or StatefulSet, etc.) but create individual Pods with an operator or use Kubernetes jobs.
KEDA supports scaling Kubernetes jobs, but please notice that it doesn't do any orchestration like Celery.
You still need a component that creates and schedules the jobs.
You can also find examples of executing Kubernetes jobs at scale in Armada and Volcano.
If you are interested in creating your pods and scheduling tasks, you might want to explore the Kubernetes operators.
This article is an excellent introduction to operators.
How does Kubernetes-based scaling differ from built-in Celery scaling?
Celery, by default, supports scaling based on threads.
Threads are lighter than creating a new Pod, but they have drawbacks too:
- Threads compete for resources, but you can't define limits.
- If a thread crashes, it takes down the entire process (and all other threads).
- Once you max out the resources on a server, you have to manually provision and add a new one.
With Kubernetes, you can run the workers as Pods.
Scaling Pods is a bit slower than threads, but you can benefit from:
- Isolation. If a worker crashes, it only affects that worker.
- Autoscaling. Not only can you add Pods as you need, but you can also leverage the cluster autoscaler to increase the nodes of your cluster.
- 1/3
Imagine having a cluster with two nodes. You might have existing Pods as well as your Celery workers.
- 2/3
As you scale your Celery workers, you might run out of resources.
- 3/3
You can combine the Horizontal Pod Autoscaler (HPA) with the cluster autoscaler (CA) to increase the nodes in your cluster when you run low on resources.
You can find more information about the cluster autoscaler in the official documentation and the cloud provider implementation on GitHub.
Summary
Designing applications at scale requires careful planning and testing.
A queue-based architecture is an excellent design pattern to decouple your microservices and ensure they can be scaled and deployed independently.
And while you can roll your own deployment scripts, it's easier to leverage a container orchestrator such as Kubernetes and couple it with KEDA to deploy and scale your applications automatically.
That's all, folks
How do you autoscale workers in your cluster?
Do you run a different metrics stack?
Let us know in an email or tweet us @learnk8s.
A special thank you goes to Amit Saha, who offered some invaluable feedback and helped me review the article.