aishahsofea
Posted on September 17, 2021
Background
I was trying to create something similar to CoinGecko/CoinMarketCap, where a list of coins is displayed along with their current prices. In the first iteration, I fetched the current price via an HTTP request, and while this works, it is not ideal and can be rather inconvenient. Cryptocurrency is known for its volatility, so if the user stays idle for a couple of minutes, the price displayed will quickly be outdated. They can only get the updated price after refreshing the page. On average, the API call takes around 600 to 1000ms. Imagine having to wait 1 second every time you want to get an updated Bitcoin price, on top of having to refresh the page. That's pretty annoying, isn't it? So how do we go about solving this? Let's dive right in.
Tech stack
- Django as the web framework
- React for the UI stuff (the bulk of this article will be on the BE so it's totally fine if you don't know React)
- Celery for asynchronous task execution
- Channels to allow WebSocket communication
- Redis as the message queue
Note: If you are on Windows, chances are you'll encounter an issue when running Celery since it does not officially support Windows. So I highly recommend using WSL instead. It is also easier to install Redis on WSL.
Also note: This tutorial assumes that you already have some experience with Django.
Setup and installation
Set up Django and React boilerplate.
- Create a folder for our project; let's call it cmc_clone.
- Assuming you already have Python installed, create a virtual environment inside cmc_clone: python3 -m venv venv. Note that I use python3 because I have both Python 2 & 3 installed on my WSL.
- Now you'll see a venv folder inside your directory. Activate the virtual env like so: source venv/bin/activate.
- Now let's install Django: pip install django. Run pip freeze | grep Django or simply python -m django --version to ensure that it's been installed. (Note that I'm now using python instead of python3 since I'm already inside the virtual environment.)
- Once Django has been successfully installed, let's start a Django project called server: django-admin startproject server.
- Inside the server folder, you will see another server folder and a manage.py file. This is the boilerplate that Django automagically creates for us. To make sure that it is indeed working, run python manage.py runserver and go to localhost:8000 in your browser. You should see a rocket animation, which means your Django server is running properly.
- Cool, now let's create an app called coin: run python manage.py startapp coin on the same level as the manage.py file. You will see a coin folder being created; this is what we'll use later for our logic. Make sure to include it under INSTALLED_APPS.
- Let's move on to React. For this we will also use a boilerplate, from create-react-app. Assuming you have npm installed, let's generate the React boilerplate inside the cmc_clone folder (same level as the venv folder): npx create-react-app client. This might take a few minutes.
- Go inside the client folder and run npm start. A development server will run at localhost:3000.
Install Redis and make sure it is properly working.
- Follow this tutorial for Windows 10.
I will structure this article in the sequence of the mistakes that I made. So bear with me.
Getting current price of the coins without the hassle of refreshing the page
So the first problem that we want to solve is getting the updated coin price without having to refresh the page. This is where WebSocket comes in. Unlike HTTP, where the client needs to send a request each time it wants a response, WebSocket keeps the connection between the client and the server open, so the server can push new data whenever it has some.
As for the coin data, we will use the CoinGecko API. There are a bunch of endpoints that you can leverage, but for our purpose we are only interested in the /coins/markets endpoint.
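To make the shape of that call a bit more concrete, here is a minimal sketch using only the standard library. It builds the request URL and pulls out the two fields this article ends up displaying. The helper names (build_market_url, summarize) and the sample payload are made up for illustration; only the query parameters and the name and current_price fields come from the code used later in this article.

```python
from urllib.parse import urlencode

BASE_URL = "https://api.coingecko.com/api/v3/coins/markets"


def build_market_url(currency="usd", page=1, per_page=100):
    """Return the /coins/markets URL for one page of results."""
    query = urlencode({"vs_currency": currency, "page": page, "per_page": per_page})
    return f"{BASE_URL}?{query}"


def summarize(coins):
    """Reduce each coin dict to a (name, current_price) tuple."""
    return [(coin["name"], coin["current_price"]) for coin in coins]


# Hypothetical, hand-written payload; the prices are not real data.
sample = [
    {"id": "bitcoin", "name": "Bitcoin", "current_price": 47000},
    {"id": "ethereum", "name": "Ethereum", "current_price": 3400},
]

print(build_market_url("eur", per_page=10))
print(summarize(sample))
```

Nothing here touches the network, so it is safe to run while you are still wiring things up.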
Let's say we want to get the price every 30 seconds. Since we don't want to refresh the page, there should be a background process that does this for us, something similar to a cron job. Luckily, Celery is pretty good at this.
So there are 2 main parts here:
- Setting up a WebSocket connection
- Executing background tasks
Setting up a WebSocket connection
For this we will use a package called channels. Follow the official installation guide.
Make sure you include channels in the INSTALLED_APPS inside settings.py. Also, under the WSGI settings, please add this line: ASGI_APPLICATION = 'server.asgi.application'.
If you encounter any error, please consider upgrading pip.
Then, install the channels_redis package: pip install channels-redis. It provides channel layers that use Redis. Visit the GitHub repository for more config options.
But for us, we will use the following. Include this inside settings.py:
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            'hosts': [('127.0.0.1', 6379)]
        }
    }
}
Now modify asgi.py so it can handle WebSocket communication.
import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application

# Point Django at this project's settings (the project is named "server").
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'server.settings')
# Initialise Django before importing code that may touch models.
django_asgi_app = get_asgi_application()

from coin.routing import ws_urlpatterns  # noqa: E402

application = ProtocolTypeRouter({
    'http': django_asgi_app,
    'websocket': AuthMiddlewareStack(URLRouter(ws_urlpatterns)),
})
We have yet to create ws_urlpatterns, so don't worry about that for now.
Inside the coin folder, create consumers.py and routing.py. You can think of consumers.py as being to an ASGI application what views.py is to a normal Django application; it also has its own routing.
Let's deal with consumers first. Paste the following code inside consumers.py.
import json

from channels.generic.websocket import AsyncWebsocketConsumer


class CoinListConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        # Join the group that the background task will broadcast to.
        await self.channel_layer.group_add('coin_list', self.channel_name)
        await self.accept()
        await self.send(json.dumps({'message': 'hey im server'}))

    # Handlers on an async consumer must themselves be coroutines.
    async def receive(self, text_data=None, bytes_data=None):
        print(text_data)

    # Channels passes the close code to disconnect().
    async def disconnect(self, close_code):
        await self.channel_layer.group_discard('coin_list', self.channel_name)
Here I'm writing the consumer class as asynchronous by extending the AsyncWebsocketConsumer class provided by channels. To understand consumers better, do read the doc.
Now let's create the route for our CoinListConsumer. Inside routing.py:
from django.urls import path
from .consumers import CoinListConsumer
ws_urlpatterns = [
    path('ws/coin_list/', CoinListConsumer.as_asgi())
]
Note that ws_urlpatterns is the one that we imported inside asgi.py.
Now, let's connect to the WebSocket from the client. Inside the /client/src folder, modify the App.js file.
import { useEffect } from "react";

// The path must match the route declared in coin/routing.py.
const socket = new WebSocket("ws://localhost:8000/ws/coin_list/");

function App() {
  useEffect(() => {
    // Log every message pushed by the server.
    socket.onmessage = (message) => {
      const data = JSON.parse(message.data);
      console.log(data);
    };
  }, []);

  const handleButtonClick = () => {
    socket.send(
      JSON.stringify({
        message: "hey im client",
      })
    );
  };

  return (
    <div className="App">
      <button onClick={handleButtonClick}>Send message to the server</button>
    </div>
  );
}

export default App;
Now you should see a button in the React app at localhost:3000.
Restart the Django server, and you'll see an additional line saying something like Starting ASGI/Channels version 3.0.4 development server at http://127.0.0.1:8000/. This means that our ASGI is properly configured and the client can now talk to our server via WebSocket.
Open the console tab in your browser, and refresh the page. You should see: { "message": "hey im server" }. It's the JSON that we send from the consumer inside the connect() function.
Try clicking on the button and monitor the Django terminal. You should see the raw JSON string printed there: {"message":"hey im client"}
It's the message that we send from the client using the socket.send() method, and it is received by the consumer as text_data in the receive() function.
Great, WebSocket is working. Let's make it more exciting by integrating Celery beat.
Execute scheduled background tasks with Celery beat
First, we'll install Celery. In our case the background task that we want to execute is the API call, and we'd like to use Redis as our message broker. So make sure to also install all the required packages: pip install celery requests redis
On the same level as settings.py, create a file called celery.py. This is where we will configure Celery. You can visit this link for the explanation on the configuration. Inside celery.py, paste the following:
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'server.settings')
app = Celery('server')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
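And then, make sure to import the app inside the __init__.py that sits next to celery.py (from .celery import app as celery_app), so that it is loaded when Django starts. Next, create tasks.py inside the coin folder and paste the code below: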
import requests
from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer

channel_layer = get_channel_layer()


def get_market_api(page=1, per_page=100, currency='usd'):
    market_api = f'https://api.coingecko.com/api/v3/coins/markets?vs_currency={currency}&page={page}&per_page={per_page}'
    return market_api


@shared_task
def get_coin_list():
    # A timeout keeps a stalled request from blocking the worker forever.
    data = requests.get(get_market_api(1, 100, 'usd'), timeout=10).json()
    # Broadcast to every consumer that joined the 'coin_list' group.
    async_to_sync(channel_layer.group_send)(
        'coin_list', {
            'type': 'send_coin_list',
            'coin_list': data
        }
    )
get_coin_list() is where we do the API call, and since we want to schedule this task, we're using the shared_task decorator so that it can be used anywhere in the project. Note that we use the async_to_sync() function; this is because get_coin_list() is a synchronous function but we would like to send the data to an asynchronous function inside the consumer, send_coin_list(). Speaking of which, we haven't actually created the said function. Inside the CoinListConsumer class, just add the following.
    async def send_coin_list(self, event):
        coin_list = event['coin_list']
        # Wrap the list in an object so the client can read data["coin_list"].
        await self.send(json.dumps({'coin_list': coin_list}))
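If the link between the 'type' key in the task and the send_coin_list() method feels a bit magical, the toy model below may help. It is only a sketch of the idea, not the Channels implementation: ToyChannelLayer and ToyConsumer are made-up classes, and asyncio.run stands in for async_to_sync. The one thing taken from Channels is the rule that the event type names the handler method, with any dots replaced by underscores.

```python
import asyncio


class ToyChannelLayer:
    """In-memory stand-in for a channel layer (not the Channels API)."""

    def __init__(self):
        self.groups = {}  # group name -> set of consumers

    async def group_add(self, group, consumer):
        self.groups.setdefault(group, set()).add(consumer)

    async def group_send(self, group, event):
        # Deliver the same event to every member of the group.
        for consumer in self.groups.get(group, ()):
            await consumer.dispatch(event)


class ToyConsumer:
    def __init__(self):
        self.sent = []  # payloads that would be pushed to the browser

    async def dispatch(self, event):
        # The event "type" picks the handler; "." becomes "_".
        handler = getattr(self, event["type"].replace(".", "_"))
        await handler(event)

    async def send_coin_list(self, event):
        self.sent.append(event["coin_list"])


layer = ToyChannelLayer()
consumer = ToyConsumer()


def publish(data):
    """Synchronous caller (like the Celery task) driving async code."""
    event = {"type": "send_coin_list", "coin_list": data}
    asyncio.run(layer.group_send("coin_list", event))


asyncio.run(layer.group_add("coin_list", consumer))
publish([{"name": "Bitcoin", "current_price": 47000}])
print(consumer.sent)
```

In the real project Redis sits between the task and the consumers, which is what lets the Celery worker and the Django server live in separate processes.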
Remember that we want to schedule the API call so that it will be executed every 30 seconds. Inside celery.py, add the following code right before app.autodiscover_tasks().
app.conf.beat_schedule = {
    'get_coin_list_30s': {
        'task': 'coin.tasks.get_coin_list',
        'schedule': 30.0
    }
}
We want to use Redis as our message broker, so add the following inside settings.py:
CELERY_BROKER_URL = 'redis://localhost:6379'
Before integrating with the front end, let's make sure the scheduler is actually working. Open two new terminals and activate the same Python virtual environment in each. In the first terminal, run celery -A server beat -l INFO and in the second one, run celery -A server worker -l INFO --pool=solo. If you're on Linux you may omit the pool argument. Once celery beat has started, take note of the configuration: right now it is using PersistentScheduler as the scheduler. This information will be useful later.
So what is happening here is that celery beat will send the task to Redis message broker every 30 seconds, and celery worker will check the queue and execute the first item in it.
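The split between the two processes can be sketched in a few lines of plain Python. This is a deliberately simplified model, not how Celery is implemented: a deque plays the role of the Redis queue, the clock is a list of made-up timestamps, and beat_tick and worker_drain are hypothetical names.

```python
from collections import deque


def beat_tick(now, schedule, last_run, queue):
    """Enqueue every task whose interval has elapsed (the 'beat' role)."""
    for name, interval in schedule.items():
        if now - last_run[name] >= interval:
            queue.append(name)
            last_run[name] = now


def worker_drain(queue, registry):
    """Pop tasks in FIFO order and run them (the 'worker' role)."""
    results = []
    while queue:
        results.append(registry[queue.popleft()]())
    return results


schedule = {"get_coin_list": 30.0}  # seconds between runs
registry = {"get_coin_list": lambda: "fetched prices"}  # placeholder task body
queue = deque()
last_run = {"get_coin_list": 0}  # pretend the scheduler started at t=0

executed = []
for now in (0, 10, 30, 59, 60):  # simulated clock, in seconds
    beat_tick(now, schedule, last_run, queue)
    executed.extend(worker_drain(queue, registry))

print(len(executed))
```

The point to take away is that beat only decides when a task is due, while the worker is the one that actually runs it. That is why both commands need to be running.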
The beat terminal should look as follows:
[2021-09-17 15:34:04,540: INFO/MainProcess] Scheduler: Sending due task get_coin_list_30s (coin.tasks.get_coin_list)
and worker terminal should look as follows:
[2021-09-17 15:34:06,041: INFO/MainProcess] celery@aishahsofea ready.
[2021-09-17 15:34:06,051: INFO/MainProcess] Task coin.tasks.get_coin_list[e211f23a-63a3-498d-9068-845beaf6c0e1] received
[2021-09-17 15:34:07,676: INFO/MainProcess] Task coin.tasks.get_coin_list[e211f23a-63a3-498d-9068-845beaf6c0e1] succeeded in 1.6234161000029417s: None
Let's display the scheduled data in the UI. Modify App.js as follows:
import { useEffect, useState } from "react";

// The path must match the route declared in coin/routing.py.
const socket = new WebSocket("ws://localhost:8000/ws/coin_list/");

function App() {
  const [coins, setCoins] = useState([]);

  useEffect(() => {
    socket.onmessage = (message) => {
      const data = JSON.parse(message.data);
      setCoins(data["coin_list"]);
    };
  }, []);

  return (
    <div className="App">
      <ol>
        {coins
          ? coins.map((coin) => (
              // React needs a stable key for each list item.
              <li key={coin.id}>
                {coin.name} | {coin.current_price} USD
              </li>
            ))
          : null}
      </ol>
    </div>
  );
}

export default App;
A list of coins will be displayed along with their prices, and the prices will be refreshed roughly every 30 seconds. Whether the numbers actually change depends on the data sent by the API. If you want to confirm, just log the parsed message to the console.
Alright, this is perfect: we can now get a real-time price without having to refresh our page. But what if we need to change the currency? The task scheduler needs to be interrupted, and the current task needs to be replaced with a new task that will call the API based on the currency that we choose.
This is going to be a problem. Recall that we are using PersistentScheduler, and we cannot simply change the tasks at runtime. So let's use a DatabaseScheduler, where we can manage our tasks with a database table. This does not come with Celery, so we are going to have to install an extension package: pip install django-celery-beat. Once it's done installing, include django_celery_beat under INSTALLED_APPS. Since we want to use the table, don't forget to migrate it: python manage.py migrate
Now let's add a simple dropdown to allow the user to select a currency. Add the following JSX inside the div returned by the App component.
<label htmlFor="currency">Switch currency:</label>
<select name="currency" id="currency" onChange={handleCurrency}>
  <option value="usd">US Dollars</option>
  <option value="eur">Euro</option>
  <option value="myr">Malaysian Ringgit</option>
  <option value="btc">Bitcoin</option>
</select>
as well as the handleCurrency function:
const handleCurrency = () => {
  const currency = document.getElementById("currency").value;
  socket.send(
    JSON.stringify({
      currency: currency,
    })
  );
};
Modify consumers.py so it can receive the selected currency.
import json

from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from django_celery_beat.models import IntervalSchedule, PeriodicTask

from .tasks import get_coin_list


class CoinListConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        await self.channel_layer.group_add('coin_list', self.channel_name)
        await self.accept()
        await self.send(json.dumps({'message': 'hey im server'}))

    async def receive(self, text_data=None, bytes_data=None):
        currency = json.loads(text_data)['currency']
        # ORM calls are blocking, so run them outside the event loop.
        await self.schedule_coin_list(currency)

    @database_sync_to_async
    def schedule_coin_list(self, currency):
        # Fetch once right away so the client doesn't wait for the next tick.
        get_coin_list.delay(currency)
        # Reuse the 30-second interval instead of creating a new row each time.
        schedule, _ = IntervalSchedule.objects.get_or_create(
            every=30, period=IntervalSchedule.SECONDS)
        # update_or_create() goes through save(), so the database scheduler
        # notices the change (a bulk .update() would bypass that).
        PeriodicTask.objects.update_or_create(
            name='Get coin list',
            defaults={
                'interval': schedule,
                'task': 'coin.tasks.get_coin_list',
                'args': json.dumps([currency]),
            },
        )

    async def send_coin_list(self, event):
        await self.send(json.dumps({'coin_list': event['coin_list']}))

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard('coin_list', self.channel_name)
Inside the receive method, we capture the currency sent from the client. We then set up a 30-second IntervalSchedule and look up our task, which we name 'Get coin list', in the PeriodicTask table. If it's not in the table, it is created with the currency as its argument. If it already exists, only the argument is updated. Since task scheduling is now handled here, we can remove the beat_schedule that we configured inside celery.py.
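One thing worth considering here: the currency arrives straight from the browser, so it is probably a good idea to check it before it ends up as a task argument. Below is a minimal sketch of such a check. parse_currency is a hypothetical helper of my own, not part of Channels or Celery, and the allowed set simply mirrors the four options in the dropdown.

```python
import json

ALLOWED_CURRENCIES = {"usd", "eur", "myr", "btc"}  # mirrors the dropdown options


def parse_currency(text_data, default="usd"):
    """Return a whitelisted currency code from a raw WebSocket frame."""
    try:
        payload = json.loads(text_data)
    except (json.JSONDecodeError, TypeError):
        # Not JSON (or no text frame at all): fall back to the default.
        return default
    if not isinstance(payload, dict):
        return default
    currency = str(payload.get("currency", default)).lower()
    return currency if currency in ALLOWED_CURRENCIES else default


print(parse_currency('{"currency": "EUR"}'))
print(parse_currency('{"currency": "doge"}'))
print(parse_currency("not json"))
```

Falling back to a default keeps the example short. Depending on your needs, you might prefer to send an error message back to the client instead.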
Also, the get_coin_list function in tasks.py should receive a currency argument like so:
@shared_task
def get_coin_list(currency='usd'):
    data = requests.get(get_market_api(1, 100, currency)).json()
For the sake of brevity, I omitted the rest of the code. View the complete code here.
Celery beat needs to be restarted, but the command is slightly different now that we want to use the database scheduler:
celery -A server beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
Restart the worker as well and the command should be the same as before. Refresh the UI and select any currency and you should see the price changing accordingly.
References:
- An article on WebSocket.