Building Microservices With Nameko

maverick11

Ashutosh Narang

Posted on January 9, 2022

Building Microservices With Nameko

In this chapter, we will learn about Nameko and it's capabilities as a microservice framework.

What is Nameko?

Nameko is a framework for building lightweight, highly scalable and fault-tolerant services in Python following a micro-service architecture design.

It comes with built-in support for:

  • RPC over AMQP
  • Asynchronous events (pub-sub) over AMQP

Why Nameko?

Nameko enables you to build a service that can respond to RPC messages, dispatch events on certain actions, and listen to events from other services. It could also have HTTP interfaces for clients that can’t speak AMQP.

Let's create a basic Nameko service and experiment with it capabilities.

Setup Basic Environment

First, you will need Docker installed. We will use Python 3, so make sure you have it installed as well.
To run Nameko, we need the RabbitMQ. It will be responsible for the communication between our Nameko services.

Install

$ pip install nameko
Enter fullscreen mode Exit fullscreen mode

Start A RabbitMQ Container

$ docker run -p 5672:5672 --hostname nameko-rabbitmq rabbitmq:3

Enter fullscreen mode Exit fullscreen mode

Hello World!!

A Nameko service is just a Python class. The class encapsulates the logic in its methods and declares any dependencies as attributes.

Go ahead and create a file called Service.py with the following content:

from nameko.rpc import rpc

class Service:
    name = "service"

    @rpc
    def receive_event(self, event):
        return f"Event Received: {event}"
Enter fullscreen mode Exit fullscreen mode

Let’s run our example. If you got RabbitMQ running, simply run:

$ nameko run Service
Enter fullscreen mode Exit fullscreen mode

Nameko implements automatic service discovery, meaning when calling an RPC method over AMQP, Nameko will try to find the corresponding RabbitMQ service on its own.

To test our service, run the below command in another terminal:

>>> n.rpc.service.receive_event(event={'message': 'Hello World!!'})

Enter fullscreen mode Exit fullscreen mode

When an RPC entrypoint is invoked, a Nameko worker is created. A worker is just a stateless instance of the service class, which makes it inherently thread-safe. The maximum number of workers by default for a service is set to 10.

Read more about Nameko Workers here.

If maximum number of workers is set to 1, then only 1 Nameko worker will execute at a time i.e. it will behave as a regular queue.

Communicate Between 2 Nameko Services

In order to communicate from one Nameko service to another and vice-versa, Nameko provides an RpcProxy construct. Here's how you use it:

from nameko.rpc import rpc, RpcProxy

class SenderService:
    name = "sender_service"
    receiver_service_proxy = RpcProxy("receiver_service")

    @rpc
    def send_event(self, event):
        return self.receiver_service_proxy.receive_event({'message': 'Hello World!!'})

class ReceiverService:
    name = "receiver_service"

    @rpc
    def receive_event(self, event):
        return f"Event Received: {event}"
Enter fullscreen mode Exit fullscreen mode

Communicate Between A Nameko & A Non-Nameko Service

There will be scenarios where we need to call a Nameko service from something that isn’t a Nameko service like an API service or a cron job. Here's how you do it:

from nameko.standalone.rpc import ClusterRpcProxy

AMQP_URI = "pyamqp://user:paswword@hostname"

config = {
    'AMQP_URI': AMQP_URI
}

with ClusterRpcProxy(config) as cluster_rpc:
    cluster_rpc.service.receive_event({'message': 'Hello World!!'})
Enter fullscreen mode Exit fullscreen mode

Concurrency

Nameko is built on top of the eventlet library, which provides concurrency via “greenthreads”.

Greenthreads unlike OS threads, cooperatively yield to each other instead of preemptively being scheduled by the OS. This behaviour proves to be advantageous when a service is I/O heavy.

One greenthread yields control only when it is busy doing I/O, giving another greenthread a chance to execute thereby allowing the service to use shared data structures without the need of using locks and other synchronisation mechanisms.

Let's experiment with Nameko concurrency in practice by modifying the above code:

from time import sleep
from nameko.rpc import rpc

class Service:
    name = "service"

    @rpc
    def receive_event(self, event):
        sleep(5)
        return f"Event Received: {event}"
Enter fullscreen mode Exit fullscreen mode

We are using sleep from the time module, which is a blocking call. However, when running our services using nameko run, nameko will automatically monkey patch blocking calls to non-blocking calls such as sleep(5) i.e. making it async.

The response time of a single RPC call to our service will be 5 seconds. Now, if we make 10 calls in one go to the same RPC, how long will it take to get the response of all 10 calls?

Let's run the following code in a nameko shell:

def time_concurrent_invocations():
    start_time = time.perf_counter()
    responses = []
    num_concurrent_calls = 10
    for i in range(num_concurrent_calls):
        response = n.rpc.service.receive_event({'message': f'Worker {i+1}'})
        responses.append(response)

    for response in responses:
        print(response.result)

    end_time = time.perf_counter()

    print(f'Total Time: {round(end_time-start_time, 3)}')

time_concurrent_invocations()
Enter fullscreen mode Exit fullscreen mode

This example runs in just around five seconds. Each worker will be blocked waiting for the sleep call to finish, but this does not stop another worker to start, implicit yielding in action.

If you change num_concurrent_calls = 20 in the above snippet, the execution will finish in 10 seconds.

Async Pub-Sub

Let's suppose, we now have to do an asynchronous task like sending a notification or uploading a file on cloud.

from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc

class MessageService:

    name = "message_service"

    dispatch = EventDispatcher()


    def time_consuming_function(self, payload):
        self.dispatch("heavy_payload_event", payload)

    @rpc
    def receive_message(self, event):

        if event['payload']:
            self.time_consuming_function(event['payload'])

        print(f'Message Received: {event['message']}')



class TimeConsumingService:
    name = "time_consuming_service"

    @event_handler("message_service", "heavy_payload_event")
    def time_consuming_event_handler(self, payload):
        pass
Enter fullscreen mode Exit fullscreen mode

When receive_message processes an event with a payload, it calls time_consuming_function that utilises the EventDispatcher to process the payload in an asynchronous manner by invoking the time_consuming_event_handler in a separate greenthread. The caller thread here, does not wait for the event handler to return a response, thereby allowing the caller thread to finish it's execution faster and accept further requests.

Scalable

We have been using only one server and running one instance of RabbitMQ. In a production environment, you will want to arbitrarily increase the number of nodes running the service that is getting too many calls.

To simulate service scaling, let's revisit our service from the concurrency section. Open another terminal and run the service as before, using $ nameko run Service. This will start another service instance with the potential to run ten more workers. Now, try running that snippet again with num_concurrent_calls = 20. It should now take five seconds again to run. When there are more than one service instances running, Nameko will round-robin the RPC requests among the available instances.

In fact you can configure these services in such a way that they can run on completely different machines and scale them independently. All you need to do is point these services at the same RabbitMQ broker.

Create a config file with broker URI:

# config.yaml
AMQP_URI: amqp://<rabbitmq-ip>:5672/
Enter fullscreen mode Exit fullscreen mode

Run these services on different machines using:

$ nameko run <nameko_service> --config config.yaml
Enter fullscreen mode Exit fullscreen mode

Fault Tolerant

Nameko is highly roboust and fault tolerant so, it continues operating properly in the event of failure of one or more nodes in the service cluster till at least one healthy node remains functioning.

Try running 3 instances of the Service and execute the test snippet with num_concurrent_calls = 50. As soon as you execute the test snippet, kill one or 2 instances of the Service.The missed messages will be re-routed to healthy node(s), thus avoiding message loss.

This behaviour is due the fact that messages are ack’d after worker execution completes successfully, and if the connection is lost after delivery but before acknowledgement, RabbitMQ will reclaim and redeliver the message.

What Happens If The RabbitMQ Server Dies And There Are Messages Left In The Queue?

Nameko sets delivery_mode=PERSISTENT by default for the queues it creates for RPC over AMQP. This tells RabbitMQ to save the messages to disk. However, there is a short time window when RabbitMQ has accepted a message but, hasn't saved it yet meaning, the persistence guarantees are not strong. To solve this, Nameko uses publisher confirms by default. Confirms have a performance penalty but guarantee that messages aren't lost.

Conclusion

Nameko is designed to help you build systems using micro-services and scale from a single instance of a single service, to a cluster with many instances of many different services.

To learn more about Nameko checkout Nameko Documentation and join the Nameko Discourse.

💖 💪 🙅 🚩
maverick11
Ashutosh Narang

Posted on January 9, 2022

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related

Building Microservices With Nameko
python Building Microservices With Nameko

January 9, 2022

Netflix Conductor Workers in Python
python Netflix Conductor Workers in Python

December 2, 2021