An Introduction to Microservices pt. 5

bernas1104

Bernardo Costa Nascimento

Posted on August 24, 2021

An Introduction to Microservices pt. 5

Table of Contents

  1. What are we trying to solve?
  2. The Solution: Messaging
  3. Show Me the Code
  4. Bibliography

On part 4, we discussed how to implement a load balance. Now, we'll talk about one way that services can collaborate between each other. We'll talk about messaging.

For this part of the series, I'll be using the code on my Github repository as a guide.

What are we trying to solve?

When using the microservices architecture, services need to collaborate with each other. They can do this both synchronously and asynchronously.

The synchronous communication is easy. We can, for example, call the REST API of another service. But how can we communicate asynchronously?

The Solution: Messaging

To communicate asynchronously, we need to use the messaging pattern. It implements asynchronous service communication, and can be done in a few ways:

  1. Notifications - One service sends a message, but doesn't expect an answer and doesn't receive one;
  2. Request/Asynchronous Response - One service sends a message and awaits for a response that will come eventually;
  3. Publish/Subscribe - One service publishes a message to one or more services;
  4. Publish/Asynchronous Response - One service sends a message to one or more services, and awaits for responses that will come eventually.

Benefits of Messaging

  • Loosely runtime coupling since the sending service doesn't need to block, while waiting for a response;
  • Better availability since the broker stores the messages in a buffer until a consumer is ready to process them;
  • Support to a variety of communication patterns.

Drawbacks of Messaging

  • The broker is yet another component to keep track of;
  • Since the communication is critical, the broker must have a high availability.

Before the Code

We need to quickly talk about two concepts: Aggregates and Domain Events. I won't go in much details about neither one, but these will be important for the messaging pattern and, specially, when deciding the set of services of your application.

Aggregates

Chris Richardson defines as "a cluster of domain objects within a boundary that can be treated as a unit." Now, take my example application. It has a "User" domain object, right? The "User" only has a name and an e-mail address. Imagine that we'd like to store the user's home address. We could create a separate class for it, "Address", which would be a part of our "User" domain object.

It doesn't make sense for us to work with the "Address" without its parent object, "User". So, we call our "Address" class a "Value Object". Our Aggregate, then, is composed of the domain object "User" and the value object "Address". We also can call the "User" class our Aggregate Root.

Working with Aggregates makes our domain model easier to understand, since it groups multiple classes as a single entity. It also makes load, update and delete operations clearer. Finally, it helps us enforce invariants.

Domain Events

Domain Events are very closely related to Aggregates. When something important happens to an Aggregate, e.g. a new Aggregate object is created, it might publish a Domain Event to anyone who might be interested.

Here are a few reasons as to why Domain Events can be useful:

  • Maintaining data consistency when using sagas (another microservices pattern);
  • Notifying a service that maintains a replica of the altered data (CQRS, another microservices pattern);
  • Notifying another application;
  • Sending user notifications;
  • Monitoring to check if the application is working as intended;
  • Analyzing events to model user behavior.

There are a few ways to identify domain events. They are usually defined by the application's requirements, but they can also be defined in a process called "event storming".

Show Me the Code

Now that we've talked about Aggregates and Domain Events, let's begin our implementation!

As I said on the beginning, I'll use my example application here. The code contains: an API Gateway, two simple Web API projects and a docker compose file. The first Web API is a simple CRUD for a "User", which has a name and an e-mail. The second Web API doesn't do anything other than receiving messages and logging what was supposed to happen.

Although the example on the repository is complete, I'll pretend as if it wasn't.

Ok, let's begin. We'll first code our consumer's code.

Consumer

On the "Notification.API" folder (on the "notifications-api" project), we'll create a folder called "Consumers". Next, we'll create an "AbstractConsumer" class, which will hold the basic functionality of our messenger consumers:

// AbstractConsumer.cs
public abstract class AbstractConsumer<T> : BackgroundService
{
  protected IServiceProvider _serviceProvider { get; set; }
  protected IConnection _connection { get; set; }
  protected string Queue { get; set; }
  protected IModel _channel { get; set; }
  protected string _exchange { get; set; }
  protected string _routingKey { get; set; }

  public AbstractConsumer(
    IServiceProvider serviceProvider,
    IConfiguration configuration
  )
  {
    _serviceProvider = serviceProvider;
  }

  protected void InitializeEventBus()
  {
    throw new NotImplementedException();
  }

  protected override Task ExecuteAsync(CancellationToken stoppingToken)
  {
    throw new NotImplementedException();
  }

  protected abstract void LogMessageReceived(T message);

  protected abstract Task SendEmail(T message);
}
Enter fullscreen mode Exit fullscreen mode

The properties "_connection", "Queue", "_channel", "_exchange" and "_routingKey" are all related to our messaging pattern, and I'll explain them in a moment. First, let's quickly go over the "_serviceProvider" property.

We won't actually use it on our example, but if we were actually sending an e-mail to some user, we'd probably need an e-mail service. The "_serviceProvider" would be, then, used to retrieve an instance of our e-mail service to send that e-mail!

Back to our messenger properties. They're all pretty self explanatory: "_connection" will provide our connection to the RabbitMQ server, "Queue" will hold the consumer's queue name, "_channel" will hold our RabbitMQ communication channel, "_exchange" will hold the name of the messenger exchange and, finally, "_routingKey" will hold the routing key of that consumer.

The "InitializeEventBus" method will prepare our consumer to connect to the RabbitMQ server and, well, consume the messages that it's interested in.

// AbstractConsumer.cs
protected void InitializeEventBus()
{
    var connectionFactory = new ConnectionFactory
    {
        HostName = "localhost",
        Port = 5672,
        UserName = "your-user",
        Password = "your-password"
    };

    _connection = connectionFactory.CreateConnection(
        "notifications-service-consumer"
    );

    _channel = _connection.CreateModel();
    _channel.ExchangeDeclare(
        exchange: _exchange,
        type: ExchangeType.Topic,
        durable: false,
        autoDelete: false,
        arguments: null
    );
    _channel.QueueDeclare(
        queue: Queue,
        durable: false,
        exclusive: false,
        autoDelete: false,
        arguments: null
    );
    _channel.QueueBind(
        queue: Queue,
        exchange: _exchange,
        routingKey: _routingKey
    );
}
Enter fullscreen mode Exit fullscreen mode

I'm not going to explain every argument of the called methods. If you want to better understand each of them, take the RabbitMQ tutorial. It's pretty simple and will give you all the information you need!

So, first we open a connection to our RabbitMQ server, then we open the communication channel. After that, we declare our exchange - if it doesn't exist already, it'll be created -, declare our queue - same effect as the exchange declaration - and, finally, we'll bind the queue to the exchange and define its routing key.

Our exchange will act as a router, right? We can make every message go through a specific exchange and use a routing key to determine which queues will receive which messages. That way, the exchange will forward each message to its queue.

Next, we need to actually consume the messages. We'll implement our "ExecuteAsync" method.

// AbstractConsumer.cs
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
    var consumer = new EventingBasicConsumer(_channel);
    consumer.Received += async (sender, eventArgs) =>
    {
        var contentArray = eventArgs.Body.ToArray();

        var contentString = Encoding.UTF8.GetString(contentArray);

        var message = JsonConvert
            .DeserializeObject<T>(
                contentString
            );

        await SendEmail(message);

        _channel.BasicAck(eventArgs.DeliveryTag, false);
    };

    _channel.BasicConsume(
        queue: Queue,
        autoAck: false,
        consumer: consumer
    );

    return Task.CompletedTask;
}
Enter fullscreen mode Exit fullscreen mode

Our consumer's "ExecuteAsync" method will listen to our communication channel and define a callback function that will be executed when a new message is available. The callback will read the message as a byte array, which we'll use to deserialize it to our "T" object. With the message deserialized, we'll use its contents to send an e-mail and execute an ACK, so that the message can be deleted from the queue.

Finally, we'll tell our channel what queue our consumer will be listening to and return the task as completed.

We can, now, create specific consumers for our messages. In my example application, I created two consumers: UserCreated and UserUpdated. Both classes inherit from our AbstractConsumer class and we only need to implement the "SendEmail" method, which can be different to each class. For example, they can use different templates, so we can create a specific implementation.

Producer

Moving on to our Producer, i.e. the "users-api", we'll begin by creating our connection to the RabbitMQ server. On the "Infra" project, we'll create a folder called "MessageBus" and "RabbitMQClient" class, which implements the "IMessageBusClient" that defines the "Publish" method:

// RabbitMQClient.cs
public class RabbitMQClient : IMessageBusClient
{
    private readonly IConnection _connection;

    public RabbitMQClient()
    {
        var connectionFactory = new ConnectionFactory {
            HostName = "localhost",
            UserName = "root",
            Password = "123456"
        };

        _connection = connectionFactory.CreateConnection(
            "users-service-producer"
        );
    }

    public void Publish(object message, string routingKey, string exchange)
    {
        var channel = _connection.CreateModel();

        var settings = new JsonSerializerSettings
        {
            NullValueHandling = NullValueHandling.Ignore,
            ContractResolver = new CamelCasePropertyNamesContractResolver()
        };

        var payload = JsonConvert.SerializeObject(message, settings);

        Console.WriteLine(payload);

        var body = Encoding.UTF8.GetBytes(payload);

        channel.ExchangeDeclare(
            exchange: exchange,
            type: ExchangeType.Topic,
            durable: false
        );

        Console.WriteLine($"{exchange}->{routingKey}");

        channel.BasicPublish(
            exchange: exchange,
            routingKey: routingKey,
            basicProperties: null,
            body: body
        );
    }
}
Enter fullscreen mode Exit fullscreen mode

As you can see, this code is very similar to our Consumer's code. The basic difference is that we'll be taking a given object, serialize it, convert it to an array of bytes and publish it. We'll publish the message in a specified exchange, with a specific routing key.

Moving on, we'll now apply the Aggregates and Domain Events concepts. We'll begin by creating an "AggregateRoot" class on our "Domain" project, on the "Entities" folder:

// AggregateRoot.cs
public class AggregateRoot
{
    private readonly List<IDomainEvent> _events = new List<IDomainEvent>();
    public Guid Id { get; set; }
    public IEnumerable<IDomainEvent> Events => _events;

    protected void AddEvent(IDomainEvent @event)
    {
        _events.Add(@event);
    }
}
Enter fullscreen mode Exit fullscreen mode

The "AggregateRoot" will contain an array of "IDomainEvents" (Domain/Events/Interfaces), which is an empty interface that creates a common contract for our events. Our "User" class will inherit from the "AggregateRoot".

// User.cs
public class User : AggregateRoot
{
  // ...
}
Enter fullscreen mode Exit fullscreen mode

Now, we need to define a Domain Event:

// UserCreated.cs
public class UserCreated : IDomainEvent
{
    public Guid Id { get; private set; }
    public string FullName { get; private set; }
    public string Email { get; private set; }
    public DateTime CreatedAt { get; private set; }

    public UserCreated(Guid id, string fullName, string email)
    {
        Id = id;
        FullName = fullName;
        Email = email;
        CreatedAt = DateTime.Now;
    }
}
Enter fullscreen mode Exit fullscreen mode

As you can see, the "UserCreated" domain event doesn't do much. It only stores the event information. So, how can we publish this? On our "UserService.cs" class, we'll inject our "RabbitMQClient". By doing this, we can access the publish method we created earlier.

Now, when we create a user, we'll need a method that creates and adds the event to the array we defined earlier. After we ensure that the user was created on the database, we'll iterate over the events array and publish our message!

// User.cs
public static User Create(string fullName, string email)
{
    var user = new User(Guid.NewGuid(), fullName, email);

    user.AddEvent(new UserCreated(user.Id, user.FullName, user.Email));

    return user;
}

// UserService.cs
public UsersService(
    // ...
    IMessageBusClient messageBus,
)
{
    // ...
    _messageBus = messageBus;
}

public Task<UserViewModel> CreateUser(UserInputModel model)
{
    // ...

    var user = User.Create(model.FullName, model.Email);

    user = await _usersRepository.CreateAsync(user);

    foreach(var @event in user.Events)
    {
        var routingKey = "user-created";

        _messageBus.Publish(
            message: @event,
            routingKey: routingKey,
            exchange: "user-service"
        );
    }

    // ...
}
Enter fullscreen mode Exit fullscreen mode

And that's it! We've just created our "UserCreated" event and published it to our RabbitMQ server. If everything is right, we should be able to see on the RabbitMQ GUI that a message was, in fact, published and consumed. To make it even easier to see, you can add a "LogMessageReceived" method to your Consumer!

We are almost done with the series. Next - and last - time, we'll be covering the Circuit Breaker pattern. Until next time!

Bibliography

  1. Microservices Patterns: Examples with Java - Chris Richardson
  2. Messaging pattern - https://microservices.io/patterns/communication-style/messaging.html
  3. RabbitMQ - Get Started - https://www.rabbitmq.com/getstarted.html
💖 💪 🙅 🚩
bernas1104
Bernardo Costa Nascimento

Posted on August 24, 2021

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related

An Introduction to Microservices pt. 5
architecture An Introduction to Microservices pt. 5

August 24, 2021