Using Dapr for Microservice Communication

jeastham1993

James Eastham

Posted on December 16, 2019

Getting started

Last week I wrote an article discussing the different faces of microservice communication.

One of my biggest gripes when building microservices is the knowledge each service must have of any other service it needs data from (event bus-based communication aside).

Take REST as an example. For the order-service to make a GET request to the product-service it must have some knowledge about where to send that request. https://product-service/api/v1/validation for example.

It's for this reason that I've always tended to gravitate towards event-based communication. It's the most decoupled as far as knowledge is concerned, but also one of the more involved to set up compared to gRPC and REST ('dotnet new webapi', anyone?).

Event-based communication gives a single point of failure though. An entire app structured entirely on an event bus comes tumbling down when the event bus stops functioning. That feels quite coupled to me...

I was recently listening to the Azure DevOps Podcast, specifically the interview with Mark Fussell talking about Dapr.

On the mention of the word Dapr, my mind instantly jumps to ORMs and database-y things. However, I soon realized this is a whole different ball game.

Dapr

So what is Dapr? To pluck a description straight from https://dapr.io/, Dapr is "An event-driven, portable runtime for building microservices on cloud and edge."

That's not overly helpful as far as day-to-day development goes, but after a dive into the samples I soon realized what a powerful tool it could be.

How it works

Dapr runs as a sidecar process to your main service process. What this means is that every application you have running has its own little Dapr application running next to it.

If, for example, you needed to make a request from the order-service to the product-service, instead of making the call to https://product-service/api/v1/validation it would be made to http://localhost:51987/v1.0/invoke/product-service/method/validate. Note: port 51987 is randomly assigned by Dapr at runtime, so it may change.

The same applies to raising events. Instead of needing to write specific implementations for the event bus that the app will use, publishing an event through Dapr is as simple as making a call to http://localhost:51987/v1.0/publish/myeventtopic.

Need to manage some state in your app? That is covered as well. Instead of writing the (albeit straightforward) code to store key-value pairs in Redis, a GET/POST request can be made to http://localhost:51987/v1.0/state to retrieve/store data.
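As a rough sketch of how those three sidecar addresses fit together, the helper below builds them from a port, an app id, a method name and a topic. The DaprEndpoints class and its method names are made up for this illustration and are not part of any Dapr library; only the URL shapes come from the examples above.

```csharp
using System;

// Hypothetical helper (not part of the Dapr SDK): builds the sidecar URLs described above.
public static class DaprEndpoints
{
    // Base address of the local sidecar, e.g. http://localhost:51987/v1.0
    public static string Base(int daprHttpPort) =>
        $"http://localhost:{daprHttpPort}/v1.0";

    // Service invocation: /invoke/{app-id}/method/{method}
    public static string Invoke(int daprHttpPort, string appId, string method) =>
        $"{Base(daprHttpPort)}/invoke/{Uri.EscapeDataString(appId)}/method/{method}";

    // Publishing: /publish/{topic}
    public static string Publish(int daprHttpPort, string topic) =>
        $"{Base(daprHttpPort)}/publish/{Uri.EscapeDataString(topic)}";

    // State storage: /state
    public static string State(int daprHttpPort) =>
        $"{Base(daprHttpPort)}/state";
}
```

Calling DaprEndpoints.Invoke(51987, "product-service", "validate") gives the invoke address quoted earlier, and the calling service never needs to know where the product service actually lives.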

The underlying tech used by the event bus and state store is managed by a couple of very simple YAML files that store the required setup. By default, this is Redis.


apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""



Dapr, as far as I have seen, is a way of completely abstracting away the low-level code that we as developers write over and over again. That code moves into configuration files instead, allowing the focus of development to be on business logic rather than getting bogged down with state and event bus providers.

It also allows teams of developers within the same company to work with completely different languages, while keeping a common way of interacting with each other.

Sound great? Let's dive into some examples.

Code samples

For the purpose of this article, all the code will be written in .NET Core 3.1. I would highly recommend taking a look at the samples repo on GitHub (especially the distributed calculator, it's fab).

Project Setup

First off, I will create three new .NET Core web API projects.

dotnet new webapi -n Dapr.OrderService
dotnet new webapi -n Dapr.ProductService
dotnet new webapi -n Dapr.DispatchService

In each project, I'll delete all of the boilerplate code and then add a couple of NuGet references.

dotnet add package MediatR
dotnet add package MediatR.Extensions.Microsoft.DependencyInjection
dotnet add package Dapr.AspNetCore --version 0.3.0-preview01

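I then update each of the Startup.cs files to add both MediatR and Dapr.

public void ConfigureServices(IServiceCollection services)
{
    services.AddMediatR(typeof(Startup).Assembly);
    services.AddControllers().AddDapr();

    // Registers IHttpClientFactory, which the order command handler resolves later on.
    services.AddHttpClient();

    // Shared JSON options: camelCase output and case-insensitive property matching.
    services.AddSingleton(new JsonSerializerOptions()
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
        PropertyNameCaseInsensitive = true,
    });
}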

As far as my functionality goes, I want to be able to create a new Order that will contain a list of lines. For each of the lines, the requisite product code needs to be validated by the product service.

If all product lines are valid then the order should be persisted to the order store, and a NewOrder event should be raised.

The dispatch service will listen for NewOrder events and start the dispatching process (in this case, write the order contents to a text file).

Synchronous Requests

First, I need to handle the initial order creation and validation. For that, I will create a new order class and a controller endpoint to handle a POST request.

using System;
using System.Collections.Generic;

namespace Dapr.OrderService.Domain.Models
{
    public class Order
    {
        public string OrderNumber { get; set; }

        public DateTime OrderDate { get; set; }

        public IEnumerable<OrderLine> OrderLines { get; set; }
    }

    public class OrderLine
    {
        public string ProductCode { get; set; }

        public int Quantity { get; set; }
    }
}

The controller then exposes the POST endpoint:

using System.Threading.Tasks;
using Dapr.OrderService.Domain.Commands;
using Dapr.OrderService.Domain.Models;
using MediatR;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;

namespace Dapr.OrderService.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class OrderController : ControllerBase
    {
        private readonly ILogger<OrderController> _logger;
        private readonly IMediator _mediator;

        public OrderController(ILogger<OrderController> logger
            ,IMediator mediator)
        {
            _logger = logger;
            _mediator = mediator;
        }

        [HttpPost]
        public async Task<Order> CreateOrderAsync([FromBody] CreateOrderCommand createOrderCommand)
        {
            return await this._mediator.Send(createOrderCommand);
        }
    }
}


Once we have the endpoint available, the handler of the CreateOrderCommand becomes quite simple.

using System.Threading;
using System.Threading.Tasks;
using Dapr.OrderService.Domain.Models;
using MediatR;
using Dapr;
using System.Text.Json;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.Logging;
using System.Net.Http;
using System;
using System.Text;
using System.IO;

namespace Dapr.OrderService.Domain.Commands
{
    public class CreateOrderCommand : IRequest<Order>
    {
        public Order Order { get; set; }
    }

    public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand, Order>
    {
        private readonly ILogger<CreateOrderCommandHandler> _logger;
        private readonly HttpClient _httpClient;
        private readonly JsonSerializerOptions _jsonOptions;

        public CreateOrderCommandHandler(ILogger<CreateOrderCommandHandler> logger
            , IHttpClientFactory clientFactory
            , JsonSerializerOptions jsonOptions)
        {
            this._logger = logger;
            this._httpClient = clientFactory.CreateClient();
            // The case-insensitive options registered as a singleton in Startup.cs.
            this._jsonOptions = jsonOptions;
        }

        public async Task<Order> Handle(CreateOrderCommand request, CancellationToken cancellationToken)
        {
            // Dapr exposes the sidecar's HTTP port through the DAPR_HTTP_PORT environment variable.
            var daprUrl = $"http://localhost:{Environment.GetEnvironmentVariable("DAPR_HTTP_PORT")}/v1.0";

            var validationResponses = new List<bool>(request.Order.OrderLines.Count());

            foreach (var orderLine in request.Order.OrderLines)
            {
                this._logger.LogInformation($"Validating line {orderLine.ProductCode}");

                // Ask the product service, via the sidecar, whether the product code exists.
                var validationRequest = new StringContent(JsonSerializer.Serialize(new ValidateProductRequest() { ProductCode = orderLine.ProductCode }), Encoding.UTF8, "application/json");
                var invokeResponse = await this._httpClient.PostAsync($"{daprUrl}/invoke/product-service/method/validate", validationRequest);

                using (var responseStream = await invokeResponse.Content.ReadAsStreamAsync())
                {
                    // The response uses a lower-case "result" property, so case-insensitive matching is required.
                    var validationResponse = await JsonSerializer.DeserializeAsync<ValidateProductResponse>(responseStream, this._jsonOptions);

                    this._logger.LogInformation($"Valid: {validationResponse.Result}");

                    validationResponses.Add(validationResponse.Result);
                }
            }

            if (validationResponses.All(valid => valid))
            {
                this._logger.LogInformation("Committing order");

                // The state endpoint expects an array of key/value pairs.
                var storedObject = new object[] { new { key = request.Order.OrderNumber, value = request.Order } };

                await this._httpClient.PostAsync($"{daprUrl}/state", new StringContent(JsonSerializer.Serialize(storedObject), Encoding.UTF8, "application/json"));

                // Publish the order to the 'neworder' topic.
                await this._httpClient.PostAsync($"{daprUrl}/publish/neworder", new StringContent(JsonSerializer.Serialize(request.Order), Encoding.UTF8, "application/json"));

                return request.Order;
            }
            else
            {
                this._logger.LogWarning("Order is invalid");

                return null;
            }
        }

        public class ValidateProductRequest
        {
            public string ProductCode { get; set; }
        }

        public class ValidateProductResponse
        {
            public bool Result { get; set; }
        }
    }
}

When Dapr starts as a sidecar, it sets two environment variables, DAPR_HTTP_PORT and DAPR_GRPC_PORT, that tell the app which ports the sidecar is listening on.

This gives the ability to, using a simple HTTP client, make consistent calls to the same local address regardless of what activity is happening (pub/sub, invoke, state storage).
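Reading the port straight from the environment works, but it fails in a fairly opaque way if the app is ever started without the sidecar. A minimal sketch of a more defensive lookup follows. The DaprPort class is hypothetical, and the fallback of 3500 is my own assumption for illustration rather than something the code in this article relies on.

```csharp
using System;

// Hypothetical helper: resolves the sidecar's HTTP port from DAPR_HTTP_PORT.
public static class DaprPort
{
    // Parses a raw environment value; returns the fallback when the value
    // is missing, not a number, or not a positive port.
    public static int Resolve(string rawValue, int fallback = 3500) =>
        int.TryParse(rawValue, out var port) && port > 0 ? port : fallback;

    // Reads DAPR_HTTP_PORT from the current process environment.
    public static int FromEnvironment() =>
        Resolve(Environment.GetEnvironmentVariable("DAPR_HTTP_PORT"));
}
```

With this in place, DaprPort.Resolve("51987") returns 51987, while a missing or malformed value falls back to the assumed default instead of producing a URL with an empty port.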

In this instance, I make a call to the local invoke endpoint, passing in my product-service and the endpoint I wish to invoke. I also send some JSON in the body of the request containing the product code itself.

I expect that request to return a simple true/false value based on whether the product exists or not.

If the total number of valid lines matches the total number of lines initially sent in, the order is persisted to the Dapr state store (by default this is Redis) with a POST request to the sidecar's state endpoint.
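The body of that state request is worth spelling out: the state endpoint takes a JSON array of entries, each with a key and a value. A small sketch of building that payload is below. The StatePayload class is a hypothetical name for illustration, and the order shape in the usage note is trimmed down.

```csharp
using System.Text.Json;

// Hypothetical helper: builds the JSON body for a POST to the sidecar's state endpoint.
public static class StatePayload
{
    // Produces a one-element array of { "key": ..., "value": ... }.
    public static string Build<T>(string key, T value) =>
        JsonSerializer.Serialize(new[] { new { key, value } });
}
```

For example, StatePayload.Build("1234", new { orderNumber = "1234" }) produces [{"key":"1234","value":{"orderNumber":"1234"}}], which is the kind of body the order service sends once validation passes.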

With that in mind, let's head off and implement the validation endpoint for the product service.

Product Service

To keep the codebase nice and lean, I'm going to map the endpoints directly in the Startup.cs file instead of creating separate controller files. The implementation of the product validation is also going to be pretty rudimentary.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.HttpsPolicy;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MediatR;

namespace Dapr.ProductService
{
    public class Startup
    {
        ...

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapPost("/validate", ValidateProduct);
            });

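            async Task ValidateProduct(HttpContext context)
            {
                // The order service posts a JSON object such as { "ProductCode": "WIDGET" },
                // so read the property from the document rather than deserializing a bare string.
                using var body = await JsonDocument.ParseAsync(context.Request.Body);
                var productCode = body.RootElement.TryGetProperty("ProductCode", out var code)
                    ? code.GetString()
                    : null;

                context.Response.ContentType = "application/json";

                if (ValidProducts.Contains(productCode))
                {
                    await context.Response.WriteAsync("{ \"result\": true}");
                }
                else
                {
                    await context.Response.WriteAsync("{ \"result\": false}");
                }
            }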
        }

        public static List<string> ValidProducts
        {
            get
            {
                return new List<string>(5)
                {
                    "WIDGET",
                    "BOLT",
                    "SCREW",
                    "HAMMER",
                    "TOOL"
                };
            }
        }
    }
}

Dapr is smart. Whatever the name of the method is that is invoked, it passes that directly on to an API endpoint with the same name.

In this instance, I have mapped an endpoint named validate, which invokes the ValidateProduct function to actually handle the request.

The range of products this company sells is pretty limited; so limited, in fact, that they wanted them hard-coded. That makes the validation code really simple.

Once validated, I write a JSON response. Note the manually created JSON response. Ordinarily, I would create an object and return a serialized representation. For this simple example, a manually typed string seemed enough.
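For comparison, here is a minimal sketch of the "create an object and serialize it" approach mentioned above, using System.Text.Json with a camelCase naming policy so the output matches the hand-typed string. The ValidationJson class is a hypothetical name used only for this illustration.

```csharp
using System.Text.Json;

// Response shape returned by the validate endpoint.
public class ValidateProductResponse
{
    public bool Result { get; set; }
}

// Hypothetical helper: serializes the validation result instead of hand-typing JSON.
public static class ValidationJson
{
    private static readonly JsonSerializerOptions CamelCase = new JsonSerializerOptions
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
    };

    // Returns {"result":true} or {"result":false}.
    public static string For(bool isValid) =>
        JsonSerializer.Serialize(new ValidateProductResponse { Result = isValid }, CamelCase);
}
```

The endpoint could then write ValidationJson.For(ValidProducts.Contains(productCode)) to the response. The trade-off is a little more code in exchange for not having to keep a string literal in sync with the response class.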

So we now have an endpoint that allows the creation and validation of an order. Now, what about the dispatch confirmation?

Pub/Sub

A publish/subscribe model using Dapr is just as easy as a request/response. Let's start with the publisher.

Publishing an event

After committing the order to a state store, it then needs to be published so that the dispatch service can act on it.

For that, I'm going to need the HTTP client again. The code to actually publish the event couldn't possibly be any simpler.

// Excerpt from CreateOrderCommandHandler.Handle, shown in full earlier.
// Once the order has been stored, publish it to the 'neworder' topic through the local sidecar.
var daprPort = Environment.GetEnvironmentVariable("DAPR_HTTP_PORT");

await this._httpClient.PostAsync($"http://localhost:{daprPort}/v1.0/publish/neworder", new StringContent(JsonSerializer.Serialize(request.Order), Encoding.UTF8, "application/json"));

We make a simple POST request to the publish endpoint of the local Dapr sidecar. The last part of the URL is the name of the topic you'd like to publish the event to, in this instance 'neworder'.

Any data that needs to be passed along with the published event is sent as part of the post request body. In this instance, I'm just going to serialize the entire order object.

Subscriber

The subscriber is a tiny little bit more involved, but not by much. The way the Dapr subscription model works is really intuitive.

To set up a subscription, the API that is the subscriber needs to expose a GET endpoint on '/dapr/subscribe'. This endpoint needs to return a list of the names of the topics that will be subscribed to. The API then also needs to expose a POST endpoint for each topic, with a route that is simply the name of the topic it handles.
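To make that contract concrete, the sketch below produces the two pieces the subscriber has to provide: the JSON body returned from '/dapr/subscribe' and the POST route for a given topic. The Subscriptions class is a hypothetical name for illustration only.

```csharp
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper describing the subscriber contract outlined above.
public static class Subscriptions
{
    // Body returned from GET /dapr/subscribe: a JSON array of topic names.
    public static string ToJson(IEnumerable<string> topics) =>
        JsonSerializer.Serialize(topics);

    // Route of the POST endpoint that handles events for a topic.
    public static string RouteFor(string topic) => "/" + topic;
}
```

For the dispatch service, Subscriptions.ToJson(new[] { "neworder" }) yields ["neworder"] and Subscriptions.RouteFor("neworder") yields /neworder.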

In the instance of our dispatch service, it looks something like this.

using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using Dapr.DispatchService.Domain.Models;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Newtonsoft.Json;

namespace Dapr.DispatchService
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddDaprClient();

            services.AddSingleton(new JsonSerializerOptions()
            {
                PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
                PropertyNameCaseInsensitive = true,
            });
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseRouting();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapGet("/dapr/subscribe", async context =>
                {
                    var subscribedTopics = new List<string>() { "neworder" };
                    await context.Response.WriteAsync(JsonConvert.SerializeObject(subscribedTopics));
                });

                endpoints.MapPost("/neworder", async context =>
                {
                    System.Console.WriteLine("Handling new order");

                    using (var streamReader = new StreamReader(context.Request.Body))
                    {
                        var json = await streamReader.ReadToEndAsync();

                        System.Console.WriteLine($"Order content {json}");

                        if (!string.IsNullOrEmpty(json))
                        {
                            var order = JsonConvert.DeserializeObject<DaprContentWrapper<Order>>(json);

                            File.WriteAllText($"C:\\Demonstration\\{order.Data.OrderNumber}.json", JsonConvert.SerializeObject(order, Formatting.Indented));
                        }
                    }
                });
            });
        }
    }
}

Two endpoints are exposed, GET /dapr/subscribe and POST /neworder. The new order endpoint is a really trivial piece of code that just writes the order JSON to a file in C:\Demonstration.
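The handler reads order.Data because Dapr delivers published messages inside a CloudEvents-style envelope, with the original payload under a data property. The DaprContentWrapper<T> type itself is not shown in this article, so the sketch below uses a hypothetical stand-in, EventEnvelope<T>. The exact set of envelope fields and the sample values in the usage note are assumptions made for illustration.

```csharp
using System.Text.Json;

// Hypothetical stand-in for DaprContentWrapper<T>; the real definition is not shown here.
public class EventEnvelope<T>
{
    public string Id { get; set; }
    public string Source { get; set; }
    public string Type { get; set; }

    // The payload that the publisher originally sent.
    public T Data { get; set; }
}

// Trimmed-down order shape, just enough for the example.
public class OrderSummary
{
    public string OrderNumber { get; set; }
}

public static class EnvelopeReader
{
    // Case-insensitive so "data" and "orderNumber" match the Pascal-cased properties.
    private static readonly JsonSerializerOptions Options = new JsonSerializerOptions
    {
        PropertyNameCaseInsensitive = true
    };

    public static EventEnvelope<T> Read<T>(string json) =>
        JsonSerializer.Deserialize<EventEnvelope<T>>(json, Options);
}
```

Given a body such as {"id":"abc","source":"order-service","data":{"orderNumber":"1234"}}, EnvelopeReader.Read<OrderSummary>(json).Data.OrderNumber is "1234". Envelope fields the class does not declare are simply ignored.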

Pulling it all together

To start up applications using Dapr, they need to be started using the Dapr CLI instead of the bog-standard 'dotnet run' command.

Once you have the CLI installed, run the three commands below in the respective project folders.

dapr run --app-id dispatch-service --app-port 5017 dotnet run
dapr run --app-id order-service --app-port 5020 dotnet run
dapr run --app-id product-service --app-port 5015 dotnet run

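Once all three Dapr instances are running, make a POST request to http://localhost:5020/order with the content below and watch the magic happen. The order is wrapped in an order property because the controller binds the request body to a CreateOrderCommand.

{
    "order": {
        "orderNumber": "1234",
        "orderDate": "2019-12-16T00:00:00",
        "orderLines": [
            {
                "productCode": "WIDGET",
                "quantity": 1
            }
        ]
    }
}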

For the purposes of this article, I've kept the example really trivial. There is almost no argument for splitting an app this simple into three separate services. However, I hope it shows off the magic of Dapr.

There are also client libraries in most languages that abstract away a lot of the manual parsing of response bodies etc. I've tried to use the basic HTTP request where possible to make it as language agnostic as possible.

If you want to check out the source code, or run this sample for yourself, check out my git repo at https://github.com/jeastham1993/playing-with-dapr.
