Go, Kafka and gRPC clean architecture CQRS microservices with Jaeger tracing 👋🧑‍💻

aleksk1ng

Alexander

Posted on August 30, 2021


In this article, let's build CQRS microservices that are closer to the real world, with tracing and monitoring, using: 🚀
Kafka as the message broker
gRPC, using the Go implementation of gRPC
PostgreSQL as the write database
Jaeger for open source, end-to-end distributed tracing
Prometheus for monitoring and alerting
Grafana to compose observability dashboards with everything from Prometheus
MongoDB as the read database
Redis for caching, through a type-safe Redis client for Golang
swag to generate Swagger documentation for Go
Echo as the web framework

You can find the source code in the GitHub repository.

The main idea here is an implementation of CQRS using Go, Kafka and gRPC.
I won't try to explain what the CQRS pattern is, because that would make the article too long, and the best place to read about it is microservices.io.
I also found the example CQRS project and blog of Three Dots Labs very interesting and took them as a starting point.

In this example we have three services: an API gateway, a reader and a writer, which communicate through Kafka and gRPC.
Postgres is used as the write database, MongoDB as the read database, and Redis for caching.
Like any real-world project, we of course need metrics and tracing: Prometheus and Grafana are used for metrics, and Jaeger for tracing.
This example does not implement any interesting business logic and is not covered by tests, because I didn't have time.
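
The message flow between the three services can be sketched in a single process. This is only an illustration under loud assumptions: buffered channels stand in for Kafka topics, maps stand in for Postgres and MongoDB, and all type and function names here are made up for the sketch rather than taken from the repository.

```go
package main

import "fmt"

// Product is the data carried by commands and events in this sketch.
type Product struct {
	ID    string
	Name  string
	Price float64
}

// topic stands in for a Kafka topic. A buffered channel is an assumption
// made only so the example runs inside one process.
type topic chan Product

// gateway publishes commands and never touches a database directly.
type gateway struct{ createTopic topic }

func (g gateway) CreateProduct(p Product) { g.createTopic <- p }

// writer owns the write store (Postgres in the article) and emits events.
type writer struct {
	store        map[string]Product
	createdTopic topic
}

// consume drains the buffered commands, saves them and publishes an event for each.
func (w writer) consume(in topic) {
	for len(in) > 0 {
		p := <-in
		w.store[p.ID] = p
		w.createdTopic <- p
	}
}

// reader builds the read model (MongoDB in the article) from events.
type reader struct{ view map[string]Product }

func (r reader) consume(in topic) {
	for len(in) > 0 {
		p := <-in
		r.view[p.ID] = p
	}
}

// GetProductByID is the query side: it only reads the projected view.
func (r reader) GetProductByID(id string) (Product, bool) {
	p, ok := r.view[id]
	return p, ok
}

func main() {
	create, created := make(topic, 8), make(topic, 8)
	g := gateway{createTopic: create}
	w := writer{store: map[string]Product{}, createdTopic: created}
	r := reader{view: map[string]Product{}}

	g.CreateProduct(Product{ID: "p1", Name: "keyboard", Price: 49.9})
	w.consume(create)  // command -> write store -> event
	r.consume(created) // event -> read model

	p, ok := r.GetProductByID("p1")
	fmt.Println(p.Name, ok) // prints: keyboard true
}
```

Note that the read model is empty until the event has been consumed, which is the eventual consistency you accept with this kind of CQRS setup.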

The UIs will be available on these ports:

Jaeger UI: http://localhost:16686

Prometheus UI: http://localhost:9090

Grafana UI: http://localhost:3000

Swagger UI: http://localhost:5001/swagger/index.html

Jaeger tracing ui:

Swagger ui:

Prometheus metrics ui:

Grafana metrics ui:

For local development:



make local or docker_dev // run docker compose files
make migrate_up // run sql migrations
make mongo // run mongodb scripts
make swagger // generate swagger documentation



To run everything in Docker, use make docker_dev; it has a hot reloading feature.

Docker compose file for this project:



version: "3.8"

services:
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: node_exporter_container
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: grafana_container
    restart: always
    image: grafana/grafana
    ports:
      - '3005:3000'
    networks: [ "microservices" ]

  microservices_postgesql:
    image: postgres:13-alpine
    container_name: microservices_postgesql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=products
      - POSTGRES_HOST=5432
    command: -p 5432
    volumes:
      - ./microservices_pgdata:/var/lib/postgresql/data
    networks: [ "microservices" ]

  redis:
    image: redis:6-alpine
    restart: always
    container_name: microservices_redis
    ports:
      - "6379:6379"
    networks: [ "microservices" ]

  zoo1:
    image: zookeeper:3.4.9
    restart: always
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-single/zoo1/data:/data
      - ./zk-single-kafka-single/zoo1/datalog:/datalog
    networks: [ "microservices" ]

  kafka1:
    image: confluentinc/cp-kafka:5.5.1
    restart: always
    hostname: kafka1
    ports:
      - "9092:9092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
    volumes:
      - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1
    networks: [ "microservices" ]

  mongodb:
    image: mongo:latest
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin
      MONGODB_DATABASE: products
    ports:
      - "27017:27017"
    volumes:
      - mongodb_data_container:/data/db
    networks: [ "microservices" ]

  jaeger:
    container_name: jaeger_container
    restart: always
    image: jaegertracing/all-in-one:1.21
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686"
      - "14268:14268"
      - "14250:14250"
      - "9411:9411"
    networks: [ "microservices" ]

volumes:
  mongodb_data_container:

networks:
  microservices:
    name: microservices



The idea of the API gateway service is to accept HTTP requests: command handlers publish events to Kafka, and query handlers retrieve data from the reader service over gRPC.
This project uses Echo, but I have found Gin and Chi to be very good for production too.

Let's look at the code. The create product HTTP handler binds the request body, generates a UUID, validates the DTO and then calls the create product command.
There are many holy wars about whether to pass the id to command or service methods as a separate parameter or in the body, or to generate the id in the command and return it,
whether commands can return values or not, and so on. They don't make much sense: it's up to you and your team which way to use, and it's better to spend your time on more important business tasks :)



// CreateProduct
// @Tags Products
// @Summary Create product
// @Description Create new product item
// @Accept json
// @Produce json
// @Success 201 {object} dto.CreateProductResponseDto
// @Router /products [post]
func (h *productsHandlers) CreateProduct() echo.HandlerFunc {
    return func(c echo.Context) error {
        h.metrics.CreateProductHttpRequests.Inc()

        ctx, span := tracing.StartHttpServerTracerSpan(c, "productsHandlers.CreateProduct")
        defer span.Finish()

        createDto := &dto.CreateProductDto{}
        if err := c.Bind(createDto); err != nil {
            h.log.WarnMsg("Bind", err)
            h.traceErr(span, err)
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        createDto.ProductID = uuid.NewV4()
        if err := h.v.StructCtx(ctx, createDto); err != nil {
            h.log.WarnMsg("validate", err)
            h.traceErr(span, err)
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        if err := h.ps.Commands.CreateProduct.Handle(ctx, commands.NewCreateProductCommand(createDto)); err != nil {
            h.log.WarnMsg("CreateProduct", err)
            h.metrics.ErrorHttpRequests.Inc()
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.metrics.SuccessHttpRequests.Inc()
        return c.JSON(http.StatusCreated, dto.CreateProductResponseDto{ProductID: createDto.ProductID})
    }
}
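
To make the two sides of the id debate concrete, here is a minimal sketch of both shapes. It is not the project's code: the handler above uses uuid.NewV4() from a UUID library, while this sketch builds a version 4 UUID from crypto/rand so it runs with the standard library only, and the CreateProduct type and both handler functions are hypothetical names.

```go
package main

import (
	"crypto/rand"
	"errors"
	"fmt"
)

// newUUIDv4 returns a random RFC 4122 version 4 UUID as a string.
func newUUIDv4() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // set version 4
	b[8] = (b[8] & 0x3f) | 0x80 // set RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// CreateProduct is a hypothetical command used only in this sketch.
type CreateProduct struct {
	ID   string
	Name string
}

// Option 1: the caller supplies the id, so the command returns only an error.
func handleWithCallerID(store map[string]string, cmd CreateProduct) error {
	if cmd.ID == "" {
		return errors.New("id is required")
	}
	store[cmd.ID] = cmd.Name
	return nil
}

// Option 2: the command generates the id and returns it to the caller.
func handleGeneratingID(store map[string]string, name string) (string, error) {
	id, err := newUUIDv4()
	if err != nil {
		return "", err
	}
	store[id] = name
	return id, nil
}

func main() {
	store := map[string]string{}

	id, err := newUUIDv4()
	if err != nil {
		panic(err)
	}
	if err := handleWithCallerID(store, CreateProduct{ID: id, Name: "keyboard"}); err != nil {
		panic(err)
	}

	generated, err := handleGeneratingID(store, "mouse")
	if err != nil {
		panic(err)
	}
	fmt.Println(len(id), len(generated), len(store))
}
```

With the first option the gateway can answer the client with the id right away, even though the write happens later through Kafka, which is probably why it fits an asynchronous flow like this one.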



The create product command handler is simple: it marshals the command data and publishes it to Kafka.
Kafka accepts []byte as the message value, and protobuf is used here for serialization. To pass tracing context through Kafka we have to use message headers;
you can find helpers for that in the tracing utils.
Go has some good libraries for working with Kafka; I like segmentio/kafka-go.



func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
    defer span.Finish()

    createDto := &kafkaMessages.ProductCreate{
        ProductID:   command.CreateDto.ProductID.String(),
        Name:        command.CreateDto.Name,
        Description: command.CreateDto.Description,
        Price:       command.CreateDto.Price,
    }

    dtoBytes, err := proto.Marshal(createDto)
    if err != nil {
        return err
    }

    return c.kafkaProducer.PublishMessage(ctx, kafka.Message{
        Topic:   c.cfg.KafkaTopics.ProductCreate.TopicName,
        Value:   dtoBytes,
        Time:    time.Now().UTC(),
        Headers: tracing.GetKafkaTracingHeadersFromSpanCtx(span.Context()),
    })
}
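
The idea behind passing tracing through Kafka headers can be sketched as follows. This is not the helper from the repository: the Header type below only mirrors the shape of kafka-go's kafka.Header (a string Key and a []byte Value), both function names are hypothetical, and the trace id value is made up. With OpenTracing the carrier map would normally be filled by tracer.Inject using the TextMap format; here it is written by hand so the sketch has no dependencies.

```go
package main

import "fmt"

// Header mirrors the shape of kafka-go's kafka.Header for this sketch.
type Header struct {
	Key   string
	Value []byte
}

// injectTraceHeaders turns a text-map carrier into message headers
// on the producer side.
func injectTraceHeaders(carrier map[string]string) []Header {
	headers := make([]Header, 0, len(carrier))
	for k, v := range carrier {
		headers = append(headers, Header{Key: k, Value: []byte(v)})
	}
	return headers
}

// extractTraceHeaders rebuilds the carrier on the consumer side, so a
// tracer can continue the same trace from it.
func extractTraceHeaders(headers []Header) map[string]string {
	carrier := make(map[string]string, len(headers))
	for _, h := range headers {
		carrier[h.Key] = string(h.Value)
	}
	return carrier
}

func main() {
	// "uber-trace-id" is the header name of Jaeger's default propagation
	// format; the value below is an invented example.
	carrier := map[string]string{"uber-trace-id": "4bf92f3577b34da6:00f067aa0ba902b7:0:1"}

	headers := injectTraceHeaders(carrier)
	restored := extractTraceHeaders(headers)

	fmt.Println(restored["uber-trace-id"] == carrier["uber-trace-id"]) // prints: true
}
```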



When working with Kafka it's nice to have a UI client for debugging; personally I like to use Conduktor.

The writer service consumes Kafka topics, processes the messages by writing to Postgres, and publishes successfully processed messages back to Kafka.
For working with Postgres in Go, in my opinion the best choice is pgx, and if you need a query builder, squirrel is a very good library.
Personally I don't like ORMs, but from what I have seen teams often use gorm; it's up to you.
The ProcessMessages method listens to Kafka topics and calls a specific method depending on the topic:



func (s *productMessageProcessor) ProcessMessages(ctx context.Context, r *kafka.Reader, wg *sync.WaitGroup, workerID int) {
    defer wg.Done()

    for {
        select {
        case <-ctx.Done():
            return
        default:
        }

        m, err := r.FetchMessage(ctx)
        if err != nil {
            s.log.Warnf("workerID: %v, err: %v", workerID, err)
            continue
        }

        s.logProcessMessage(m, workerID)

        switch m.Topic {
        case s.cfg.KafkaTopics.ProductCreate.TopicName:
            s.processCreateProduct(ctx, r, m)
        case s.cfg.KafkaTopics.ProductUpdate.TopicName:
            s.processUpdateProduct(ctx, r, m)
        case s.cfg.KafkaTopics.ProductDelete.TopicName:
            s.processDeleteProduct(ctx, r, m)
        }
    }
}
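
ProcessMessages receives a *sync.WaitGroup and a workerID, which suggests that several workers are started per consumer group. How they are started is not shown in this article, so the following is only a sketch of one way to do it: a channel of strings stands in for the Kafka reader, and worker, runWorkers and countProcessed are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// worker handles messages until the channel is closed or ctx is cancelled.
func worker(ctx context.Context, id int, msgs <-chan string, wg *sync.WaitGroup, handle func(workerID int, msg string)) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case m, ok := <-msgs:
			if !ok {
				return
			}
			handle(id, m)
		}
	}
}

// runWorkers starts n workers and blocks until all of them have exited.
func runWorkers(ctx context.Context, n int, msgs <-chan string, handle func(workerID int, msg string)) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go worker(ctx, i, msgs, &wg, handle)
	}
	wg.Wait()
}

// countProcessed feeds items to n workers and reports how many were handled.
func countProcessed(n int, items []string) int {
	msgs := make(chan string, len(items))
	for _, it := range items {
		msgs <- it
	}
	close(msgs)

	var mu sync.Mutex
	processed := 0
	runWorkers(context.Background(), n, msgs, func(workerID int, msg string) {
		mu.Lock()
		defer mu.Unlock()
		processed++
	})
	return processed
}

func main() {
	n := countProcessed(2, []string{"create", "update", "delete"})
	fmt.Println("processed:", n) // prints: processed: 3
}
```

Cancelling the context is what lets every worker stop on shutdown, which is the same role ctx.Done() plays in ProcessMessages.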



The Kafka message processing method deserializes and validates the message body, passes it to the command and commits the message.
Here we have to choose how to handle errors, which depends on the business logic; for example,
we can use the dead letter queue pattern.



func (s *productMessageProcessor) processCreateProduct(ctx context.Context, r *kafka.Reader, m kafka.Message) {
    s.metrics.CreateProductKafkaMessages.Inc()

    ctx, span := tracing.StartKafkaConsumerTracerSpan(ctx, m.Headers, "productMessageProcessor.processCreateProduct")
    defer span.Finish()

    var msg kafkaMessages.ProductCreate
    if err := proto.Unmarshal(m.Value, &msg); err != nil {
        s.log.WarnMsg("proto.Unmarshal", err)
        s.commitErrMessage(ctx, r, m)
        return
    }

    proUUID, err := uuid.FromString(msg.GetProductID())
    if err != nil {
        s.log.WarnMsg("uuid.FromString", err)
        s.commitErrMessage(ctx, r, m)
        return
    }

    command := commands.NewCreateProductCommand(proUUID, msg.GetName(), msg.GetDescription(), msg.GetPrice())
    if err := s.v.StructCtx(ctx, command); err != nil {
        s.log.WarnMsg("validate", err)
        s.commitErrMessage(ctx, r, m)
        return
    }

    if err := retry.Do(func() error {
        return s.ps.Commands.CreateProduct.Handle(ctx, command)
    }, append(retryOptions, retry.Context(ctx))...); err != nil {
        s.log.WarnMsg("CreateProduct.Handle", err)
        s.metrics.ErrorKafkaMessages.Inc()
        return
    }

    s.commitMessage(ctx, r, m)
}
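
The dead letter queue pattern mentioned above is not implemented in this project, so here is a minimal sketch of what it could look like. Everything in it is an assumption: Message and Publisher are simplified stand-ins for the Kafka types, memoryPublisher replaces a real producer, the topic names are invented, and a plain loop replaces the retry library.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a simplified stand-in for a Kafka message.
type Message struct {
	Topic string
	Value []byte
}

// Publisher is the small part of a producer this sketch needs.
type Publisher interface {
	Publish(m Message) error
}

// memoryPublisher is an in-memory stand-in for a Kafka producer.
type memoryPublisher struct{ sent []Message }

func (p *memoryPublisher) Publish(m Message) error {
	p.sent = append(p.sent, m)
	return nil
}

var errBoom = errors.New("boom")

// processWithDLQ calls handle up to attempts times. If every attempt fails,
// the payload is forwarded to dlqTopic. The bool reports whether the offset
// can be committed: true after success or after a successful hand-off to the
// dead letter topic, false when even that publish failed.
func processWithDLQ(m Message, attempts int, dlqTopic string, dlq Publisher, handle func(Message) error) (bool, error) {
	if attempts < 1 {
		attempts = 1
	}
	var lastErr error
	for i := 0; i < attempts; i++ {
		if lastErr = handle(m); lastErr == nil {
			return true, nil
		}
	}
	if err := dlq.Publish(Message{Topic: dlqTopic, Value: m.Value}); err != nil {
		return false, fmt.Errorf("publish to DLQ: %w", err)
	}
	return true, lastErr
}

func main() {
	dlq := &memoryPublisher{}
	calls := 0
	failing := func(Message) error {
		calls++
		return errBoom
	}

	msg := Message{Topic: "product_create", Value: []byte("bad payload")}
	commit, err := processWithDLQ(msg, 3, "product_create_dlq", dlq, failing)
	fmt.Println(commit, err, calls, len(dlq.sent)) // prints: true boom 3 1
}
```

The point of the design is that a poisoned message no longer blocks the partition: it is parked in a separate topic where it can be inspected or replayed later.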



The writer service's create product command saves the data to Postgres and publishes a product created event to Kafka:



func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
    defer span.Finish()

    productDto := &models.Product{ProductID: command.ProductID, Name: command.Name, Description: command.Description, Price: command.Price}

    product, err := c.pgRepo.CreateProduct(ctx, productDto)
    if err != nil {
        return err
    }

    msg := &kafkaMessages.ProductCreated{Product: mappers.ProductToGrpcMessage(product)}
    msgBytes, err := proto.Marshal(msg)
    if err != nil {
        return err
    }

    message := kafka.Message{
        Topic:   c.cfg.KafkaTopics.ProductCreated.TopicName,
        Value:   msgBytes,
        Time:    time.Now().UTC(),
        Headers: tracing.GetKafkaTracingHeadersFromSpanCtx(span.Context()),
    }

    return c.kafkaProducer.PublishMessage(ctx, message)
}



The Postgres repository uses pgx, and the code is simple:



func (p *productRepository) CreateProduct(ctx context.Context, product *models.Product) (*models.Product, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "productRepository.CreateProduct")
    defer span.Finish()

    var created models.Product
    if err := p.db.QueryRow(ctx, createProductQuery, &product.ProductID, &product.Name, &product.Description, &product.Price).Scan(
        &created.ProductID,
        &created.Name,
        &created.Description,
        &created.Price,
        &created.CreatedAt,
        &created.UpdatedAt,
    ); err != nil {
        return nil, errors.Wrap(err, "db.QueryRow")
    }

    return &created, nil
}



The reader service consumes Kafka messages, saves the data to MongoDB and caches it in Redis, then serves the projected data through gRPC calls.
The Kafka listener handlers here look the same as in the writer service; the product created command saves the data to MongoDB and caches it in Redis:



func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
    defer span.Finish()

    product := &models.Product{
        ProductID:   command.ProductID,
        Name:        command.Name,
        Description: command.Description,
        Price:       command.Price,
        CreatedAt:   command.CreatedAt,
        UpdatedAt:   command.UpdatedAt,
    }

    created, err := c.mongoRepo.CreateProduct(ctx, product)
    if err != nil {
        return err
    }

    c.redisRepo.PutProduct(ctx, created.ProductID, created)
    return nil
}



MongoDB repository save method:



func (p *mongoRepository) CreateProduct(ctx context.Context, product *models.Product) (*models.Product, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "mongoRepository.CreateProduct")
    defer span.Finish()

    collection := p.db.Database(p.cfg.Mongo.Db).Collection(p.cfg.MongoCollections.Products)

    _, err := collection.InsertOne(ctx, product, &options.InsertOneOptions{})
    if err != nil {
        p.traceErr(span, err)
        return nil, errors.Wrap(err, "InsertOne")
    }

    return product, nil
}



And redis caching method is:



func (r *redisRepository) PutProduct(ctx context.Context, key string, product *models.Product) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "redisRepository.PutProduct")
    defer span.Finish()

    productBytes, err := json.Marshal(product)
    if err != nil {
        r.log.WarnMsg("json.Marshal", err)
        return
    }

    if err := r.redisClient.HSetNX(ctx, r.getRedisProductPrefixKey(), key, productBytes).Err(); err != nil {
        r.log.WarnMsg("redisClient.HSetNX", err)
        return
    }
    r.log.Debugf("HSetNX prefix: %s, key: %s", r.getRedisProductPrefixKey(), key)
}
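
The write path above fills the cache; the matching read path is not shown in this article. A common way to use such a cache is the cache-aside pattern, sketched below under stated assumptions: Cache and Store are hypothetical interfaces, and maps stand in for Redis and MongoDB.

```go
package main

import (
	"errors"
	"fmt"
)

// Product is a trimmed-down model for this sketch.
type Product struct {
	ID   string
	Name string
}

var ErrNotFound = errors.New("product not found")

// Cache and Store are hypothetical interfaces; Redis and MongoDB would sit
// behind them in the real services.
type Cache interface {
	Get(id string) (*Product, bool)
	Put(id string, p *Product)
}

type Store interface {
	GetByID(id string) (*Product, error)
}

// mapCache is an in-memory stand-in for the Redis repository.
type mapCache map[string]*Product

func (c mapCache) Get(id string) (*Product, bool) { p, ok := c[id]; return p, ok }
func (c mapCache) Put(id string, p *Product)      { c[id] = p }

// mapStore is an in-memory stand-in for the MongoDB repository.
// It counts reads so the effect of the cache is visible.
type mapStore struct {
	data  map[string]*Product
	reads int
}

func (s *mapStore) GetByID(id string) (*Product, error) {
	s.reads++
	p, ok := s.data[id]
	if !ok {
		return nil, ErrNotFound
	}
	return p, nil
}

// getProduct tries the cache first, falls back to the store on a miss,
// and then populates the cache for the next caller.
func getProduct(cache Cache, store Store, id string) (*Product, error) {
	if p, ok := cache.Get(id); ok {
		return p, nil
	}
	p, err := store.GetByID(id)
	if err != nil {
		return nil, err
	}
	cache.Put(id, p)
	return p, nil
}

func main() {
	cache := mapCache{}
	store := &mapStore{data: map[string]*Product{"p1": {ID: "p1", Name: "keyboard"}}}

	first, _ := getProduct(cache, store, "p1")
	second, _ := getProduct(cache, store, "p1")
	fmt.Println(first.Name, second.Name, store.reads) // prints: keyboard keyboard 1
}
```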



The API gateway can then request data from the reader service using gRPC.
Reader gRPC service method:



func (s *grpcService) GetProductById(ctx context.Context, req *readerService.GetProductByIdReq) (*readerService.GetProductByIdRes, error) {
    s.metrics.GetProductByIdGrpcRequests.Inc()

    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.GetProductById")
    defer span.Finish()

    productUUID, err := uuid.FromString(req.GetProductID())
    if err != nil {
        s.log.WarnMsg("uuid.FromString", err)
        return nil, s.errResponse(codes.InvalidArgument, err)
    }

    query := queries.NewGetProductByIdQuery(productUUID)
    if err := s.v.StructCtx(ctx, query); err != nil {
        s.log.WarnMsg("validate", err)
        return nil, s.errResponse(codes.InvalidArgument, err)
    }

    product, err := s.ps.Queries.GetProductById.Handle(ctx, query)
    if err != nil {
        s.log.WarnMsg("GetProductById.Handle", err)
        return nil, s.errResponse(codes.Internal, err)
    }

    s.metrics.SuccessGrpcRequests.Inc()
    return &readerService.GetProductByIdRes{Product: models.ProductToGrpcMessage(product)}, nil
}



Get by id method tracing:

Api gateway get product by id http handler method:



// GetProductByID
// @Tags Products
// @Summary Get product
// @Description Get product by id
// @Accept json
// @Produce json
// @Param id path string true "Product ID"
// @Success 200 {object} dto.ProductResponse
// @Router /products/{id} [get]
func (h *productsHandlers) GetProductByID() echo.HandlerFunc {
    return func(c echo.Context) error {
        h.metrics.GetProductByIdHttpRequests.Inc()

        ctx, span := tracing.StartHttpServerTracerSpan(c, "productsHandlers.GetProductByID")
        defer span.Finish()

        productUUID, err := uuid.FromString(c.Param(constants.ID))
        if err != nil {
            h.log.WarnMsg("uuid.FromString", err)
            h.traceErr(span, err)
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        query := queries.NewGetProductByIdQuery(productUUID)
        response, err := h.ps.Queries.GetProductById.Handle(ctx, query)
        if err != nil {
            h.log.WarnMsg("GetProductById", err)
            h.metrics.ErrorHttpRequests.Inc()
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.metrics.SuccessHttpRequests.Inc()
        return c.JSON(http.StatusOK, response)
    }
}



and query handler code:



func (q *getProductByIdHandler) Handle(ctx context.Context, query *GetProductByIdQuery) (*dto.ProductResponse, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "getProductByIdHandler.Handle")
    defer span.Finish()

    ctx = tracing.InjectTextMapCarrierToGrpcMetaData(ctx, span.Context())
    res, err := q.rsClient.GetProductById(ctx, &readerService.GetProductByIdReq{ProductID: query.ProductID.String()})
    if err != nil {
        return nil, err
    }

    return dto.ProductResponseFromGrpc(res.GetProduct()), nil
}



Search products method:



func (p *mongoRepository) Search(ctx context.Context, search string, pagination *utils.Pagination) (*models.ProductsList, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "mongoRepository.Search")
    defer span.Finish()

    collection := p.db.Database(p.cfg.Mongo.Db).Collection(p.cfg.MongoCollections.Products)

    filter := bson.D{
        {Key: "$or", Value: bson.A{
            // "i" makes the match case-insensitive; "g" is not a MongoDB regex option
            bson.D{{Key: "name", Value: primitive.Regex{Pattern: search, Options: "i"}}},
            bson.D{{Key: "description", Value: primitive.Regex{Pattern: search, Options: "i"}}},
        }},
    }

    count, err := collection.CountDocuments(ctx, filter)
    if err != nil {
        p.traceErr(span, err)
        return nil, errors.Wrap(err, "CountDocuments")
    }
    if count == 0 {
        return &models.ProductsList{Products: make([]*models.Product, 0)}, nil
    }

    limit := int64(pagination.GetLimit())
    skip := int64(pagination.GetOffset())
    cursor, err := collection.Find(ctx, filter, &options.FindOptions{
        Limit: &limit,
        Skip:  &skip,
    })
    if err != nil {
        p.traceErr(span, err)
        return nil, errors.Wrap(err, "Find")
    }
    defer cursor.Close(ctx)

    products := make([]*models.Product, 0, pagination.GetSize())

    for cursor.Next(ctx) {
        var prod models.Product
        if err := cursor.Decode(&prod); err != nil {
            p.traceErr(span, err)
            return nil, errors.Wrap(err, "Decode")
        }
        products = append(products, &prod)
    }

    if err := cursor.Err(); err != nil {
        span.SetTag("error", true)
        span.LogKV("error_code", err.Error())
        return nil, errors.Wrap(err, "cursor.Err")
    }

    return models.NewProductListWithPagination(products, count, pagination), nil
}
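
The search method relies on utils.Pagination, which is not shown in this article. The sketch below is a guess at what such a helper could look like, not the repository's code: the field names, the 1-based page number and the TotalPages and HasMore methods are all assumptions.

```go
package main

import "fmt"

// Pagination is a hypothetical stand-in for the utils.Pagination type.
type Pagination struct {
	Size int // items per page
	Page int // 1-based page number
}

// GetLimit is the maximum number of documents to return.
func (p Pagination) GetLimit() int { return p.Size }

// GetOffset is the number of documents to skip before the current page.
func (p Pagination) GetOffset() int {
	if p.Page <= 1 {
		return 0
	}
	return (p.Page - 1) * p.Size
}

// TotalPages rounds up, so a partially filled last page still counts.
func (p Pagination) TotalPages(totalCount int) int {
	if p.Size <= 0 {
		return 0
	}
	return (totalCount + p.Size - 1) / p.Size
}

// HasMore reports whether there is a page after the current one.
func (p Pagination) HasMore(totalCount int) bool {
	return p.Page < p.TotalPages(totalCount)
}

func main() {
	p := Pagination{Size: 10, Page: 3}
	fmt.Println(p.GetLimit(), p.GetOffset(), p.TotalPages(25), p.HasMore(25)) // prints: 10 20 3 false
}
```

The limit and offset map directly to the Limit and Skip options passed to collection.Find above, and the count from CountDocuments is what makes the total page number computable.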



You can find more details and the source code in the GitHub repository.
Of course, in real-world applications we have to implement many more necessary features,
such as circuit breakers, retries and rate limiters. Depending on the project, they can be implemented in different ways;
for example, you can use Kubernetes and Istio for some of them.
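
As an illustration of one of these features, here is a minimal circuit breaker sketch. It is not part of the project and not a replacement for a tested library: Breaker, NewBreaker and fakeClock are hypothetical names, the clock is injected only to make the example deterministic, and the type is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	ErrOpen = errors.New("circuit breaker is open")
	errDown = errors.New("reader service unavailable")
)

const cooldown = time.Second

// Breaker opens after maxFailures consecutive failures and rejects calls
// until the cooldown has passed. Not safe for concurrent use.
type Breaker struct {
	maxFailures int
	cooldown    time.Duration
	now         func() time.Time // injectable clock for the sketch
	failures    int
	openedAt    time.Time
}

func NewBreaker(maxFailures int, cooldown time.Duration, now func() time.Time) *Breaker {
	return &Breaker{maxFailures: maxFailures, cooldown: cooldown, now: now}
}

// Call runs fn unless the breaker is open. After the cooldown one trial
// call is let through: success closes the breaker, failure reopens it.
func (b *Breaker) Call(fn func() error) error {
	if b.failures >= b.maxFailures && b.now().Sub(b.openedAt) < b.cooldown {
		return ErrOpen
	}
	if err := fn(); err != nil {
		b.failures++
		if b.failures >= b.maxFailures {
			b.openedAt = b.now()
		}
		return err
	}
	b.failures = 0
	return nil
}

// fakeClock is a manually advanced clock used instead of time.Now.
type fakeClock struct{ t time.Time }

func (c *fakeClock) Now() time.Time          { return c.t }
func (c *fakeClock) Advance(d time.Duration) { c.t = c.t.Add(d) }

func main() {
	clk := &fakeClock{}
	b := NewBreaker(2, cooldown, clk.Now)
	fail := func() error { return errDown }
	ok := func() error { return nil }

	fmt.Println(b.Call(fail)) // first failure
	fmt.Println(b.Call(fail)) // second failure opens the breaker
	fmt.Println(b.Call(ok))   // rejected without calling ok
	clk.Advance(2 * cooldown)
	fmt.Println(b.Call(ok)) // trial call succeeds and closes the breaker
}
```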
I hope this article is useful and helpful. I'll be happy to receive any feedback or questions, so feel free to contact me by email or any messenger :)
