Learning Microservices with Go(Part 4). GRPC (Synchronous Communication)
Manav Kushwaha
Posted on February 14, 2024
Intro
This post is the fourth part of the "Learning Microservices with Go". I'm writing these posts as I'm learning the concepts. If you haven't checked out the part 3, here's the link for it: Part 3.
Link for Git Repo: Github
In part 3, we explored serialization, various serialization formats, their comparison, implemented some simple benchmarks to measure the performance.
In this part, we'll be adding gRPC code to our project which uses Protocol Buffers, that we'd learnt about in the last part. We'll use gRPC logic for communication with different services instead of REST APIs which we're using earlier.
We'll also be testing the services using Postman. This was the first time I used postman to test a non-http service. Everyday you learn something new!!
Let's start!
Synchronous Communication
Synchronous communication is way for applications(eg: microservices) to interact over the network using the request-response model. There is a request that goes to the other application and it waits for the response from the other side. It waits till the response comes or the request times out. This way of interaction is used by many protocols eg: HTTP.
Also, there are some existing Remote Procedure Call(RPC) framework and libraries that can help you achieve this task. What's an RPC? RPC allows one machine on the network to call a function(subroutine) on another machine as if it was called natively on the other machine.
For our use case, we'll be using RPCs to call methods of a service from another service. We'll be taking help of gRPC for this task.
gRPC
gRPC is an RPC framework that was developed by Google. It used HTTP/2 as the transport protocol and Protobufs as the serialization format.
- It essentially allows the user to define client side and server side code for the services. The code generated can be used by other services to call the specified method on this service.
- Also it's language agnostic i.e. you can use different language for each service that best suits that service.
gRPC is one of the most widely known RPC framework used for building microservice architecture.
Defining Service using Protobufs
Let's now see how to define a service using Protobufs. We'll also be defining code for the gRPC framework, which will allow us to talk with other services.
First install the gRPC plugin for Protobuf. You can do this by:
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
Now we'll add some code to the api/movie.proto
file that we created in the last part.
service MetadataService {
rpc GetMetadata(GetMetadataRequest) returns (GetMetadataResponse);
rpc PutMetadata(PutMetadataRequest) returns (PutMetadataResponse);
}
message GetMetadataRequest {
string movie_id = 1;
}
message GetMetadataResponse {
Metadata metadata = 1;
}
message PutMetadataRequest {
Metadata metadata = 1;
}
message PutMetadataResponse {
}
The code that we added defined our MetadataService. We already have the Metadata structure which we defined in the previous part that we can reuse.
Let's explain the code that we added.
We defined a rpc named GetMetadata
that accepts GetMetadataRequest
as it's parameter and outputs GetMetadataResponse
. We then defined these two messages below the request function. Note:
- It's good practice to create a new structure for both a request and a response. Eg: GetMetadataRequest, GetMetadataResponse.
- It's good to follow consistent naming for all the endpoints. We'll prefix the request and response with the function name for all our endpoints.
Similarly for the PutMetadata
.
Now defining the RatingService.
service RatingService {
rpc GetAggregatedRating(GetAggregatedRatingRequest) returns (GetAggregatedRatingResponse);
rpc PutRating(PutRatingRequest) returns (PutRatingResponse);
}
message GetAggregatedRatingRequest {
string record_id = 1;
string record_type = 2;
}
message GetAggregatedRatingResponse {
double rating_value = 1;
}
message PutRatingRequest {
string user_id = 1;
string record_id = 2;
int32 record_type = 3;
int32 record_value = 4;
}
message PutRatingResponse {
}
Similarly add the code for the MovieService
service MovieService {
rpc GetMovieDetails(GetMovieDetailsRequest) returns (GetMovieDetailsResponse);
}
message GetMovieDetailsRequest {
string movie_id = 1;
}
message GetMovieDetailsResponse {
MovieDetails movie_details = 1;
}
Now our movie.proto
file includes the structure definitions and the API definitions for the
services.
We'll run the following command in the root directory to generate the code for these definitions.
protoc -I=api --go_out=. --go-grpc_out=. movie.proto
-I=api
defines the input file path
--go_out=.
defines the output path of the protobuf structure code
--go-grpc_out
defines the output path of the service code.
movie.proto
specifies the protobuf file containing definitions.
After execution, we'll find a movie_grpc.pb.go
file in the /gen
directory containing the code for the services.
Here's a small screenshot of what it'll look like.
We'll use this generated code in our Go services.
Generated Model vs Internal model
So currently we're having two versions of our model data structures. 1. The internal ones in metadata/pkg/model
2. The newly generated ones in gen
directory.
Some of you might think that having two similar structures is redundant. (I also thought the same earlier). But, we need to understand that these structures differ in the their purpose.
- Internal Model: The structures that we create to be used across our code base like in repository, controller etc.
- Generated Model: The structures generated by tools like protoc compiler. These should only be used for serialization ie for transferring the data or storing the serialized data.
We should not use the generated structures across the application for the following reasons:
- Coupling between application and serialization format. Switching to other serialization would requre full codebase changes.
- Generated code (function definitions etc) may vary from version to version. This might cause your code which is using the generated code to break.
-
Generated Code is harder to use. Eg: In case of protobufs the field values are optional. Hence it'll require having multiple
field != nil
checks across the project.
Hence, we'll need some mapping logic to convert the data from one model to other. We'll add a file called mapper.go
in the directory metadata/pkg/model
package model
import "movieexample.com/gen"
// MetadataToProto converts a Metadata struct into a generated proto counterpart
func MetadataToProto(m *Metadata) *gen.Metadata {
return &gen.Metadata{
Id: m.ID,
Title: m.Title,
Description: m.Description,
Director: m.Director,
}
}
// MetadataFromProto converts a generated proto counterpart into a Metadata struct
func MetadataFromProto(m *gen.Metadata) *Metadata {
return &Metadata{
ID: m.Id,
Title: m.Title,
Description: m.Description,
Director: m.Director,
}
}
Implementing clients and gateways
We'll now plugin the generated code into our microservices. This will help us to switch our communication protocol from the JSON based HTTP to Protobufs based gRPC calls.
In metadata/internal/handler
add grpc directory and add grpc.go
file.
package grpc
import (
"context"
"errors"
"log"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"movieexample.com/gen"
"movieexample.com/metadata/internal/controller/metadata"
"movieexample.com/metadata/pkg/model"
)
// Handler defines a movie metadata gRPC handler
type Handler struct {
gen.UnimplementedMetadataServiceServer
svc *metadata.Controller
}
// New creates a new movie metadata gRPC handler
func New(ctrl *metadata.Controller) *Handler {
return &Handler{svc: ctrl}
}
// GetMetadata returns movie metadata by id
func (h *Handler) GetMetadata(ctx context.Context, req *gen.GetMetadataRequest) (*gen.GetMetadataResponse, error) {
if req == nil || req.MovieId == "" {
return nil, status.Errorf(codes.InvalidArgument, "nil req or empty id")
}
m, err := h.svc.Get(ctx, req.MovieId)
if err != nil && errors.Is(err, metadata.ErrNotFound) {
log.Printf("GetMetadata failed: Err: %v", err)
return nil, status.Errorf(codes.NotFound, err.Error())
} else if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
return &gen.GetMetadataResponse{Metadata: model.MetadataToProto(m)}, nil
}
Let's see the key points regarding the implementation:
- Our Handler implemented the function in the same format as the one defined in the generated MetadataServiceServer interface.
- We use the MetadataToProto mapping function to transform our internal structures into generated ones.
- We embed the
UnimplementedMetadataServiceServer
into our Handler. This is required by the Protobufs compiler.
Now we'll update the metadata/cmd/main.go
file. Most of the code is from the previous parts
import (
grpcHandler "movieexample.com/metadata/internal/handler/grpc"
)
func main() {
// The service will be running on port "port"
var port int
flag.IntVar(&port, "port", 8081, "API Handler port")
flag.Parse()
log.Printf("Starting the movie metadata service on port %d", port)
registery, err := consul.NewRegistery("localhost:8500")
if err != nil {
panic(err)
}
ctx := context.Background()
instanceID := discovery.GenerateInstanceID(serviceName)
if err := registery.Register(ctx, instanceID, serviceName, fmt.Sprintf("localhost:%d", port)); err != nil {
panic(err)
}
go func() {
for {
if err := registery.HealthCheck(instanceID, serviceName); err != nil {
log.Println("Failed to report healthy state: ", err.Error())
}
time.Sleep(1 * time.Second)
}
}()
defer registery.Deregister(ctx, instanceID, serviceName)
repo := memory.New()
// Updated code
svc := metadata.New(repo)
h := grpcHandler.New(svc)
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%v", port))
if err != nil {
log.Fatal("failed to listen: %v", err)
}
srv := grpc.NewServer()
gen.RegisterMetadataServiceServer(srv, h)
if err := srv.Serve(lis); err != nil {
panic(err)
}
}
We instantiated our gRPC server and started listening for requests on it. Rest of the code is same as before.
Let's do the same with the rating service.
Create a file grpc.go
in /rating/internal/handler/grpc
package grpc
import (
"context"
"errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"movieexample.com/gen"
"movieexample.com/rating/internal/controller/rating"
"movieexample.com/rating/pkg/model"
)
// Handler defines a gRPC rating API handler
type Handler struct {
gen.UnimplementedRatingServiceServer
svc *rating.Controller
}
// New creates a new gRPC rating API handler
func New(ctrl *rating.Controller) *Handler {
return &Handler{svc: ctrl}
}
// GetAggregateRating returns the aggregated rating for a record.
func (h *Handler) GetAggregateRating(ctx context.Context, req *gen.GetAggregatedRatingRequest) (*gen.GetAggregatedRatingResponse, error) {
if req.RecordId == "" || req.RecordType == "" {
return nil, status.Errorf(codes.InvalidArgument, "empty record id or record type")
}
val, err := h.svc.GetAggregatedRatings(ctx, model.RecordID(req.RecordId), model.RecordType(req.RecordType))
if err != nil && errors.Is(err, rating.ErrNotFound) {
return nil, status.Errorf(codes.NotFound, err.Error())
} else if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
return &gen.GetAggregatedRatingResponse{RatingValue: val}, nil
}
// PutRating writes a rating for a given record.
func (h *Handler) PutRating(ctx context.Context, req *gen.PutRatingRequest) (*gen.PutRatingResponse, error) {
if req == nil || req.RecordId == "" || req.UserId == "" {
return nil, status.Errorf(codes.InvalidArgument, "nil req or empty user id or record id")
}
rating := &model.Rating{
RecordID: model.RecordID(req.RecordId),
RecordType: model.RecordType(req.RecordType),
UserID: model.UserID(req.UserId),
Value: model.RatingValue(req.RecordValue),
}
if err := h.svc.PutRating(ctx, model.RecordID(req.RecordId), model.RecordType(req.RecordType), rating); err != nil {
return nil, err
}
return &gen.PutRatingResponse{}, nil
}
Now similar to the metadata service, we'll update the main.go for rating i.e. rating/cmd/main.go
import (
grpcHandler "movieexample.com/rating/internal/handler/grpc"
)
func main() {
var port int
flag.IntVar(&port, "port", 8082, "API Handler port")
flag.Parse()
fmt.Printf("Starting the movie rating service on port %d", port)
registery, err := consul.NewRegistery("localhost:8500")
if err != nil {
panic(err)
}
ctx := context.Background()
instanceID := discovery.GenerateInstanceID(serviceName)
if err := registery.Register(ctx, instanceID, serviceName, fmt.Sprintf("localhost:%d", port)); err != nil {
panic(err)
}
go func() {
for {
if err := registery.HealthCheck(instanceID, serviceName); err != nil {
log.Println("Failed to report healthy state: ", err.Error())
}
time.Sleep(1 * time.Second)
}
}()
defer registery.Deregister(ctx, instanceID, serviceName)
repo := memory.New()
ctrl := rating.New(repo)
// New Code
h := grpcHandler.New(ctrl)
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%v", port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
srv := grpc.NewServer()
gen.RegisterRatingServiceServer(srv, h)
srv.Serve(lis)
}
Note: There are other imports that I've not put in the code as it'll make this part even longer.
Now add a handler file for movie service. Add movie/internal/handler/grpc/grpc.go
package grpc
import (
"context"
"log"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"movieexample.com/gen"
"movieexample.com/metadata/pkg/model"
"movieexample.com/movie/internal/controller/movie"
)
type Handler struct {
gen.UnimplementedMovieServiceServer
ctrl *movie.Controller
}
func New(ctrl *movie.Controller) *Handler {
return &Handler{ctrl: ctrl}
}
func (h *Handler) GetMovieDetails(ctx context.Context, req *gen.GetMovieDetailsRequest) (*gen.GetMovieDetailsResponse, error) {
log.Printf("GetMovieDetails called: %v", req.MovieId)
if req == nil || req.MovieId == "" {
return nil, status.Errorf(codes.InvalidArgument, "nil req or empty id")
}
m, err := h.ctrl.Get(ctx, req.MovieId)
// Handle the errors returned by the grpc response
if e, ok := status.FromError(err); ok {
switch e.Code() {
case codes.NotFound:
return nil, status.Errorf(codes.NotFound, err.Error())
default:
return nil, status.Errorf(codes.Internal, err.Error())
}
}
return &gen.GetMovieDetailsResponse{
MovieDetails: &gen.MovieDetails{
Rating: *m.Rating,
Metadata: model.MetadataToProto(&m.Metadata),
},
}, nil
}
In the previous few steps, we've added logic on the server side to handle the client requests. We'll now add code that'll connect to that logic from the client side.
First, we'll add a file internal/grpcutil/grpcutil.go
. This will contain the logic for connecting to any service. We're extracting that logic to a function in this file.
package grpcutil
import (
"context"
"math/rand"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"movieexample.com/pkg/discovery"
)
// ServiceConnection attemps to select a random service instance and returns a gRPC connection to it.
func ServiceConnection(ctx context.Context, serviceName string, registery discovery.Registery) (*grpc.ClientConn, error) {
addrs, err := registery.Discover(ctx, serviceName)
if err != nil {
return nil, err
}
return grpc.Dial(addrs[rand.Intn(len(addrs))], grpc.WithTransportCredentials(insecure.NewCredentials()))
}
This function will pick a random instance of the service and connect to it.
Now we'll create gateway for our services.
Create a file as movie/internal/gateway/metdata/grpc/metadata.go
Add the following code
package grpc
import (
"context"
"movieexample.com/gen"
"movieexample.com/internal/grpcutil"
"movieexample.com/metadata/pkg/model"
"movieexample.com/pkg/discovery"
)
// Gateway defines a gRPC gateway for movie metadata service
type Gateway struct {
registry discovery.Registery
}
// New creates a new gRPC gateway for movie metadata service
func New(registry discovery.Registery) *Gateway {
return &Gateway{registry: registry}
}
// Get retrieves movie metadata by movie id
func (g *Gateway) Get(ctx context.Context, id string) (*model.Metadata, error) {
// Create a gRPC connection to the movie metadata service
conn, err := grpcutil.ServiceConnection(ctx, "metadata", g.registry)
if err != nil {
return nil, err
}
defer conn.Close()
client := gen.NewMetadataServiceClient(conn)
resp, err := client.GetMetadata(ctx, &gen.GetMetadataRequest{MovieId: id})
if err != nil {
return nil, err
}
return model.MetadataFromProto(resp.Metadata), nil
}
Let's mention some key points:
- We'll be using the grpcutil.ServiceConnection to connect to any service. This is the method that we'd created recently.
- We converted the metadata from Protobufs to model type using
MetadataFromProto
function.
Now we'll create a gateway for the rating service. Create a file /movie/internal/gateway/rating/grpc/rating.go
package grpc
import (
"context"
"movieexample.com/gen"
"movieexample.com/internal/grpcutil"
"movieexample.com/pkg/discovery"
"movieexample.com/rating/pkg/model"
)
// Gateway defines a gRPC gateway for rating service
type Gateway struct {
registry discovery.Registery
}
// New creates a new gRPC gateway for rating service
func New(registry discovery.Registery) *Gateway {
return &Gateway{registry}
}
// GetAggregatedRating returns the aggregated rating for a record or ErrNotFound if there are no ratings for it.
func (g *Gateway) GetAggregatedRating(ctx context.Context, recordID model.RecordID, recordType model.RecordType) (float64, error) {
conn, err := grpcutil.ServiceConnection(ctx, "rating", g.registry)
if err != nil {
return 0, err
}
defer conn.Close()
client := gen.NewRatingServiceClient(conn)
resp, err := client.GetAggregatedRating(ctx, &gen.GetAggregatedRatingRequest{RecordId: string(recordID), RecordType: string(recordType)})
if err != nil {
return 0, err
}
return resp.RatingValue, nil
}
// PutRating adds a rating for a record
func (g *Gateway) PutRating(ctx context.Context, recordID model.RecordID, recordType model.RecordType, userID model.UserID, value model.RatingValue) error {
conn, err := grpcutil.ServiceConnection(ctx, "rating", g.registry)
if err != nil {
return err
}
defer conn.Close()
client := gen.NewRatingServiceClient(conn)
putRatingRequest := &gen.PutRatingRequest{
UserId: string(userID),
RecordId: string(recordID),
RecordType: string(recordType),
RecordValue: int32(value),
}
_, err = client.PutRating(ctx, putRatingRequest)
if err != nil {
return err
}
return nil
}
Now let's finally change the main function of the movie service.
func main() {
var port int
flag.IntVar(&port, "port", 8083, "Port to listen on")
flag.Parse()
// Register with consul
registery, err := consul.NewRegistery("localhost:8500")
if err != nil {
panic(err)
}
ctx := context.Background()
instanceID := discovery.GenerateInstanceID(serviceName)
if err := registery.Register(ctx, instanceID, serviceName, fmt.Sprintf("localhost:%d", port)); err != nil {
panic(err)
}
go func() {
for {
if err := registery.HealthCheck(instanceID, serviceName); err != nil {
log.Println("Failed to report healthy state: ", err.Error())
}
time.Sleep(1 * time.Second)
}
}()
defer registery.Deregister(ctx, instanceID, serviceName)
log.Printf("Starting the movie service at port: %d\n", port)
metadataGateway := metadataGateway.New(registery)
ratingGateway := ratingGateway.New(registery)
ctrl := movie.New(ratingGateway, metadataGateway)
// New Code
h := grpcHanlder.New(ctrl)
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%v", port))
if err != nil {
log.Fatal("failed to listen: err")
}
srv := grpc.NewServer()
reflection.Register(srv)
gen.RegisterMovieServiceServer(srv, h)
if err := srv.Serve(lis); err != nil {
log.Fatalf("failed to listen: %v", err)
}
}
Note: We also registered our server with the reflection package. This allows us to use API request tools(eg: Postman) and get information about the available methods.
We've now completed the coding part. We'll now be testing our implementation.
Testing
Run the main.go in each directory. Also remember to run the consul docker container before that. I added the command to a file docker-compose
in the root directory.
services:
dev-consul:
image: hashicorp/consul
command: agent -server -ui -node=server-1 -bootstrap-expect=1 -client=0.0.0.0
ports:
- 8500:8500
- 8600:8600/udp
Also added a Makefile
consul:
@docker compose up -d
You can now just run make consul
and your consul service will be up.
Now after you've started all the services. Open the Postman app. Select a new request and select grpc
request type.
Final Request can be:
On sending the request you'll be seeing the following:
We can see that the response is correct. We are sending the Not Found code as the response as we'd defined in our code.
Tadaaaaa. We've completed this part!!! This was a challenging part for me personally. Probably due to many things that I saw for the first time.
I also felt that JSON should be used if you're not thinking about scaling and implementing some service which is not going to have very high traffic. JSON is trivial and is easy to implement and debug.
But Protobufs give a high advantage over JSON when optimizing is important.
Here's the github repo: Github
Posted on February 14, 2024
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.