Alexander
Posted on March 1, 2021
This article implements a clean architecture microservice using: 🚀
Kafka as the message broker
gRPC Go implementation of gRPC
MongoDB as the database
Jaeger open source, end-to-end distributed tracing
Prometheus monitoring and alerting
Grafana to compose observability dashboards with everything from Prometheus
You can find the source code in the GitHub repository
I want to put the accent on Kafka, because I am trying it for the first time, so it's learning by doing 👨💻
Monitoring and tracing are of course required for any microservice, so they are included ⚡️
Kafka is a high-throughput distributed messaging system.
At any time only one broker can be the leader for a given partition, and only that leader can receive and serve data for that partition;
the other brokers synchronize the data.
Producers:
Producers write data to topics and automatically know to which broker and partition to write to.
If a broker fails, producers recover automatically.
Producers can choose to receive acknowledgement of data writes:
- acks = 0. The producer won't wait for an acknowledgement, which means possible data loss.
- acks = 1. The producer will wait for the leader's acknowledgement; limited data loss is still possible in some cases.
- acks = all. Leader + replicas acknowledgement. It requires acks from the number of brokers specified in the min.insync.replicas broker-side config key, and if fewer are currently in sync, the produce will fail. You may specify min.insync.replicas as low as 1 (acks==1 equivalent), or as high as your replication factor, or somewhere in between, so you can finely control the tradeoff between availability and consistency. Here is a good article
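To make the three acks levels more concrete, here is a minimal sketch that models when a write is accepted. It is not Kafka client code: `writeAccepted` and its parameters are hypothetical names made up for this illustration, and `inSync` is assumed to count the leader as well.

```go
package main

import "fmt"

// Acks mirrors the three producer acknowledgement levels described above.
type Acks int

const (
	AcksNone   Acks = 0  // do not wait for any acknowledgement
	AcksLeader Acks = 1  // wait for the partition leader only
	AcksAll    Acks = -1 // wait for the leader and the in-sync replicas
)

// writeAccepted is a hypothetical helper (not part of any Kafka client).
// It models whether the producer sees a successful write for a given acks
// level. inSync is the number of in-sync replicas, leader included.
func writeAccepted(acks Acks, leaderUp bool, inSync, minInSync int) bool {
	switch acks {
	case AcksNone:
		return true // the producer never learns about failures
	case AcksLeader:
		return leaderUp
	default:
		return leaderUp && inSync >= minInSync
	}
}

func main() {
	// Replication factor 3, min.insync.replicas=2, one replica has fallen behind.
	fmt.Println(writeAccepted(AcksAll, true, 2, 2))
	// A second replica falls behind: acks=all now rejects the write.
	fmt.Println(writeAccepted(AcksAll, true, 1, 2))
	// acks=1 still accepts it, at the cost of possible data loss.
	fmt.Println(writeAccepted(AcksLeader, true, 1, 2))
}
```

The model only captures the accept or reject decision; it says nothing about latency, which also grows as more acknowledgements are required.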
Producers can choose to send a key with the message.
- If no key is sent, data is distributed round robin.
- If a key is sent, then all messages for that key will always go to the same partition.
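The two rules above can be sketched with a tiny partitioner. This is only an illustration under assumptions: the `partitioner` type is hypothetical, and FNV-1a is used as a stand-in hash, while real clients pick their own hash function and may handle keyless messages differently.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitioner is a hypothetical, simplified stand-in for a client-side
// partitioner. It is not taken from any Kafka library.
type partitioner struct {
	numPartitions int
	next          int // round-robin cursor for messages without a key
}

// pick returns the partition for a message key.
func (p *partitioner) pick(key []byte) int {
	if len(key) == 0 {
		// No key: spread messages over partitions round robin.
		part := p.next % p.numPartitions
		p.next++
		return part
	}
	// Key present: the same key always hashes to the same partition.
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(p.numPartitions))
}

func main() {
	p := &partitioner{numPartitions: 3}
	fmt.Println(p.pick(nil), p.pick(nil), p.pick(nil), p.pick(nil)) // 0 1 2 0

	a := p.pick([]byte("product-42"))
	b := p.pick([]byte("product-42"))
	fmt.Println(a == b) // true
}
```

Note that the key-to-partition mapping depends on the number of partitions, so adding partitions to a topic changes where existing keys land.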
Consumers:
Consumers read data from a topic and know which broker to read from.
Data are read in order within each partition.
Consumer groups:
Consumers read data in consumer groups. Each consumer within a group reads from exclusive partitions.
So if you have more consumers than partitions, some consumers will be inactive.
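A rough sketch of why extra consumers stay idle: if partitions are handed out so that each one has exactly one owner in the group, a fourth consumer on a three-partition topic gets nothing. The `assign` function below is a hypothetical round-robin assignment written for this example, not the actual Kafka group coordinator logic.

```go
package main

import "fmt"

// assign distributes partitions over consumers round robin, so that every
// partition has exactly one owner within the group. Hypothetical helper.
func assign(partitions int, consumers []string) map[string][]int {
	out := make(map[string][]int, len(consumers))
	for _, c := range consumers {
		out[c] = nil // make idle consumers visible in the result
	}
	if len(consumers) == 0 {
		return out
	}
	for p := 0; p < partitions; p++ {
		c := consumers[p%len(consumers)]
		out[c] = append(out[c], p)
	}
	return out
}

func main() {
	group := []string{"c1", "c2", "c3", "c4"}
	got := assign(3, group)
	for _, c := range group {
		fmt.Printf("%s -> %v\n", c, got[c])
	}
	// c4 ends up with an empty list: it is part of the group but inactive.
}
```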
Consumer offsets:
Kafka stores the offsets at which a consumer group has been reading.
When a consumer in a group has processed data, it should commit the offsets.
If a consumer goes down, it will be able to resume reading from where it left off.
Consumers choose when to commit offsets.
There are 3 delivery cases:
At most once:
- offsets are committed as soon as the message is received
- if a process fails, the message will be lost and won't be read again.
At least once:
- offsets are committed after the message is processed
- if a process fails, the message will be read again.
- this can result in duplicate processing of messages, so it is important to make sure your processing is idempotent and that duplicates won't impact your system.
Exactly once:
- can be achieved for Kafka-to-Kafka communication using the Kafka Streams API.
- for external systems we need to use an idempotent consumer.
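An idempotent consumer can be sketched as a handler that remembers which messages it has already applied. The types below are hypothetical and keep the state in memory only to stay short; in a real service the "already seen" check would have to live in durable storage (for example a unique index in the database), and `ID` is assumed to be a unique business key carried by each message.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical record; ID is assumed to be a unique business key.
type message struct {
	ID    string
	Value string
}

// idempotentHandler remembers processed message IDs so that a redelivered
// message is not applied twice.
type idempotentHandler struct {
	mu      sync.Mutex
	seen    map[string]struct{}
	applied []string
}

func newIdempotentHandler() *idempotentHandler {
	return &idempotentHandler{seen: make(map[string]struct{})}
}

// handle applies the message at most once and reports whether it was applied.
func (h *idempotentHandler) handle(m message) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	if _, dup := h.seen[m.ID]; dup {
		return false // duplicate delivery, skip the side effect
	}
	h.seen[m.ID] = struct{}{}
	h.applied = append(h.applied, m.Value)
	return true
}

func main() {
	h := newIdempotentHandler()
	// The second delivery of ID "1" simulates a re-read after a crash
	// that happened before the offset was committed.
	deliveries := []message{{"1", "create A"}, {"2", "create B"}, {"1", "create A"}}
	for _, m := range deliveries {
		fmt.Println(m.ID, h.handle(m))
	}
	fmt.Println(len(h.applied)) // 2
}
```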
Compression
One more very important feature is compression.
Compression is enabled at the producer level and doesn't require any configuration change in the brokers or consumers.
By default it is none; a good starting point is snappy or lz4.
For local development:
make local // runs docker-compose.local.yml
make crate_topics // create kafka topics
make mongo // load js init script to mongo docker container
make make_cert // generate local SSL certificates
make swagger // generate swagger documentation
UI interfaces will be available on ports:
Jaeger UI: http://localhost:16686
Prometheus UI: http://localhost:9090
Grafana UI: http://localhost:3000
Kafka UI: http://localhost:9000
Swagger UI by default will run on: https://localhost:5007/swagger/index.html
In Grafana you need to choose Prometheus as the metrics source and then create a dashboard.
A good Kafka Docker setup with a UI included is Confluent, but its images are huge and will download half of the world wide internet to your local PC. 🤖 For this reason I used kafdrop as the UI client here
Docker-compose.local.yml:
version: "3.8"
services:
  zookeeper:
    container_name: zookeeper
    restart: always
    image: zookeeper:3.4.9
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zookeeper:2888:3888
    volumes:
      - ./data/zookeeper/data:/data
      - ./data/zookeeper/datalog:/datalog
    networks:
      - products_network
  kafka1:
    container_name: kafka1
    image: confluentinc/cp-kafka:5.3.0
    restart: always
    hostname: kafka1
    ports:
      - "9091:9091"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
    networks:
      - products_network
  kafka2:
    container_name: kafka2
    restart: always
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka2
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: 2
    volumes:
      - ./data/kafka2/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
    networks:
      - products_network
  kafka3:
    container_name: kafka3
    image: confluentinc/cp-kafka:5.3.0
    restart: always
    hostname: kafka3
    ports:
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka3/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
    networks:
      - products_network
  kafdrop:
    container_name: kafdrop
    image: obsidiandynamics/kafdrop
    restart: "no"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka1:19091"
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    networks:
      - products_network
  redis:
    image: redis:6-alpine
    container_name: user_redis
    ports:
      - "6379:6379"
    restart: always
    networks:
      - products_network
  prometheus:
    container_name: prometheus_container
    restart: always
    image: prom/prometheus
    volumes:
      - ./monitoring/prometheus-local.yml:/etc/prometheus/prometheus.yml:Z
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--storage.tsdb.retention=20d'
      - '--web.console.libraries=/usr/share/prometheus/console_libraries'
      - '--web.console.templates=/usr/share/prometheus/consoles'
    ports:
      - '9090:9090'
    networks:
      - products_network
  node_exporter:
    container_name: node_exporter_container
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks:
      - products_network
  grafana:
    container_name: grafana_container
    restart: always
    image: grafana/grafana
    ports:
      - '3000:3000'
    networks:
      - products_network
  jaeger:
    container_name: jaeger_container
    restart: always
    image: jaegertracing/all-in-one:1.21
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
    ports:
      - 5775:5775/udp
      - 6831:6831/udp
      - 6832:6832/udp
      - 5778:5778
      - 16686:16686
      - 14268:14268
      - 14250:14250
      - 9411:9411
    networks:
      - products_network
  mongodb:
    image: mongo:latest
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin
      MONGODB_DATABASE: products
    ports:
      - 27017:27017
    volumes:
      - mongodb_data_container:/data/db
volumes:
  mongodb_data_container:
networks:
  products_network:
    driver: bridge
For a Go client in production, segmentio and sarama are usually used;
both are good and it is up to you which one to choose. For this project I used segmentio.
I didn't implement any interesting business logic here and didn't cover it with tests, because I don't have enough time at the moment.
Our microservice can communicate via Kafka, gRPC and REST.
In the Makefile you can find all the helpful commands.
To create Kafka topics in Docker:
docker exec -it kafka1 kafka-topics --zookeeper zookeeper:2181 --create --topic create-product --partitions 3 --replication-factor 2
For MongoDB we can load JavaScript files; this one creates the collection and indexes:
mongo admin -u admin -p admin < init.js
The segmentio library API gives us a reader and a writer.
Create the reader first:
func (pcg *ProductsConsumerGroup) getNewKafkaReader(kafkaURL []string, topic, groupID string) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:                kafkaURL,
		GroupID:                groupID,
		Topic:                  topic,
		MinBytes:               minBytes,
		MaxBytes:               maxBytes,
		QueueCapacity:          queueCapacity,
		HeartbeatInterval:      heartbeatInterval,
		CommitInterval:         commitInterval,
		PartitionWatchInterval: partitionWatchInterval,
		Logger:                 kafka.LoggerFunc(pcg.log.Debugf),
		ErrorLogger:            kafka.LoggerFunc(pcg.log.Errorf),
		MaxAttempts:            maxAttempts,
		Dialer: &kafka.Dialer{
			Timeout: dialTimeout,
		},
	})
}
and the writer:
func (pcg *ProductsConsumerGroup) getNewKafkaWriter(topic string) *kafka.Writer {
	w := &kafka.Writer{
		Addr:         kafka.TCP(pcg.Brokers...),
		Topic:        topic,
		Balancer:     &kafka.LeastBytes{},
		RequiredAcks: writerRequiredAcks,
		MaxAttempts:  writerMaxAttempts,
		Logger:       kafka.LoggerFunc(pcg.log.Debugf),
		ErrorLogger:  kafka.LoggerFunc(pcg.log.Errorf),
		Compression:  compress.Snappy,
		ReadTimeout:  writerReadTimeout,
		WriteTimeout: writerWriteTimeout,
	}
	return w
}
Then create consumers using worker pools:
func (pcg *ProductsConsumerGroup) consumeCreateProduct(
	ctx context.Context,
	cancel context.CancelFunc,
	groupID string,
	topic string,
	workersNum int,
) {
	r := pcg.getNewKafkaReader(pcg.Brokers, topic, groupID)
	defer cancel()
	defer func() {
		if err := r.Close(); err != nil {
			pcg.log.Errorf("r.Close: %v", err)
			cancel()
		}
	}()

	// The writer publishes failed messages to the dead letter queue topic.
	w := pcg.getNewKafkaWriter(deadLetterQueueTopic)
	defer func() {
		if err := w.Close(); err != nil {
			pcg.log.Errorf("w.Close: %v", err)
			cancel()
		}
	}()

	pcg.log.Infof("Starting consumer group: %v", r.Config().GroupID)

	// Start exactly workersNum workers (i < workersNum, not i <= workersNum).
	wg := &sync.WaitGroup{}
	for i := 0; i < workersNum; i++ {
		wg.Add(1)
		go pcg.createProductWorker(ctx, cancel, r, w, wg, i)
	}
	wg.Wait()
}
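Stripped of Kafka, the worker pool pattern used here looks roughly like the sketch below. It uses only the standard library; `runWorkers` and `processAll` are hypothetical names for this example, and a buffered channel stands in for the Kafka reader that the workers share.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// runWorkers starts exactly workersNum goroutines that drain jobs until the
// channel is closed or ctx is cancelled. It returns how many jobs were handled.
func runWorkers(ctx context.Context, workersNum int, jobs <-chan int, handle func(workerID, job int)) int64 {
	var handled int64
	wg := &sync.WaitGroup{}
	for i := 0; i < workersNum; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case job, ok := <-jobs:
					if !ok {
						return // channel closed, nothing left to do
					}
					handle(workerID, job)
					atomic.AddInt64(&handled, 1)
				}
			}
		}(i)
	}
	wg.Wait()
	return handled
}

// processAll is a small driver for the example: it queues jobsNum jobs and
// lets workersNum workers process them.
func processAll(workersNum, jobsNum int) int64 {
	jobs := make(chan int, jobsNum)
	for i := 0; i < jobsNum; i++ {
		jobs <- i
	}
	close(jobs)
	return runWorkers(context.Background(), workersNum, jobs, func(workerID, job int) {})
}

func main() {
	fmt.Println(processAll(3, 10)) // 10
}
```

With a Kafka reader in a consumer group, the workers call `FetchMessage` on the shared reader instead of receiving from a channel, but the WaitGroup and cancellation handling follow the same shape.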
Workers validate the message body and then call the usecase. If it returns an error, they retry; a good library for retries is retry-go.
If it fails again, they publish an error message to a very simple Dead Letter Queue. As I said, I didn't implement any interesting business logic here, so in real production we have to handle error cases in a better way.
After a message is processed successfully, it is committed.
func (pcg *ProductsConsumerGroup) createProductWorker(
	ctx context.Context,
	cancel context.CancelFunc,
	r *kafka.Reader,
	w *kafka.Writer,
	wg *sync.WaitGroup,
	workerID int,
) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "ProductsConsumerGroup.createProductWorker")
	defer span.Finish()

	span.LogFields(log.String("ConsumerGroup", r.Config().GroupID))

	defer wg.Done()
	defer cancel()

	for {
		// FetchMessage does not commit the offset; that happens explicitly below.
		m, err := r.FetchMessage(ctx)
		if err != nil {
			pcg.log.Errorf("FetchMessage: %v", err)
			return
		}

		pcg.log.Infof(
			"WORKER: %v, message at topic/partition/offset %v/%v/%v: %s = %s\n",
			workerID,
			m.Topic,
			m.Partition,
			m.Offset,
			string(m.Key),
			string(m.Value),
		)
		incomingMessages.Inc()

		var prod models.Product
		if err := json.Unmarshal(m.Value, &prod); err != nil {
			errorMessages.Inc()
			pcg.log.Errorf("json.Unmarshal: %v", err)
			continue
		}

		if err := pcg.validate.StructCtx(ctx, prod); err != nil {
			errorMessages.Inc()
			pcg.log.Errorf("validate.StructCtx: %v", err)
			continue
		}

		// Retry the usecase call; if all attempts fail, send the message to the dead letter queue.
		if err := retry.Do(func() error {
			created, err := pcg.productsUC.Create(ctx, &prod)
			if err != nil {
				return err
			}
			pcg.log.Infof("created product: %v", created)
			return nil
		},
			retry.Attempts(retryAttempts),
			retry.Delay(retryDelay),
			retry.Context(ctx),
		); err != nil {
			errorMessages.Inc()

			if err := pcg.publishErrorMessage(ctx, w, m, err); err != nil {
				pcg.log.Errorf("publishErrorMessage: %v", err)
				continue
			}
			pcg.log.Errorf("productsUC.Create.publishErrorMessage: %v", err)
			continue
		}

		if err := r.CommitMessages(ctx, m); err != nil {
			errorMessages.Inc()
			pcg.log.Errorf("CommitMessages: %v", err)
			continue
		}

		successMessages.Inc()
	}
}
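The retry-then-dead-letter flow of the worker can be isolated into a small helper. The sketch below uses only the standard library instead of retry-go, and `processWithRetry` and `simulate` are hypothetical names for this example. Returning nil once a message is either processed or parked, so that the caller can commit it, is one possible design choice and not necessarily what the worker above does.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// processWithRetry calls process up to attempts times, sleeping delay between
// tries. If every attempt fails, it hands the last error to deadLetter.
// It returns nil when the message was processed or parked in the dead letter
// queue, and an error only if parking it failed as well.
func processWithRetry(attempts int, delay time.Duration, process func() error, deadLetter func(cause error) error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = process(); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(delay)
		}
	}
	if dlqErr := deadLetter(err); dlqErr != nil {
		return fmt.Errorf("dead letter publish failed: %w", dlqErr)
	}
	return nil
}

// simulate runs processWithRetry against a process function that fails the
// first `failures` times. It reports how often process was called and whether
// the message ended up in the dead letter queue.
func simulate(failures, attempts int) (calls int, parked bool) {
	process := func() error {
		calls++
		if calls <= failures {
			return errors.New("temporary failure")
		}
		return nil
	}
	deadLetter := func(error) error {
		parked = true
		return nil
	}
	_ = processWithRetry(attempts, time.Millisecond, process, deadLetter)
	return calls, parked
}

func main() {
	fmt.Println(simulate(2, 3)) // third attempt succeeds, nothing is parked
	fmt.Println(simulate(5, 3)) // all three attempts fail, the message is parked
}
```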
In the repository layer we use mongo-go-driver for interacting with the database:
// Create creates a new product
func (p *productMongoRepo) Create(ctx context.Context, product *models.Product) (*models.Product, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "productMongoRepo.Create")
	defer span.Finish()

	collection := p.mongoDB.Database(productsDB).Collection(productsCollection)

	product.CreatedAt = time.Now().UTC()
	product.UpdatedAt = time.Now().UTC()

	result, err := collection.InsertOne(ctx, product, &options.InsertOneOptions{})
	if err != nil {
		return nil, errors.Wrap(err, "InsertOne")
	}

	// MongoDB generates the _id; copy it back into the returned product.
	objectID, ok := result.InsertedID.(primitive.ObjectID)
	if !ok {
		return nil, errors.Wrap(productErrors.ErrObjectIDTypeConversion, "result.InsertedID")
	}
	product.ProductID = objectID

	return product, nil
}
Here is the gRPC service implementation of the create handler; you can find the full code in the GitHub repository:
// Create creates a new product
func (p *productService) Create(ctx context.Context, req *productsService.CreateReq) (*productsService.CreateRes, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "productService.Create")
	defer span.Finish()
	createMessages.Inc()

	catID, err := primitive.ObjectIDFromHex(req.GetCategoryID())
	if err != nil {
		errorMessages.Inc()
		p.log.Errorf("primitive.ObjectIDFromHex: %v", err)
		return nil, grpcErrors.ErrorResponse(err, err.Error())
	}

	prod := &models.Product{
		CategoryID:  catID,
		Name:        req.GetName(),
		Description: req.GetDescription(),
		Price:       req.GetPrice(),
		ImageURL:    &req.ImageURL,
		Photos:      req.GetPhotos(),
		Quantity:    req.GetQuantity(),
		Rating:      int(req.GetRating()),
	}

	created, err := p.productUC.Create(ctx, prod)
	if err != nil {
		errorMessages.Inc()
		p.log.Errorf("productUC.Create: %v", err)
		return nil, grpcErrors.ErrorResponse(err, err.Error())
	}

	successMessages.Inc()
	return &productsService.CreateRes{Product: created.ToProto()}, nil
}
and REST API handler using echo:
// CreateProduct Create product
// @Tags Products
// @Summary Create new product
// @Description Create new single product
// @Accept json
// @Produce json
// @Success 201 {object} models.Product
// @Router /products [post]
func (p *productHandlers) CreateProduct() echo.HandlerFunc {
	return func(c echo.Context) error {
		span, ctx := opentracing.StartSpanFromContext(c.Request().Context(), "productHandlers.Create")
		defer span.Finish()
		createRequests.Inc()

		var prod models.Product
		if err := c.Bind(&prod); err != nil {
			p.log.Errorf("c.Bind: %v", err)
			return httpErrors.ErrorCtxResponse(c, err)
		}

		if err := p.validate.StructCtx(ctx, &prod); err != nil {
			p.log.Errorf("validate.StructCtx: %v", err)
			return httpErrors.ErrorCtxResponse(c, err)
		}

		if err := p.productUC.PublishCreate(ctx, &prod); err != nil {
			p.log.Errorf("productUC.PublishCreate: %v", err)
			return httpErrors.ErrorCtxResponse(c, err)
		}

		successRequests.Inc()
		return c.NoContent(http.StatusCreated)
	}
}
At the top layer of our app we handle and log errors and process metrics for Prometheus.
The repository with the source code and a list of all the tools used can be found here 👨💻 :)
I hope this article is useful and helpful. I'll be happy to receive any feedback or questions :)