Dynamically change scheduled tasks with Django & celery beat

aishahsofea

Posted on September 17, 2021

Background

I was trying to create something similar to CoinGecko/CoinMarketCap, where a list of coins is displayed with their current prices. In the first iteration, I fetched the current price via an HTTP request, and while this works, it is rather inconvenient. Cryptocurrency is known for its volatility, so if the user stays idle for a couple of minutes, the displayed price quickly becomes outdated, and they can only get the updated price by refreshing the page. On average, the API call takes around 600 to 1000 ms. Imagine having to wait up to a second every time you want an updated Bitcoin price, on top of having to refresh the page. That's pretty annoying, isn't it? So how do we go about solving this? Let's dive right in.

Tech stack

  • Django as the web framework
  • React for the UI stuff (the bulk of this article will be on the BE so it's totally fine if you don't know React)
  • Celery for asynchronous task execution
  • Channels to allow WebSocket communication
  • Redis as the message queue

Note: If you are on Windows, chances are you'll encounter an issue when running Celery, since it does not officially support Windows. So I highly recommend using WSL instead. It is also easier to install Redis on WSL.

Also note: This tutorial assumes that you already have some experience with Django.

Setup and installation

Set up Django and React boilerplate.
  1. Create a folder for our project, let's call it cmc_clone.
  2. Assuming you already have Python installed, create a virtual environment inside cmc_clone; python3 -m venv venv. Note that I use python3, that's because I have both Python 2 & 3 installed on my WSL.
  3. Now you'll see venv folder inside your directory. Activate the virtual env like so; source venv/bin/activate.
  4. Now let's install Django; pip install django. Run pip freeze | grep Django or simply python -m django --version to ensure that it's been installed. (Note that I'm now using python instead of python3 since I'm already inside the virtual environment)
  5. Once Django has been successfully installed, let's start a Django project called server; django-admin startproject server.
  6. Inside server folder, you will see another server folder and manage.py file. This is the boilerplate that Django automagically creates for us. To make sure that it is indeed working, run python manage.py runserver and go to localhost:8000 on your browser. You should see a rocket animation and that means your Django server is properly running.
  7. Cool, now let's create an app called coin; python manage.py startapp coin on the same level as manage.py file. You will see a coin folder being created, this is what we'll use later for our logic. Make sure to include it under INSTALLED_APPS.
  8. Let's move on to React. For this we will also use a boilerplate from create-react-app. Assuming you have npm installed, let's generate React boilerplate inside cmc_clone folder (same level as venv folder); npx create-react-app client. This might take a few minutes.
  9. Go inside client folder and run npm start. A development server will run at localhost:3000
Install Redis and make sure it is properly working.
  1. Follow this tutorial for Windows 10.

I will structure this article in the sequence of the mistakes that I made. So bear with me.

Getting current price of the coins without the hassle of refreshing the page

So the first problem that we want to solve is getting the updated coin price without having to refresh the page. This is where WebSocket comes in. Unlike HTTP, where the client needs to send a request each time it wants a response, a WebSocket connection between the client and the server stays open, so the server can push new data whenever it has some.

As for the coin data, we will use the CoinGecko API. There are a bunch of endpoints that you can leverage, but for our purpose we are only interested in the /coins/markets endpoint.

Let's say we want to get the price every 30 seconds. Since we don't want to refresh the page, there should be a background process that does this for us, something similar to a cron job. Luckily, Celery is pretty good at this.

So there are 2 main parts here:

  • Setting up a WebSocket connection
  • Executing background tasks

Setting up a WebSocket connection

For this we will use a package called channels. Follow the official installation guide.
Make sure you include channels in INSTALLED_APPS inside settings.py. Also, right below the WSGI_APPLICATION setting, add this line: ASGI_APPLICATION = 'server.asgi.application'.
If you encounter any error, please consider upgrading pip.
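
As a rough sketch, the relevant part of settings.py could end up looking like this. It is an excerpt rather than the whole file: Django's default apps are left out, and the exact position of each entry is an assumption on my part.

```python
# server/settings.py (excerpt only; Django's default apps are omitted here)
INSTALLED_APPS = [
    'channels',  # added for WebSocket support
    # ... Django's default apps stay here ...
    'coin',      # the app created during setup
]

WSGI_APPLICATION = 'server.wsgi.application'  # generated by startproject
ASGI_APPLICATION = 'server.asgi.application'  # the new line
```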

Then, install the channels_redis package; pip install channels-redis. It provides channel layers that use Redis as the backing store. Visit its GitHub page for more config options.
But for us, we will use the following. Include this inside settings.py:

CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            'hosts': [('127.0.0.1', 6379)]
        }
    }
}

Now modify asgi.py so it can handle WebSocket communication.

import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from coin.routing import ws_urlpatterns
from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'server.settings')

application = ProtocolTypeRouter({
    'http': get_asgi_application(),
    'websocket': AuthMiddlewareStack(URLRouter(ws_urlpatterns))
})

We have yet to create the ws_urlpatterns, so don't worry about that for now.

Inside the coin folder, create consumers.py and routing.py. You can think of it this way: consumers.py is to an ASGI application what views.py is to a regular Django application, and it likewise has its own routing.

Let's deal with consumers first. Paste the following code inside consumers.py.

import json

from channels.generic.websocket import AsyncWebsocketConsumer

class CoinListConsumer(AsyncWebsocketConsumer):

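    async def connect(self):
        # Join the 'coin_list' group so messages sent to the group reach this socket
        await self.channel_layer.group_add('coin_list', self.channel_name)
        await self.accept()
        await self.send(json.dumps({'message': 'hey im server'}))

    async def receive(self, text_data=None, bytes_data=None):
        # Must be async as well: Channels awaits this method
        print(text_data)

    async def disconnect(self, close_code):
        # Channels passes the WebSocket close code as an argument
        await self.channel_layer.group_discard('coin_list', self.channel_name)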


Here I'm writing the consumer class as asynchronous by extending AsyncWebsocketConsumer class provided by channels. To understand consumers better, do read the doc.
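
To get a feel for what group_add() and group_discard() do, here is a toy, in-memory model of a channel layer. It is only an illustration: ToyChannelLayer, the channel names and the 'price_update' message type are made up for this sketch, the methods are plain functions instead of coroutines, and the real layer keeps this state in Redis.

```python
from collections import defaultdict


class ToyChannelLayer:
    """In-memory illustration of groups (hypothetical, not the Channels API)."""

    def __init__(self):
        self.groups = defaultdict(set)    # group name -> channel names
        self.inboxes = defaultdict(list)  # channel name -> delivered messages

    def group_add(self, group, channel):
        self.groups[group].add(channel)

    def group_discard(self, group, channel):
        self.groups[group].discard(channel)

    def group_send(self, group, message):
        # Every channel currently in the group gets its own copy
        for channel in self.groups[group]:
            self.inboxes[channel].append(message)


layer = ToyChannelLayer()
layer.group_add('coin_list', 'browser-tab-1')
layer.group_add('coin_list', 'browser-tab-2')
layer.group_send('coin_list', {'type': 'price_update'})

# After tab 2 disconnects, only tab 1 keeps receiving messages
layer.group_discard('coin_list', 'browser-tab-2')
layer.group_send('coin_list', {'type': 'price_update'})
```

This is why connect() joins the group and disconnect() leaves it: anything sent to 'coin_list' reaches every socket that is still connected.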

Now let's create the route for our CoinListConsumer. Inside routing.py:

from django.urls import path

from .consumers import CoinListConsumer

ws_urlpatterns = [
    path('ws/coin_list/', CoinListConsumer.as_asgi())
]

Note that the ws_urlpatterns is the one that we imported inside asgi.py.

Now, let's connect to the WebSocket from the client. Inside /client/src folder, modify App.js file.

import { useEffect } from "react";

// The path must match the route declared in routing.py
const socket = new WebSocket("ws://localhost:8000/ws/coin_list/");

function App() {
    useEffect(() => {
        socket.onmessage = (message) => {
            const data = JSON.parse(message.data);
            console.log(data);
        };
    }, []);

    const handleButtonClick = () => {
        socket.send(
            JSON.stringify({
                message: "hey im client",
            })
        );
    };

    return (
        <div className="App">
            <button onClick={handleButtonClick}>Send message to the server</button>
        </div>
    );
}

export default App;

Now you should see a button in the React app at localhost:3000.
Restart Django server, and you'll see an additional line saying something like Starting ASGI/Channels version 3.0.4 development server at http://127.0.0.1:8000/. This means that our ASGI is properly configured and the client can now talk to our server via WebSocket.

Open console tab on your browser, and refresh the page. You should see: { "message": "hey im server" }. It's a JSON that we send from the consumer inside connect() function.

Try clicking on the button and monitor the Django terminal. You should see the raw JSON string printed, {"message":"hey im client"}.

It's the message that we sent from the client using the socket.send() method, received by the consumer as text_data in the receive() function.

Great, WebSocket is working. Let's make it more exciting by integrating Celery beat.

Execute scheduled background tasks with Celery beat

First, we'll install Celery. In our case the background task that we want to execute is the API call and we'd like to use Redis for our message broker. So make sure to also install all the required packages; pip install celery requests redis
On the same level as settings.py, create a file called celery.py. This is where we will configure Celery. You can visit this link for the explanation on the configuration. Inside celery.py, paste the following:

import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'server.settings')

app = Celery('server')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

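And then, make sure to import the app inside the __init__.py next to it (from .celery import app as celery_app) so that it is loaded whenever Django starts. Next, create tasks.py inside the coin folder and paste the code below: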

import requests
from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer

channel_layer = get_channel_layer()

def get_market_api(page=1, per_page=100, currency='usd'):
    market_api = f'https://api.coingecko.com/api/v3/coins/markets?vs_currency={currency}&page={page}&per_page={per_page}'
    return market_api

@shared_task
def get_coin_list():
    data = requests.get(get_market_api(1, 100, 'usd')).json()

    async_to_sync(channel_layer.group_send)(
        'coin_list', {
            'type': 'send_coin_list',
            'coin_list': data
        }
    )
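
As an aside, the URL in get_market_api() could also be assembled with the standard library's urlencode(), which takes care of escaping the parameter values. This is just an alternative sketch; build_market_url and BASE_URL are names I made up here, not part of the project.

```python
from urllib.parse import urlencode

BASE_URL = 'https://api.coingecko.com/api/v3/coins/markets'


def build_market_url(page=1, per_page=100, currency='usd'):
    # urlencode() joins the pairs with '&' and escapes special characters
    query = urlencode({
        'vs_currency': currency,
        'page': page,
        'per_page': per_page,
    })
    return f'{BASE_URL}?{query}'


url = build_market_url(1, 100, 'usd')
```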

get_coin_list() is where we do the API call. Since we want to schedule it as a Celery task, we decorate it with shared_task, which registers the task without tying tasks.py to a specific Celery app instance. Note the async_to_sync() wrapper: get_coin_list() is a synchronous function, but channel_layer.group_send() is a coroutine, so it has to be wrapped before we can call it here. The 'type' key of the message names the consumer method that will handle it, send_coin_list(). Speaking of which, we haven't actually created that method. Inside the CoinListConsumer class, just add the following.

async def send_coin_list(self, event):
    coin_list = event['coin_list']
    # Wrap the list in an object so the client can read data["coin_list"]
    await self.send(json.dumps({'coin_list': coin_list}))
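
How does the message from the task end up in this method? Channels looks at the 'type' key of the message, replaces any dots with underscores, and calls the consumer method with that name. Below is a stripped-down imitation of that lookup; SketchConsumer is a made-up stand-in and not the real base class, and the sample price is invented.

```python
import asyncio


class SketchConsumer:
    """Toy stand-in for a Channels consumer (hypothetical, for illustration)."""

    def __init__(self):
        self.received = []

    async def dispatch(self, message):
        # 'send_coin_list' -> self.send_coin_list, 'chat.message' -> self.chat_message
        handler_name = message['type'].replace('.', '_')
        handler = getattr(self, handler_name)
        await handler(message)

    async def send_coin_list(self, event):
        # The real handler would forward this to the browser with self.send()
        self.received.append(event['coin_list'])


consumer = SketchConsumer()
asyncio.run(consumer.dispatch({
    'type': 'send_coin_list',
    'coin_list': [{'name': 'Bitcoin', 'current_price': 47000}],  # made-up sample
}))
```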

Remember that we want to schedule the API call so that it will be executed every 30 seconds. Inside celery.py, add the following code right before app.autodiscover_tasks().

app.conf.beat_schedule = {
    'get_coin_list_30s': {
        'task': 'coin.tasks.get_coin_list',
        'schedule': 30.0
    }
}

We want to use Redis as our message broker, so add the following inside settings.py:

CELERY_BROKER_URL = 'redis://localhost:6379'

Before integrating with the front-end, let's make sure the scheduler is actually working. Open two new terminals and activate the same Python virtual environment in each. Inside the first terminal, run celery -A server beat -l INFO and in the second one, run celery -A server worker -l INFO --pool=solo. If you're on Linux you may omit the pool argument. Once celery beat has started, take note of the configuration it prints: right now it is using PersistentScheduler as the scheduler. This information will be useful later.

So what is happening here is that celery beat will send the task to Redis message broker every 30 seconds, and celery worker will check the queue and execute the first item in it.
The beat terminal should look as follows:

[2021-09-17 15:34:04,540: INFO/MainProcess] Scheduler: Sending due task get_coin_list_30s (coin.tasks.get_coin_list)

and worker terminal should look as follows:

[2021-09-17 15:34:06,041: INFO/MainProcess] celery@aishahsofea ready.
[2021-09-17 15:34:06,051: INFO/MainProcess] Task coin.tasks.get_coin_list[e211f23a-63a3-498d-9068-845beaf6c0e1] received
[2021-09-17 15:34:07,676: INFO/MainProcess] Task coin.tasks.get_coin_list[e211f23a-63a3-498d-9068-845beaf6c0e1] succeeded in 1.6234161000029417s: None
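
If the division of labour between beat and the worker is still fuzzy, this toy model may help. It only imitates the idea (beat enqueues, worker dequeues); the deque stands in for the Redis queue, due_times is a made-up helper, and none of the real timing or retry behaviour of Celery is modelled.

```python
from collections import deque


def due_times(interval, until):
    """Times (in seconds) at which a fixed-interval schedule fires."""
    t = interval
    while t <= until:
        yield t
        t += interval


broker = deque()  # stand-in for the Redis queue

# "beat": put one task message on the queue each time the schedule is due
for t in due_times(interval=30.0, until=90.0):
    broker.append({'task': 'coin.tasks.get_coin_list', 'due_at': t})

# "worker": take messages from the front of the queue, oldest first
executed = []
while broker:
    message = broker.popleft()
    executed.append(message['due_at'])
```

Note that beat never runs the task itself, which is why both processes have to be running.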

Let's display the scheduled data in the UI. Modify App.js as follows:

import { useEffect, useState } from "react";

// The path must match the route declared in routing.py
const socket = new WebSocket("ws://localhost:8000/ws/coin_list/");

function App() {
    const [coins, setCoins] = useState([]);

    useEffect(() => {
        socket.onmessage = (message) => {
            const data = JSON.parse(message.data);
            setCoins(data["coin_list"]);
        };
    }, []);

    return (
        <div className="App">
            <ol>
                {coins
                    ? coins.map((coin) => (
                        <li key={coin.id}>
                            {coin.name} | {coin.current_price} USD
                        </li>
                    ))
                    : null}
            </ol>
        </div>
    );
}

export default App;

A list of coins will be displayed along with their prices, and the prices will be updated every 30 seconds or so, depending on how often the API itself returns fresh data. If you want to confirm, just console.log the parsed message.

Alright, this is perfect: we can now get a real-time price without having to refresh the page. But what if we need to change the currency? The scheduled task needs to be replaced with one that calls the API with the currency we choose. This is going to be a problem, because recall that we are using PersistentScheduler, which takes its schedule from the configuration in celery.py, so we cannot simply change the tasks at runtime.

So let's use DatabaseScheduler instead, where we can manage our tasks through database tables. It does not come with Celery, so we have to install an extension package; pip install django-celery-beat. Once it's done installing, include django_celery_beat under INSTALLED_APPS. Since we want to use its tables, don't forget to migrate; python manage.py migrate

Now let's add a simple dropdown to allow the user to select a currency. Add the following JSX inside the div returned by the App component.

<label htmlFor="currency">Switch currency:</label>
<select name="currency" id="currency" onChange={handleCurrency}>
    <option value="usd">US Dollars</option>
    <option value="eur">Euro</option>
    <option value="myr">Malaysian Ringgit</option>
    <option value="btc">Bitcoin</option>
</select>

as well as the handleCurrency function:

const handleCurrency = () => {
    const currency = document.getElementById("currency").value;
    socket.send(
        JSON.stringify({
            currency: currency,
        })
    );
};

Modify consumers.py so it can receive the selected currency.

import json

from asgiref.sync import sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from django_celery_beat.models import IntervalSchedule, PeriodicTask

from .tasks import get_coin_list

class CoinListConsumer(AsyncWebsocketConsumer):

    async def connect(self):
        await self.channel_layer.group_add('coin_list', self.channel_name)
        await self.accept()
        await self.send(json.dumps({'message': 'hey im server'}))

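    @sync_to_async
    def receive(self, text_data=None, bytes_data=None):
        # A plain (non-async) method: sync_to_async runs it in a thread,
        # which is where the blocking ORM calls below belong.
        text_data_json = json.loads(text_data)
        currency = text_data_json['currency']
        # Fetch once right away instead of waiting for the next tick
        get_coin_list.delay(currency)

        # Reuse the 30-second interval if it already exists
        schedule, _ = IntervalSchedule.objects.get_or_create(every=30, period=IntervalSchedule.SECONDS)

        try:
            task = PeriodicTask.objects.get(name='Get coin list')
        except PeriodicTask.DoesNotExist:
            task = None

        if task is None:
            PeriodicTask.objects.create(
                interval=schedule,
                name='Get coin list',
                task='coin.tasks.get_coin_list',
                args=json.dumps([currency]),
            )
        else:
            # save() fires the model signals that tell beat the schedule changed;
            # a bulk QuerySet.update() would skip them
            task.args = json.dumps([currency])
            task.save()

    # send_coin_list() from earlier stays here unchanged

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard('coin_list', self.channel_name)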


Inside the receive method, we capture the currency sent from the client and set up a 30-second IntervalSchedule object. The method is wrapped with sync_to_async because the ORM calls in it are synchronous. We're going to name our task 'Get coin list', so first we look the name up inside the PeriodicTask table. If it's not in the table, we create it and set the currency as its argument. If it already exists, we simply update the argument. Since task scheduling is now handled here, we can remove the schedule we configured inside celery.py.
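
The args field deserves a quick note: it is stored as a JSON-encoded list, and when the task is due the scheduler decodes it and passes the items as positional arguments. Here is a small sketch of that round trip, using a stand-in function instead of the real task (the return string is made up for the demo):

```python
import json


def get_coin_list(currency='usd'):
    # Stand-in for the real task: just report which currency it was called with
    return f'fetching prices in {currency}'


stored_args = json.dumps(['eur'])          # what the consumer saves in PeriodicTask.args
positional_args = json.loads(stored_args)  # what gets decoded when the task is due
result = get_coin_list(*positional_args)
```

So saving json.dumps(['eur']) is effectively the same as scheduling get_coin_list('eur').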

Also, the get_coin_list task in tasks.py should accept a currency argument. For the sake of brevity, I omitted the rest of the code; view the complete code here.
The changed lines look like so:

@shared_task
def get_coin_list(currency='usd'):
    data = requests.get(get_market_api(1, 100, currency)).json()
    # ... the group_send() call stays the same as before

The celery beat needs to be restarted, but the command would be slightly different now that we want to use the database scheduler.

celery -A server beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
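
If you'd rather not type the --scheduler flag every time, the scheduler can also be set in settings.py. This is a sketch based on Celery's beat_scheduler setting, which should be spelled CELERY_BEAT_SCHEDULER given the CELERY namespace we configured in celery.py; the steps in this article only rely on the command-line flag.

```python
# server/settings.py (excerpt)
CELERY_BROKER_URL = 'redis://localhost:6379'

# Assumed equivalent of passing --scheduler on the command line
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
```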

Restart the worker as well and the command should be the same as before. Refresh the UI and select any currency and you should see the price changing accordingly.
