Zach
Posted on August 10, 2023
Celery is an asynchronous task queue system based on distributed message passing. It allows developers to execute background jobs concurrently, separate from the primary application flow, thus ensuring tasks like sending emails, generating reports, or processing data do not block or delay a user's request.
By offloading these heavyweight tasks to Celery, web frameworks like Django can quickly respond to the user and allow the task to run in the background, ensuring that users are not kept waiting.
While employing a single queue might suffice for smaller applications, as your project grows and tasks become more varied, you may find the need to prioritize certain tasks or run them on specialized workers. In this article we'll dive into how to use multiple queues with Django, both during local development and when deploying to production.
- 1. Integrating Celery with Django
- 2. Local Development with Django and Celery
- 3. Production Deployment with Django and Celery
- 4. Multiple Celery Queues Example App
Integrating Celery with Django
We first need to set up our Django project to work with Celery before adding multiple queues.
The first step to adding Celery to Django is to make sure the necessary dependencies are installed. We could install them on the command line, but instead we'll use a requirements.txt file to make the process repeatable.
Django==4.1.7
celery==5.2.7
redis==4.5.5
gunicorn==20.1.0
Now create a file named celery.py in the same directory as your settings.py file. This file acts as the entry point for each Celery worker: configuration settings are loaded here, and the worker is told where to find task definitions. The celery.py file can also optionally contain scheduled task (beat) configuration.
import os
from datetime import timedelta

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")

app = Celery("myapp")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

app.conf.beat_schedule = {
    "run-every-10-seconds": {
        "task": "myapp.tasks.foo",
        "schedule": timedelta(seconds=10),
    },
    "run-every-1800-seconds": {
        "task": "myapp.tasks.bar",
        "schedule": timedelta(seconds=1800),
    },
}
Finally, we need to make a small update to settings.py to include the CELERY_BROKER_URL variable. The broker URL will contain a connection string that points to a Redis instance. This Redis instance will be responsible for managing the queues that contain the pending task data.
import os

CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL")
Using an environment variable will allow the local and production deployments to work without code changes.
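If you'd like the project to keep working even when the variable isn't set, a common pattern is to supply a development fallback. The default URL below is an assumption for local use, not something required by the article's setup:

```python
import os

# Fall back to a local Redis instance when CELERY_BROKER_URL isn't set.
# The default URL here is an assumption for local development; production
# should always provide the variable explicitly.
CELERY_BROKER_URL = os.environ.get(
    "CELERY_BROKER_URL", "redis://localhost:6379/0"
)
```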
With that final tweak our Django project should be almost ready to work with Celery. Next we'll see how to containerize this setup and run it locally using Docker.
Local Development with Django and Celery
We can use Docker and Compose together to make local development with Django and Celery easier. This method also makes it easier to collaborate on the project with other developers.
The first step is to containerize the Django and Celery worker processes. To do this, we'll write a Dockerfile that contains the instructions to create an image with the necessary dependencies.
FROM python:3.9-slim-buster

ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

RUN apt-get update && apt-get install -y \
    libpq-dev \
    && apt-get -y clean

WORKDIR /usr/src/app

COPY ./requirements.txt /usr/src/app
RUN pip install --upgrade pip && pip install -r requirements.txt

COPY . /usr/src/app/
If you're familiar with the Dockerfile format, you may notice there is no CMD instruction at the end. This is intentional: we'll use docker-compose to specify the command later, which allows a single Dockerfile to serve both the Django and Celery containers.
Using Docker Compose to Manage the Containers
This is a simple docker-compose.yml file that will configure a Django dev server, Celery worker, and Redis instance. We use the dev server here because we'd like to take advantage of the hot reload feature. The production deployment will use Gunicorn to serve the application.
version: '3.8'

services:
  web:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: django_dev
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ${PWD}:/usr/src/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    ports:
      - "8000:8000"
    depends_on:
      - redis

  celery:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: celery_worker
    command: celery -A myapp worker --loglevel=info
    volumes:
      - ${PWD}:/usr/src/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    depends_on:
      - redis

  redis:
    image: redis:latest
    container_name: redis
    ports:
      - "6379:6379"
Starting and running this collection of containers requires just one command, which will also build images and download any remote images, such as for the Redis container.
docker-compose up -d
Implementing a Simple Celery Task
Now that we have all the containers up and running, we should take a moment to implement a proof-of-concept task.
We'll add a new file named tasks.py to the multiple_queues_app/ directory and implement a simple async task. The task will compute the Nth Fibonacci number, albeit in a very inefficient recursive fashion.
from celery import shared_task


@shared_task
def fibonacci(n):
    if n <= 1:
        return n
    else:
        return fibonacci(n - 1) + fibonacci(n - 2)
Now we need a view that we can call to invoke this task and process it in the background. Add the following code in the multiple_queues_app/views.py file.
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt

from .tasks import fibonacci


@csrf_exempt
def calculate_fibonacci(request):
    if request.method == "POST":
        try:
            N = int(request.POST.get("N"))
            result = fibonacci.delay(N)
            return JsonResponse(
                {"task_id": result.task_id, "status": "Task submitted successfully!"},
                status=200,
            )
        except (TypeError, ValueError):
            return JsonResponse({"error": "Invalid N value provided."}, status=400)
    else:
        return JsonResponse({"error": "Only POST method is allowed."}, status=405)
We're almost there: before this view will respond, we need to add a urls.py to the app and update the main project urls.py as well. First, the app-level urls.py:
from django.urls import path

from . import views

urlpatterns = [
    path("fibonacci/", views.calculate_fibonacci, name="calculate_fibonacci"),
]
Then, in the project-level urls.py, include the app's URL configuration:
from django.contrib import admin
from django.urls import include, path

urlpatterns = [
    path("admin/", admin.site.urls),
    path("", include("multiple_queues_app.urls")),
]
With the view and task in place, we can now give this all a try. First issue a docker-compose restart so that the Celery worker code is updated. Now we can use any tool of our choice to test the view, but I'll use curl to make a request.
curl -X POST -d "N=25" http://localhost:8000/fibonacci/
Watching the logs of the Celery worker, we should see the task arrive and take about three seconds to execute.
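That multi-second runtime is no accident: the naive recursion revisits the same subproblems over and over. A quick sketch of the same function (with the Celery decorator stripped), instrumented with a counter, shows just how much work fibonacci(25) does:

```python
call_count = 0


def fib(n):
    # Same naive recursion as the task, instrumented with a call counter.
    global call_count
    call_count += 1
    if n <= 1:
        return n
    return fib(n - 1) + fib(n - 2)


result = fib(25)
print(result, call_count)  # 75025 computed via 242785 recursive calls
```

The call count grows exponentially with N, which is exactly why this makes a good demo task for a background queue.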
Using Watchdog for Celery Worker Hot Reload
You may have noticed that we had to restart the Celery worker manually before we could try out the new task. Manually restarting the worker is going to become cumbersome very quickly. Ideally, we could have functionality similar to hot reload in the Django dev server.
Luckily, the watchdog utility provides an easy way to restart a process when a directory changes. Let's modify the docker-compose.yml file to use the watchmedo script of the watchdog utility, in the command section for Celery.
celery:
  build:
    context: .
    dockerfile: Dockerfile
  container_name: celery_worker
  command: watchmedo auto-restart --directory=multiple_queues_app --pattern=*.py --recursive -- celery -A myapp worker --loglevel=info
  volumes:
    - ${PWD}:/usr/src/app
  environment:
    - CELERY_BROKER_URL=redis://redis:6379/0
  depends_on:
    - redis
Before updating the containers, however, we first need to add a requirements-dev.txt file and modify the Dockerfile to install from it.
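The dev requirements file only needs the watchdog package. The version pin below is an assumption; use whatever release is current:

```
watchdog==3.0.0
```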
FROM python:3.9-slim-buster

ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

RUN apt-get update && apt-get install -y \
    libpq-dev \
    && apt-get -y clean

WORKDIR /usr/src/app

COPY ./requirements.txt /usr/src/app
COPY ./requirements-dev.txt /usr/src/app
RUN pip install --upgrade pip && pip install -r requirements.txt && pip install -r requirements-dev.txt

COPY . /usr/src/app/
With the containers rebuilt and restarted, you should now be able to modify Python files and see the Celery worker restart automatically.
Adding Multiple Queues in Local Development
How can we update this local development setup to include multiple Celery queues?
The process is as simple as adding another service to the docker-compose.yml file. The problem, however, is that your Compose file can start to become long and full of duplication. Instead of copying the entire Celery service block over and over, we can use YAML anchors and merge keys to keep things DRY.
Below is the updated docker-compose.yml with a new Celery worker for a specific queue added.
version: '3.8'

x-worker-opts: &worker-opts
  build:
    context: .
    dockerfile: Dockerfile
  volumes:
    - ${PWD}:/usr/src/app
  environment:
    - CELERY_BROKER_URL=redis://redis:6379/0
  depends_on:
    - redis

services:
  web:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: django_dev
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ${PWD}:/usr/src/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    ports:
      - "8000:8000"
    depends_on:
      - redis

  fibonacci-worker:
    command: tools/start_celery.sh -Q fibonacci --concurrency=1
    <<: *worker-opts

  prime-worker:
    command: tools/start_celery.sh -Q prime --concurrency=1
    <<: *worker-opts

  redis:
    image: redis:latest
    container_name: redis
    ports:
      - "6379:6379"
We also need to tweak the Python task code. Previously, all tasks were sent to a single worker that handled everything. Now that we're dividing work into separate queues, we need to specify which queue should handle each task.
from celery import shared_task


@shared_task(queue="fibonacci")
def fibonacci(n):
    if n <= 1:
        return n
    else:
        return fibonacci(n - 1) + fibonacci(n - 2)


@shared_task(queue="prime")
def nth_prime(n):
    count = 0
    num = 1
    while count < n:
        num += 1
        if num < 2:
            continue
        if any(num % i == 0 for i in range(2, int(num**0.5) + 1)):
            continue
        count += 1
    return num
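As a quick sanity check, the primality loop above produces the expected sequence. Here it is with the Celery decorator stripped so it can run standalone:

```python
def nth_prime(n):
    # Identical logic to the task above, minus the @shared_task decorator.
    count = 0
    num = 1
    while count < n:
        num += 1
        if num < 2:
            continue
        # Trial division up to the square root marks composites.
        if any(num % i == 0 for i in range(2, int(num**0.5) + 1)):
            continue
        count += 1
    return num


print([nth_prime(i) for i in range(1, 6)])  # [2, 3, 5, 7, 11]
```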
Production Deployment with Django and Celery
There are many ways to deploy Celery workers to a production environment, but in this article I'll focus on a relatively simple technique: a combination of GitHub Actions and Ansible.
Ansible is a reliable and time-tested framework for applying configuration to remote machines over SSH. Users define playbooks that describe the applications and services a machine should run. Ansible checks whether the target machine matches the desired configuration, installing and updating whatever is necessary to bring it up to date.
You might ask: why not use Docker and Compose on the production server as well? That can be a legitimate choice, but a tool like Ansible makes it easy to deploy your application across multiple machines. Doing the same with Docker usually means reaching for a tool like Kubernetes or Nomad, which increases the complexity significantly.
Because Ansible works through SSH, we first need to grant the tool access to the target machine.
In a real production scenario, you would probably want to use a virtual machine image that has a dedicated Ansible user already provisioned. For testing purposes, I've simply started a DigitalOcean droplet with a root user. We'll generate a new SSH key pair locally and then copy the public key to the target machine. The private key will reside on GitHub as a secret.
ssh-keygen -f ansible-key
ssh-copy-id -i ansible-key.pub root@24.199.126.163
You should, of course, change the IP address to that of a machine you control. The ssh-copy-id program assumes that your local user can already log in with a valid SSH key or password. The point here is to set up a newly generated key pair that only Ansible will use.
Now that the public key has been uploaded to the target machine, we'll store the private key on GitHub so that Ansible can log in while running inside an Actions pipeline. The command above named the key pair ansible-key and ansible-key.pub; we need to upload the private (i.e., non-.pub) key to GitHub.
Using a GitHub Actions Workflow to Trigger Ansible
Before diving into Ansible itself, we'll need something to trigger and execute the playbook. GitHub Actions is a convenient tool for this purpose. For this simple tutorial, we'll run the Ansible playbook on every push to the main branch. More robust setups could use the branch name to determine whether to deploy to a staging environment or to production.
For now, however, here is a simple deploy.yml that initiates the Ansible playbook. It also demonstrates how to pass GitHub secrets into Ansible.
name: Deploy

on:
  push:
    branches:
      - main

jobs:
  update_celery:
    runs-on: ubuntu-latest
    environment: production
    container: willhallonline/ansible:alpine
    steps:
      - name: Checkout repository
        uses: actions/checkout@v2
        with:
          fetch-depth: 0

      - name: Set up SSH key
        working-directory: ansible
        env:
          ANSIBLE_SSH_PRIVATE_KEY: ${{ secrets.ANSIBLE_SSH_PRIVATE_KEY }}
        run: |
          echo "$ANSIBLE_SSH_PRIVATE_KEY" > id_rsa
          chmod 600 id_rsa

      - name: Run Ansible playbook
        working-directory: ansible
        run: ansible-playbook -i inventory.ini playbook.yml --tags "app,redis,celery" --extra-vars "redis_password=${{ secrets.REDIS_PASSWORD }}"
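The workflow references an inventory.ini that isn't shown above. A minimal inventory consistent with the playbook's production group might look like the following; the user and key path are assumptions matching the earlier SSH setup:

```ini
[production]
24.199.126.163 ansible_user=root ansible_ssh_private_key_file=id_rsa
```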
Using Systemd to Manage Celery Workers
Within our Ansible playbook we'll have a few tasks responsible for setting up Celery workers and keeping the code updated. I find it useful to create an ansible directory to store all of this configuration.
Here's an overview of the directory structure, to give an idea of what's involved with Ansible in our project.
ansible
├── ansible.cfg
├── inventory.ini
├── playbook.yml
└── roles
├── app
│ ├── tasks
│ │ └── main.yml
│ └── templates
│ ├── gunicorn.conf.py.j2
│ ├── gunicorn-run-dir.service.j2
│ └── gunicorn.service.j2
├── celery
│ ├── tasks
│ │ └── main.yml
│ └── templates
│ ├── celery-beat.service.j2
│ └── celery.service.j2
└── redis
└── tasks
└── main.yml
There are two roles of primary interest here: one does the initial setup and installation of Redis, and another configures the Celery workers. Since this approach only uses Docker for local development, we'll use systemd to orchestrate Celery. If you've never used systemd, don't worry; it won't be too complicated. systemd is an easy way to ensure that Celery is brought back up if the VM restarts or a worker dies.
We'll have a separate systemd service for each worker, and one for the celery-beat instance that's used for scheduled tasks. I won't show both config files, as they are very similar, but below is the systemd unit file for the Celery worker.
[Unit]
Description=Celery Worker {{ item.name }} for Multiple Queues App
After=network.target
[Service]
Environment="CELERY_BROKER_URL=redis://:{{ redis_password }}@localhost:6379/0"
Environment="PYTHONUNBUFFERED=1"
Type=simple
User=root
Group=root
WorkingDirectory={{ app_path }}
ExecStart={{ app_path }}/.venv/bin/celery -A multiple_queues_app worker -Q {{ item.name }} --loglevel=info
Restart=always
RestartSec=10
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=celery-{{ item.name }}
[Install]
WantedBy=multi-user.target
You'll notice that the template contains Jinja tags. This will allow us to write a unit file for each separate Celery queue, keeping messy duplication to a minimum as we expand the number of queues.
Defining a new Celery queue is done at the playbook level.
- hosts: production
  become: yes
  vars:
    app_repo: git@github.com:zchtodd/multiple_queues_app.git
    app_path: /opt/multiple_queues_app
    venv_path: /opt/multiple_queues_app/.venv
    gunicorn_config_path: /etc/systemd/system/gunicorn.service
    run_dir_maker_config_path: /etc/systemd/system/gunicorn-run-dir.service
    celery_workers:
      - name: fibonacci
To create a new type of queue, you can add the name to the celery_workers list and Ansible will take care of writing the config files via the templating system.
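For example, registering the prime queue from the local setup would just mean one more entry in the list:

```yaml
celery_workers:
  - name: fibonacci
  - name: prime
```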
Deploying Celery with Ansible
The next step is to use Ansible to create the Celery worker services. This process is fairly straightforward and just involves writing out the systemd config files that we saw earlier. A separate Ansible role handles cloning the app code and starting the web server process. You can find all of the details in the project GitHub repository.
- name: Create Celery logs directories
  ansible.builtin.file:
    path: "/var/log/celery/{{ item.name }}"
    state: directory
  loop: "{{ celery_workers }}"

- name: Create Celery Beat Systemd service file
  ansible.builtin.template:
    src: "{{ playbook_dir }}/roles/celery/templates/celery-beat.service.j2"
    dest: "/etc/systemd/system/celery-beat.service"
    mode: "0644"

- name: Create Celery Systemd service files
  ansible.builtin.template:
    src: "{{ playbook_dir }}/roles/celery/templates/celery.service.j2"
    dest: "/etc/systemd/system/celery-{{ item.name }}.service"
    mode: "0644"
  loop: "{{ celery_workers }}"

- name: Reload Systemd configuration
  ansible.builtin.systemd:
    daemon_reload: yes

- name: Enable and start Celery services
  ansible.builtin.systemd:
    name: "celery-{{ item.name }}"
    state: started
    enabled: yes
  loop: "{{ celery_workers }}"

- name: Enable and start Celery beat
  ansible.builtin.systemd:
    name: "celery-beat"
    state: started
    enabled: yes

- name: Restart Celery service
  ansible.builtin.systemd:
    name: "celery-{{ item.name }}"
    state: restarted
    daemon_reload: yes
  loop: "{{ celery_workers }}"

- name: Restart Celery beat
  ansible.builtin.systemd:
    name: "celery-beat"
    state: restarted
    daemon_reload: yes
Once the Celery workers have been set up, you can use the journalctl and systemctl programs to monitor and manage the processes.
To tail the logs of the Fibonacci worker, for example, you would execute the following command.
journalctl -u celery-fibonacci.service -f
Multiple Celery Queues Example App
This tutorial doesn't show all of the code, so if you'd like to see the full implementation, the example app is hosted on GitHub.
The full app should hopefully be a complete template that you can expand on.
Happy hacking!