Web3 backend & smart contract development for Python developers part 15: Listening on-chain events with Celery and web3.py

ilija


Posted on November 17, 2023


Now it is time to plug the blockchain part into Django and the database and put everything together. The general flow goes something like this: 1) a crypto user orders, say, 1 NFT and pays in crypto from his user profile. 2) We listen for emitted events from Polygon Mumbai on our backend and filter the ones associated with our MusicNFT contract. If a newNFTMinted event is there we take out the ID of the newly minted NFT as well as the buyer address. Then, based on the user address, we query our database and assign the customer to a variable. If the NFT ID is not in his collection we append the ID to the nft_ids field and increase the total_no_of_nfts field by one. With this we keep the database on the backend in sync with the data in contract storage.

During this toy project a lot of different patterns emerged that you will most probably confront often in your day-to-day web3 job. Listening for smart contract events on the backend and updating the database accordingly is definitely one of them. This can be done in a myriad of different ways; here is one possible solution:

We will use Celery (task queue) and RabbitMQ (message broker).

Let's start by installing our message broker rabbitmq-server:

    # install message broker
    $ sudo apt install rabbitmq-server

    # enable message broker
    $ sudo systemctl enable rabbitmq-server

    # start message broker
    $ sudo systemctl start rabbitmq-server

IMPORTANT: If you are working from `WSL2` this command will throw an error:
`System has not been booted with systemd as init system (PID 1). Can't operate.
Failed to connect to bus: Host is down`

That is why in the case of `WSL2` we need to use another command (the reasons for this are a bit deep; you can find more in this nice Stack Exchange "essay" explaining all the technical intricacy: https://askubuntu.com/questions/1379425/system-has-not-been-booted-with-systemd-as-init-system-pid-1-cant-operate):

    $ sudo service rabbitmq-server start

If everything went well you should get something like this:

    * Starting RabbitMQ Messaging Server rabbitmq-server

With this we have the broker server up and running. Now we need the Celery-related stuff. Let's first pip install Celery (don't forget to activate the virtual environment):

    $ pip install celery

Inside the Django project directory ./musical_nft create a new file celery.py. The content of the file should look something like this:

    # celery.py
    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery

    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'musical_nft.settings')

    app = Celery('musical_nft', broker="pyamqp://guest@localhost//")

    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    app.config_from_object('django.conf:settings', namespace='CELERY')

    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()

Now, in our authentication app folder, we should create a new tasks.py file:

    # tasks.py
    from celery import shared_task
    from web3_interface import contract_interface
    from .models import Customer


    @shared_task
    def event_listener():
        suc, result = contract_interface.event()
        if suc:
            try:
                c1 = Customer.objects.get(eth_address=result.args.owner)
                if result.args.numberOfNFT not in c1.nft_ids:
                    # update user db
                    c1.nft_ids.append(result.args.numberOfNFT)
                    c1.total_no_of_nfts += 1
                    c1.save()
            except Exception as e:
                print(e)

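The core of the task above — append the ID only if it is missing, and bump the counter — can be sketched as a plain function and tested without Django or Celery at all (`apply_mint` is a hypothetical helper for illustration, not part of the repo):

```python
# Sketch of the update step in `event_listener`, pulled out as a pure
# function. `nft_ids` and `total_no_of_nfts` mirror the Customer fields.

def apply_mint(nft_ids, total_no_of_nfts, minted_id):
    """Return the updated (nft_ids, total) after a newNFTMinted event."""
    if minted_id not in nft_ids:
        nft_ids = nft_ids + [minted_id]
        total_no_of_nfts += 1
    return nft_ids, total_no_of_nfts

ids, total = apply_mint([1, 2], 2, 3)
print(ids, total)  # [1, 2, 3] 3

# replaying the same event is a no-op
ids, total = apply_mint(ids, total, 3)
print(ids, total)  # [1, 2, 3] 3
```

Because re-applying the same event changes nothing, re-processing an already-seen newNFTMinted event (which can happen with a polling listener) leaves the database consistent.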

Then, in our Django musical_nft/settings.py, we should add the Celery-relevant settings:

    # Celery settings
    CELERY_BROKER_URL = 'pyamqp://guest@localhost//'
    CELERY_RESULT_BACKEND = 'rpc://'
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_ACCEPT_CONTENT = ['json']
    CELERY_TIMEZONE = 'UTC'
    CELERY_ENABLE_UTC = True
    CELERY_BEAT_SCHEDULE = {
        'event_listener': {
            'task': 'authentication.tasks.event_listener',
            'schedule': 5.0,  # Run every 5 seconds
        },
    }
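Polling every 5 seconds keeps the demo snappy but is aggressive against a public RPC endpoint. Celery beat also accepts crontab schedules; a possible alternative (assuming a once-per-minute cadence is acceptable for your use case — this fragment is an illustration, not what the post uses) would be:

```python
# Hypothetical alternative schedule: run the listener once per minute.
# The crontab class ships with Celery itself.
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'event_listener': {
        'task': 'authentication.tasks.event_listener',
        # crontab() with no arguments fires every minute
        'schedule': crontab(),
    },
}
```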

And then update the __init__.py file in the musical_nft folder with the following code:

    from __future__ import absolute_import, unicode_literals

    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app

    __all__ = ('celery_app',)


What we need now is a new contract_interface class where we will do all the web3-related stuff (listening for newNFTMinted etc.). But first let's update our Django Customer model with one additional field for the user's ETH address (we will use this field to pull the correct user from the DB based on the args.owner of the newNFTMinted event, i.e. the person who bought the NFT).

Just add one line to models.py inside the authentication app (our Customer model should now look something like this):

    from django.contrib.postgres.fields import ArrayField
    from django.db import models


    class Customer(models.Model):
        CRYPTO = "CRYPTO"
        CREDIT = "CREDIT"

        OPTIONS = [(CRYPTO, "crypto buyer"), (CREDIT, "credit card buyer")]

        created_at = models.DateTimeField(auto_now=True)
        first_name = models.CharField(max_length=50, blank=True)
        last_name = models.CharField(max_length=50, blank=True)
        username = models.CharField(max_length=50, blank=True)
        eth_address = models.CharField(max_length=100, blank=True)
        email = models.EmailField(max_length=250, blank=True)
        type = models.CharField(
            max_length=20,
            choices=OPTIONS,
            # default="CRYPTO"
        )
        total_no_of_nfts = models.IntegerField(default=0)
        nft_ids = ArrayField(
            models.IntegerField(null=True, blank=True), default=list
        )

        nft_metadata = models.ManyToManyField(NFTMetadata)

        def test_function(self):
            return list()

        def __str__(self):
            return f"{self.first_name} {self.last_name}"


Then follow the standard procedure: python manage.py makemigrations and python manage.py migrate.

Once this is done we can go to the Django shell and add to our user the address from which we will buy the first NFT (in real life the user should have the ability on the front-end to add or change his ETH address, but for the test we will do it by hand):

    $ py manage.py shell
    >>> from authentication.models import Customer

    # pull user from database. put your desired pk/id
    >>> c1 = Customer.objects.get(pk=10)

    # update newly added eth_address field with address from which you will buy NFT
    >>> c1.eth_address = "0xB4A5D329e35F83a2e6AB5a74d56f5bD67EaB8d83"
    >>> c1.save()

With this we have a user in the database with his ETH address. Once we pull the information from the smart contract event we will query our database for the user with that address and update the number of NFTs he owns, as well as add the newly minted NFT ID.
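One caveat worth flagging here: web3.py returns checksummed (mixed-case) addresses in event data, while the address saved through the Django shell keeps whatever casing you typed. A sketch of a case-insensitive comparison (`same_address` is an illustrative helper, not something in the post's code):

```python
# Ethereum addresses are case-insensitive; the mixed casing of a
# checksummed address only encodes an error-detection scheme (EIP-55).
# Normalizing both sides avoids a silent lookup miss.

def same_address(a, b):
    return a.lower() == b.lower()

stored = "0xB4A5D329e35F83a2e6AB5a74d56f5bD67EaB8d83"   # as typed in the shell
from_event = "0xb4a5d329e35f83a2e6ab5a74d56f5bd67eab8d83"  # lowercased variant
print(same_address(stored, from_event))  # True
```

One option along these lines would be to store addresses lowercased on save and query with `result.args.owner.lower()`, so the lookup never depends on casing.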

This means that we now need a web3 Python class which will be part of the Celery task. The main purpose of this class is to listen for one specific smart contract event (newNFTMinted, emitted once buyNFT is successfully executed).

We will organize the web3 part as a new folder in the project root directory named web3_interface. We will transform this folder into a Python package (to be imported in our tasks.py file) by adding __init__.py inside the web3_interface folder. Inside the __init__.py file add the following code (explanation in the comments):

    from .sc_interface import Interface
    import os
    import json

    abidir = os.path.dirname(os.path.abspath(__file__))

    # function for smart contract ABI loading
    def load_abi(name):
        filename = os.path.join(abidir, f"{name}.json")
        return json.load(open(filename))

    # calling this function with MusicNFT.json as argument
    musicNFT = load_abi("MusicNFT")

    # instantiating the Interface class (yet to be made) with the musicNFT ABI as argument
    contract_interface = Interface(musicNFT["abi"])

    # now contract_interface is ready to be imported and used in our `tasks.py`
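To see what load_abi hands back, here is a self-contained illustration using a throwaway file in place of the real Hardhat artifact (`load_abi_from` is a variant of the post's helper that also closes the file; the artifact shape — a dict with an "abi" key — matches what Hardhat emits):

```python
# Illustration of ABI loading against a temporary directory, so it runs
# without the real MusicNFT.json artifact.
import json
import os
import tempfile

def load_abi_from(dirname, name):
    filename = os.path.join(dirname, f"{name}.json")
    with open(filename) as f:  # context manager closes the file, unlike a bare open()
        return json.load(f)

with tempfile.TemporaryDirectory() as tmp:
    # minimal stand-in for a Hardhat artifact
    artifact = {"abi": [{"type": "event", "name": "newNFTMinted"}]}
    with open(os.path.join(tmp, "MusicNFT.json"), "w") as f:
        json.dump(artifact, f)

    musicNFT = load_abi_from(tmp, "MusicNFT")
    print(musicNFT["abi"][0]["name"])  # newNFTMinted
```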

Inside web3_interface you need to copy the MusicNFT.json ABI and create sc_interface.py. The content of this file should look something like this (here we have all our web3-related stuff):

import os
from web3 import Web3
from web3.middleware import geth_poa_middleware
from web3._utils.events import get_event_data
from eth_abi.codec import ABICodec
from dotenv import load_dotenv


load_dotenv()


class Contracts:
    def __init__(self, musicNFT):
        self.musicNFT = musicNFT
        self.musicNFT_address = os.environ.get("MUSIC_NFT_ADDRESS")
        self.private_key = os.environ.get("SIGNER_PRIVATE_KEY")
        self.w3Provider = Web3(Web3.HTTPProvider(os.environ.get("INFURA_PROVIDER")))
        self.codec: ABICodec = self.w3Provider.codec
        self.w3Provider.middleware_onion.inject(geth_poa_middleware, layer=0)
        self.nft_contract = self.w3Provider.eth.contract(
            address=self.musicNFT_address, abi=self.musicNFT
        )


class Interface(Contracts):
    def __init__(self, musicNFT):
        Contracts.__init__(self, musicNFT)
        self.event_template = self.nft_contract.events.newNFTMinted


    def handle_event(self, event, event_template):
        try:
            result = get_event_data(
                self.codec, event_template._get_event_abi(), event
            )
            return True, result
        except Exception as e:
            print(e)
            return False, None

    def event(self):
        block_number = self.w3Provider.eth.block_number
        events = self.w3Provider.eth.get_logs(
            {
                "fromBlock": block_number - 5,
                "toBlock": "latest",
                "address": self.musicNFT_address,
            }
        )

        try:
            for event in events:
                suc, result = self.handle_event(
                    event=event, event_template=self.event_template
                )
                if suc:
                    return (True, result)
            return False, None
        except Exception as e:
            print(e)
            return False, None
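A small detail worth guarding in event(): on a fresh chain, block_number - 5 can go negative, which is not a valid fromBlock. A clamped variant (`from_block` and the `window` parameter are illustrative, not from the repo):

```python
# Clamp the polling window so fromBlock never goes below the genesis block.

def from_block(block_number, window=5):
    """Lower bound of the log-scan window, never negative."""
    return max(block_number - window, 0)

print(from_block(100))  # 95
print(from_block(3))    # 0  (a fresh chain with only 3 blocks)
```

Note also that a 5-block window combined with a 5-second beat schedule can both miss and re-deliver events depending on block times; the `not in nft_ids` check in the task is what makes re-delivery harmless.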

Then our root .env should have the following elements:

#database
DENIS_PASS=xxxxxx
ETHEREUM_NETWORK=maticmum
INFURA_PROVIDER=https://polygon-mumbai.infura.io/v3/your_keys
SIGNER_PRIVATE_KEY=private-keys
MUSIC_NFT_ADDRESS=contract_address_mumbai


With these things in place we are ready to run! Open three separate terminals:

From the command line, turn on our worker:

    $celery -A musical_nft worker -l info

    #And you should get something like this:

    [2023-11-15 14:23:58,390: INFO/MainProcess] mingle: searching for neighbors
    [2023-11-15 14:23:58,398: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
    [2023-11-15 14:23:59,414: INFO/MainProcess] mingle: all alone
    [2023-11-15 14:23:59,443: INFO/MainProcess] celery@Ilija ready.

Then, in another terminal:

 $celery -A musical_nft beat --loglevel=info

    # and you should get something like
    celery beat v5.3.4 (emerald-rush) is starting.
    LocalTime -> 2023-11-15 19:19:32
    Configuration ->
        . broker -> amqp://guest:**@localhost:5672//
        . loader -> celery.loaders.app.AppLoader
        . scheduler -> celery.beat.PersistentScheduler
        . db -> celerybeat-schedule
        . logfile -> [stderr]@%INFO
        . maxinterval -> 5.00 minutes (300s)
    [2023-11-15 19:19:32,379: INFO/MainProcess] beat: Starting...
    [2023-11-15 19:19:32,400: INFO/MainProcess] Scheduler: Sending due task my_scheduled_task (authentication.tasks.my_scheduled_task)

And lastly, third terminal window for Django server:

  $ py manage.py runserver
    System check identified no issues (0 silenced).
    November 15, 2023 - 19:35:53
    Django version 4.2.7, using settings 'musical_nft.settings'
    Starting development server at http://127.0.0.1:8000/
    Quit the server with CONTROL-C.

In the background we should have our Postgres server up and running. In any case, a lot of things are currently going on in the background to make all this happen.

Now let's test this thing:

1) go to the front-end and log in as the denis user (the one to whom you assigned the eth address)

2) input the number 1 and press the buy button. MetaMask should pop up two times: once to approve MockTokens to the MusicNFT contract and a second time for buyNFT (inside this function we have transferFrom and _mint to the caller/user address).

Once this is done the contract will emit the newNFTMinted event. Then our event_listener task will pick up this event and read two main pieces of information: the buyer's address and the ID of the newly minted MusicNFT. Then we will use the user address to query our backend for that user and check if this NFT ID is already in his collection. If not, we will append the new ID and increment by one the total number of NFTs the user has. This information will later be used to update the user profile on the front-end.
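web3.py decodes events into AttributeDicts, so the two fields the task reads arrive as result.args.owner and result.args.numberOfNFT. For testing the task logic locally, that attribute-access shape can be mimicked with namedtuples (a sketch, not the library's actual classes):

```python
# Stand-ins with the same attribute-access shape as a decoded event,
# useful for exercising event_listener-style code without a chain.
from collections import namedtuple

Args = namedtuple("Args", ["owner", "numberOfNFT"])
Event = namedtuple("Event", ["args"])

fake = Event(args=Args(owner="0xB4A5D329e35F83a2e6AB5a74d56f5bD67EaB8d83",
                       numberOfNFT=1))
print(fake.args.numberOfNFT)  # 1
```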

And with this the crypto buyer flow is finished. Before we deploy this app somewhere in the cloud, we have one final step: adding Stripe integration for credit card buyers! This will be the topic of the next blog....

Code can be found in this repo
