HPA for Celery workers

Itay Bittan
DynamicYield Tech Blog
4 min read · Jan 15, 2021


Celery is a simple and reliable distributed system to process tasks. At Dynamic Yield, we use Celery to orchestrate and distribute work across pods in Kubernetes.

Celery communicates via messages, usually using a broker to mediate between clients and workers. We use RabbitMQ as the broker.

In our system, we have 30 different Celery tasks such as:

  • Reading data from Elasticsearch
  • Indexing data to Elasticsearch
  • Reading data from S3
  • Writing data to Redis

These tasks vary widely: some run for only seconds while others take hours, depending on the size of the data being processed and the operation type (read/write).

We have 10 unique workers, each consuming one or more task types.

This helps us to:

  • Run multiple tasks for each of our customers in parallel
  • Better limit and control the read/write rates against other resources, especially databases

We have 3 client components that publish tasks (aka messages):

  • Celery beat: similar to a cron job; a time-based task scheduler that lets us publish periodic tasks at fixed, recurring times
  • Pub-sub service: we use the Redis pub-sub paradigm for publishing messages between our internal components. In this case, Celery tasks are triggered at the request of other internal components
  • Web UI (REST API): we manually publish tasks via our internal web server for on-demand work or in case something goes wrong

When a task is published, Celery adds a message to the RabbitMQ queue. In our case, each worker consumes from a dedicated queue, for simplicity and easy autoscaling.
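As a rough illustration of the one-queue-per-worker layout, the sketch below builds a Celery task_routes mapping in the style of a celeryconfig.py module. Only elasticsearch-indexer-queue is a name that appears in this post; the task names and the other queue names are made up for illustration, so treat this as a sketch rather than our actual configuration.

```python
# Hypothetical task-name -> queue mapping. Only "elasticsearch-indexer-queue"
# is a name taken from this post; everything else is a placeholder.
TASK_QUEUES = {
    "tasks.index_to_elasticsearch": "elasticsearch-indexer-queue",
    "tasks.read_from_elasticsearch": "elasticsearch-reader-queue",
    "tasks.read_from_s3": "s3-reader-queue",
    "tasks.write_to_redis": "redis-writer-queue",
}


def build_task_routes(task_queues):
    """Turn {task_name: queue_name} into the dict form task_routes accepts."""
    return {task: {"queue": queue} for task, queue in task_queues.items()}


# Celery looks up this setting name when the module is loaded as its config.
task_routes = build_task_routes(TASK_QUEUES)
```

A worker deployment would then consume only its own queue, for example with celery -A proj worker -Q elasticsearch-indexer-queue, where proj is a placeholder application name.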

Horizontal Pod Autoscaler (HPA)

We’ve been using Celery for several years now and it has proven to be a reliable system. As we grew, we scaled out our workers’ replicas to handle the load, and found that the workers were not optimally utilized: for several hours they might sit idle, while at other times they were 100% busy. To optimize resource utilization for better performance and lower cost, we decided to move our Celery infrastructure to Kubernetes.

Kubernetes allows us to define HPA based on external metrics. In our case, we used Prometheus to collect RabbitMQ metrics. There’s a RabbitMQ Prometheus plugin that we installed based on the RabbitMQ official guide.

This helped in two ways. First, it gave us great observability with an overview dashboard:

Second, it exposed per queue insights like queue-depth (you can see the full list of the available metrics here).
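To give a feel for what a per-queue reading looks like, here is a minimal sketch that builds an instant query against the Prometheus HTTP API (/api/v1/query) and extracts the queue depth from a response. The host name and the sample response are hand-written assumptions, not data from our cluster; the release and queue labels are the ones used in the HPA annotation later in this post.

```python
import json
from urllib.parse import urlencode


def build_query_url(base_url, release, queue):
    """Build an instant-query URL for Prometheus' /api/v1/query endpoint."""
    promql = f'rabbitmq_queue_messages{{release="{release}", queue="{queue}"}}'
    query_string = urlencode({"query": promql})
    return f"{base_url}/api/v1/query?{query_string}"


def queue_depth_from_response(body):
    """Sum the sample values of an instant-vector query response."""
    payload = json.loads(body)
    if payload.get("status") != "success":
        raise ValueError(f"query failed: {payload.get('error', 'unknown error')}")
    # Each series carries "value": [<unix timestamp>, "<value as a string>"].
    return sum(float(series["value"][1]) for series in payload["data"]["result"])


# Hand-written sample response (an assumption, not real data).
SAMPLE_RESPONSE = json.dumps({
    "status": "success",
    "data": {
        "resultType": "vector",
        "result": [
            {
                "metric": {
                    "queue": "elasticsearch-indexer-queue",
                    "release": "celery-broker",
                },
                "value": [1610700000.0, "14"],
            }
        ],
    },
})

# Hypothetical Prometheus address.
url = build_query_url(
    "http://prometheus.example.internal:9090",
    "celery-broker",
    "elasticsearch-indexer-queue",
)
depth = queue_depth_from_response(SAMPLE_RESPONSE)
```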

We set the HPA external metric to:

rabbitmq_queue_messages

which is the sum of ready and unacknowledged messages — total queue depth. This way each task (message) gets a dedicated Celery worker:

apiVersion: autoscaling/v2beta1
kind: HorizontalPodAutoscaler
metadata:
  name: celery-worker-elasticsearch-indexer
  labels:
    app: celery-worker
    release: celery-worker-elasticsearch-indexer
  annotations:
    metric-config.external.prometheus-query.prometheus/celery_worker_elasticsearch_indexer: |
      rabbitmq_queue_messages{release="celery-broker", queue="elasticsearch-indexer-queue"}
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: celery-worker-elasticsearch-indexer
  minReplicas: 10
  maxReplicas: 20
  metrics:
  - type: External
    external:
      metricName: prometheus-query
      metricSelector:
        matchLabels:
          query-name: celery_worker_elasticsearch_indexer
      targetAverageValue: 1

In the snippet above (taken from our Kubernetes deployment YAML) we configured an HPA for the celery-worker-elasticsearch-indexer worker with a minimum of 10 replicas and a maximum of 20. You can see the correlation between the queue depth and the number of workers (replicas) in the graphs:

Note that we’re using the maximum to limit the parallelism of a specific task; in this case, it limits the number of concurrent indexing operations against Elasticsearch.

For that reason, we also disabled our internal KubeHpaMaxedOut alerts for celery-workers-* since it’s totally fine if we use 100% of the workers/replicas (as opposed to serving components, the processing can be delayed for a while).
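To make the scaling arithmetic concrete, here is a simplified sketch of how an AverageValue target translates queue depth into a replica count. It assumes the bounds from the snippet above (10 and 20) and deliberately ignores the tolerance and stabilization behavior the real HPA controller applies, so it approximates the steady state rather than reproducing the controller. The function name is ours, not part of any Kubernetes API.

```python
import math


def desired_replicas(queue_depth, target_average_value=1.0,
                     min_replicas=10, max_replicas=20):
    """Approximate the replica count an AverageValue target would ask for."""
    if target_average_value <= 0:
        raise ValueError("target_average_value must be positive")
    # With an AverageValue target the metric is averaged over the replicas,
    # so the wanted count is the total divided by the per-replica target.
    wanted = math.ceil(queue_depth / target_average_value)
    # Clamp to the configured minReplicas/maxReplicas bounds.
    return max(min_replicas, min(max_replicas, wanted))
```

With these defaults, an empty queue still keeps 10 replicas, 15 queued messages ask for 15, and anything above 20 messages is capped at 20. Lowering target_average_value to 0.5 doubles the wanted count for the same depth, which is how a target below 1 keeps spare workers available.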

In addition, we tune the following configuration:

  • Set the concurrency of each worker to 1. We preferred lightweight workers with small CPU/memory requests. This also makes scaling more accurate, as tasks can be distributed equally across all replicas.
  • Tasks might have to wait if no worker is free to consume them. The time it takes for a new worker (replica) to join depends on resource availability on your Kubernetes nodes, image size (if the image doesn’t already exist on the node), and the bring-up time. In general, it might take a few seconds. If you want to ensure that there are always available workers, you can set targetAverageValue below 1 or set a reasonable minimum number of replicas, as we did.
  • Celery’s configuration, task_acks_late = True, so task messages are acknowledged only after the task has been executed; a task interrupted by scale-in stays unacknowledged and can be redelivered rather than lost.
  • Celery’s configuration, worker_prefetch_multiplier = 1, to ensure each worker prefetches a single message and not 4 (the default).
  • Celery’s configuration, worker_max_tasks_per_child = 1 (for the same reason)
  • Celery’s configuration, task_track_started = True, to keep track of which task is running.
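Put together, the settings above could look roughly like the following. The setting names are Celery's lowercase configuration keys; expressing the concurrency as worker_concurrency (rather than a command-line flag) and collecting everything in a dict named CELERY_SETTINGS are assumptions made for this sketch.

```python
# Sketch of the tuning described above, collected in one place.
CELERY_SETTINGS = {
    # One task at a time per replica, so replicas map 1:1 to running tasks.
    # Assumption: set here instead of via the worker's --concurrency flag.
    "worker_concurrency": 1,
    # Acknowledge a message only after its task has finished executing.
    "task_acks_late": True,
    # Reserve a single message per worker process instead of the default 4.
    "worker_prefetch_multiplier": 1,
    # Replace the worker child process after every task.
    "worker_max_tasks_per_child": 1,
    # Report a "started" state so running tasks can be tracked.
    "task_track_started": True,
}
```

With a Celery application object at hand, a dict like this can be applied with app.conf.update(CELERY_SETTINGS).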

Summary

Enabling HPA for Celery workers improved our overall performance: scaling out our resources at peak hours decreased the time it takes for tasks to be handled.
As for cost, we ended up saving a significant amount of money as we can now scale down workers during off hours.

If you have questions, please feel free to ask them here.
