Go and ElasticSearch full-text search microservice in k8s👋✨💫
Alexander
Posted on August 16, 2022
👨💻 Full list of what has been used:
Elasticsearch client for Go
RabbitMQ Go RabbitMQ Client Library
Jaeger open source, end-to-end distributed tracing
Prometheus monitoring and alerting
Grafana to compose observability dashboards with everything from Prometheus
Echo web framework
Kibana is a user interface that lets you visualize your Elasticsearch data
Docker and docker-compose
Kubernetes K8s
Helm The package manager for Kubernetes
You can find the source code in the GitHub repository.
The main idea of this project is to implement full-text search with support for synonyms, typos, and the wrong keyboard layout, using Elasticsearch and RabbitMQ.
All UIs will be available on the following ports:
Grafana UI: http://localhost:3005
Kibana UI: http://localhost:5601/app/home#/
RabbitMQ UI: http://localhost:15672
Jaeger UI: http://localhost:16686
Prometheus UI: http://localhost:9090
Docker-compose file for this project:
version: "3.9"
services:
  rabbitmq:
    image: rabbitmq:3.9-management-alpine
    container_name: rabbitmq
    restart: always
    ports:
      - "5672:5672"
      - "15672:15672"
    networks: [ "microservices" ]
  node01:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.3.3
    container_name: node01
    restart: always
    environment:
      - node.name=node01
      - cluster.name=es-cluster-8
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - xpack.license.self_generated.type=basic
      - xpack.security.enabled=false
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - ./es-data01:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
      - "9300:9300"
    networks: [ "microservices" ]
  kibana:
    image: docker.elastic.co/kibana/kibana:8.3.3
    restart: always
    environment:
      ELASTICSEARCH_HOSTS: http://node01:9200
    ports:
      - "5601:5601"
    depends_on:
      - node01
    networks: [ "microservices" ]
  jaeger:
    container_name: jaeger_container
    restart: always
    image: jaegertracing/all-in-one:1.35
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686"
      - "14268:14268"
      - "14250:14250"
      - "9411:9411"
    networks: [ "microservices" ]
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]
  node_exporter:
    container_name: node_exporter_container
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]
  grafana:
    container_name: grafana_container
    restart: always
    image: grafana/grafana
    ports:
      - '3005:3000'
    networks: [ "microservices" ]
volumes:
  es-data01:
    driver: local
networks:
  microservices:
    name: microservices
Full-text search with autocompletion can be implemented in different ways; it's up to you to choose which fits your case best.
First, we have to create mappings for our index. Of course, Elasticsearch can create them for us automatically, but that is not a production solution.
As the data model we use a simple abstract shopping product item with searchable title, description, and shop name fields; for this example that's enough.
Creating the right mapping is very important and tricky.
To implement full-text search with synonyms, typos, and the wrong keyboard layout, we will configure our own analyzer,
which combines character filters, a tokenizer, and token filters.
Our ngram_filter is Elasticsearch's built-in edge_ngram token filter.
To understand how it works, I highly recommend reading the Elasticsearch documentation.
We have to assign its min_gram and max_gram values, and the right choice always depends on the specific case.
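To get a feel for what edge_ngram produces, here is a minimal Go sketch that reproduces its output for a single term that has already been tokenized and lowercased. The function name edgeNGrams and the min/max values in the example are illustrative assumptions, not values from the project; in the real setup the filter runs inside Elasticsearch, not in application code.

```go
package main

import "fmt"

// edgeNGrams mimics, for one token, what Elasticsearch's edge_ngram token
// filter emits: every prefix whose length lies in [minGram, maxGram].
// It works on runes so that Cyrillic input is handled correctly.
// Tokens shorter than minGram produce no grams (nil), as with the real filter.
func edgeNGrams(token string, minGram, maxGram int) []string {
	runes := []rune(token)
	var grams []string
	for n := minGram; n <= maxGram && n <= len(runes); n++ {
		grams = append(grams, string(runes[:n]))
	}
	return grams
}

func main() {
	fmt.Println(edgeNGrams("apple", 2, 4))  // [ap app appl]
	fmt.Println(edgeNGrams("яблоко", 1, 3)) // [я яб ябл]
}
```

Every prefix becomes a separate indexed term. That is what lets a partial query such as "app" match "apple" without a wildcard query, at the cost of an index that grows with max_gram.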
Next, let's specify name_synonym_filter: in its synonyms field we add an array of strings, where each string is a comma-separated list of synonyms used for search.
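A synonym rule written as a plain comma-separated list means that all terms in it are equivalent. The Go sketch below models that behavior so you can see how a token gets expanded. buildSynonymIndex, expand, and the example rules are hypothetical; the real expansion is performed by Elasticsearch's synonym token filter.

```go
package main

import (
	"fmt"
	"strings"
)

// synonymIndex maps every term to its full group of equivalent terms,
// roughly what an Elasticsearch synonym filter does with a rule written as a
// plain comma-separated list (the filter's expand option defaults to true).
type synonymIndex map[string][]string

// buildSynonymIndex parses rules such as "phone, smartphone, mobile".
// If a term appears in several rules, the last rule wins in this sketch.
func buildSynonymIndex(rules []string) synonymIndex {
	idx := synonymIndex{}
	for _, rule := range rules {
		var group []string
		for _, term := range strings.Split(rule, ",") {
			if t := strings.ToLower(strings.TrimSpace(term)); t != "" {
				group = append(group, t)
			}
		}
		for _, t := range group {
			idx[t] = group
		}
	}
	return idx
}

// expand returns the synonym group for a token, or the token alone if it has none.
func (idx synonymIndex) expand(token string) []string {
	if group, ok := idx[strings.ToLower(token)]; ok {
		return group
	}
	return []string{token}
}

func main() {
	idx := buildSynonymIndex([]string{"apple, яблоко", "phone, smartphone, mobile"})
	fmt.Println(idx.expand("Smartphone")) // [phone smartphone mobile]
	fmt.Println(idx.expand("laptop"))     // [laptop]
}
```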
In the mapping properties, let's define our document field mappings. Full-text search fields get "type": "text",
with our newly created autocomplete_analyzer as the analyzer, which Elasticsearch uses when indexing documents. Search queries don't need such a complex analyzer; it would only slow them down,
so we add another one for them: search_analyzer: standard.
Handling the wrong keyboard layout can be implemented in different ways. On the Elasticsearch side it can be done with synonyms, but that makes the mappings much too large.
At the application level, we can create a mapping between the keyboard layouts, convert the user's search term to the opposite layout, and send both variants in the Elasticsearch query so that either one can match.
For example, searching for any of the words "Apple, apple, яблоко, Яблоко, z,kjrj, Z,kjrj, Фззду, фззду" will find the same results.
In real-world scenarios the mapping is usually much more complicated, but for this example a simple one is enough.
For Go, two good Elasticsearch libraries are available: the official Elasticsearch client and the community olivere/elastic.
Both are good, but at the moment only the official client supports Elasticsearch 8, so for serious production use I think it's the right choice.
Our microservice communicates over HTTP using the Echo web framework and over RabbitMQ using the official client.
The REST controller has index and search methods:
func (h *productController) indexAsync() echo.HandlerFunc {
	return func(c echo.Context) error {
		ctx, span := tracing.StartHttpServerTracerSpan(c, "productController.indexAsync")
		defer span.Finish()
		var product domain.Product
		if err := c.Bind(&product); err != nil {
			h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
			return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
		}
		product.ID = uuid.NewV4().String()
		if err := h.productUseCase.IndexAsync(ctx, product); err != nil {
			h.log.Errorf("(productUseCase.IndexAsync) err: %v", tracing.TraceWithErr(span, err))
			return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
		}
		h.log.Infof("created product: %+v", product)
		h.metrics.HttpSuccessIndexAsyncRequests.Inc()
		return c.JSON(http.StatusCreated, product)
	}
}
func (h *productController) index() echo.HandlerFunc {
	return func(c echo.Context) error {
		ctx, span := tracing.StartHttpServerTracerSpan(c, "productController.index")
		defer span.Finish()
		var product domain.Product
		if err := c.Bind(&product); err != nil {
			h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
			return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
		}
		product.ID = uuid.NewV4().String()
		if err := h.productUseCase.Index(ctx, product); err != nil {
			h.log.Errorf("(productUseCase.Index) err: %v", tracing.TraceWithErr(span, err))
			return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
		}
		h.log.Infof("created product: %+v", product)
		h.metrics.HttpSuccessIndexRequests.Inc()
		return c.JSON(http.StatusCreated, product)
	}
}
func (h *productController) search() echo.HandlerFunc {
	return func(c echo.Context) error {
		ctx, span := tracing.StartHttpServerTracerSpan(c, "productController.search")
		defer span.Finish()
		searchTerm := c.QueryParam("search")
		pagination := utils.NewPaginationFromQueryParams(c.QueryParam(constants.Size), c.QueryParam(constants.Page))
		searchResult, err := h.productUseCase.Search(ctx, searchTerm, pagination)
		if err != nil {
			h.log.Errorf("(productUseCase.Search) err: %v", tracing.TraceWithErr(span, err))
			return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
		}
		h.log.Infof("search result: %s", searchResult.PaginationResponse.String())
		h.metrics.HttpSuccessSearchRequests.Inc()
		span.LogFields(log.String("search result", searchResult.PaginationResponse.String()))
		return c.JSON(http.StatusOK, dto.SearchProductsResponse{
			SearchTerm: searchTerm,
			Pagination: searchResult.PaginationResponse,
			Products:   searchResult.List,
		})
	}
}
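The search handler delegates paging to utils.NewPaginationFromQueryParams, whose code is not shown. A plausible minimal version, written here as a hypothetical stand-in (including its default values), parses size and page with fallbacks and turns them into the from/size pair that Elasticsearch expects:

```go
package main

import (
	"fmt"
	"strconv"
)

// Hypothetical defaults; the project's real values are not shown.
const (
	defaultSize = 10
	defaultPage = 1
)

// Pagination is a hypothetical stand-in for the project's utils.Pagination.
type Pagination struct {
	Size int
	Page int
}

// NewPaginationFromQueryParams falls back to defaults for missing, malformed,
// or non-positive values.
func NewPaginationFromQueryParams(size, page string) *Pagination {
	p := &Pagination{Size: defaultSize, Page: defaultPage}
	if s, err := strconv.Atoi(size); err == nil && s > 0 {
		p.Size = s
	}
	if pg, err := strconv.Atoi(page); err == nil && pg > 0 {
		p.Page = pg
	}
	return p
}

// GetSize maps to the search request's "size".
func (p *Pagination) GetSize() int { return p.Size }

// GetOffset maps to "from": pages are 1-based, offsets are 0-based.
func (p *Pagination) GetOffset() int { return (p.Page - 1) * p.Size }

// TotalPages rounds up so a partially filled last page still counts.
func (p *Pagination) TotalPages(totalCount int64) int64 {
	return (totalCount + int64(p.Size) - 1) / int64(p.Size)
}

func main() {
	p := NewPaginationFromQueryParams("20", "3")
	fmt.Println(p.GetSize(), p.GetOffset(), p.TotalPages(41)) // 20 40 3
}
```

Keep in mind that from/size paging in Elasticsearch is capped by index.max_result_window (10,000 by default); for deep pagination, search_after is the usual alternative.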
The use case's IndexAsync method serializes the product and publishes it to RabbitMQ:
func (p *productUseCase) IndexAsync(ctx context.Context, product domain.Product) error {
	span, ctx := opentracing.StartSpanFromContext(ctx, "productUseCase.IndexAsync")
	defer span.Finish()
	span.LogFields(log.Object("product", product))
	dataBytes, err := serializer.Marshal(&product)
	if err != nil {
		return tracing.TraceWithErr(span, errors.Wrap(err, "serializer.Marshal"))
	}
	return p.amqpPublisher.Publish(
		ctx,
		p.cfg.ExchangeAndQueueBindings.IndexProductBinding.ExchangeName,
		p.cfg.ExchangeAndQueueBindings.IndexProductBinding.BindingKey,
		amqp.Publishing{
			Headers:   tracing.ExtractTextMapCarrierHeadersToAmqpTable(span.Context()),
			Timestamp: time.Now().UTC(),
			Body:      dataBytes,
		},
	)
}
The RabbitMQ consumer listens to the queue and processes messages using the BulkIndexer for the Bulk API,
which performs better when indexing many documents.
func (c *consumer) ConsumeIndexDeliveries(ctx context.Context, deliveries <-chan amqp.Delivery, workerID int) func() error {
	return func() error {
		c.log.Infof("starting consumer workerID: %d, for queue deliveries: %s", workerID, c.cfg.ExchangeAndQueueBindings.IndexProductBinding.QueueName)
		for {
			select {
			case <-ctx.Done():
				c.log.Errorf("products consumer ctx done: %v", ctx.Err())
				return ctx.Err()
			case msg, ok := <-deliveries:
				if !ok {
					c.log.Errorf("deliveries channel closed for queue: %s", c.cfg.ExchangeAndQueueBindings.IndexProductBinding.QueueName)
					return errors.New("deliveries channel closed")
				}
				c.log.Infof("Consumer delivery: workerID: %d, msg data: %s, headers: %+v", workerID, string(msg.Body), msg.Headers)
				if err := c.bulkIndexProduct(ctx, msg); err != nil {
					c.log.Errorf("bulkIndexProduct err: %v", err)
					continue
				}
				c.log.Infof("Consumer <<<ACK>>> delivery: workerID: %d, msg data: %s, headers: %+v", workerID, string(msg.Body), msg.Headers)
				c.metrics.RabbitMQSuccessBatchInsertMessages.Inc()
			}
		}
	}
}
func (c *consumer) indexProduct(ctx context.Context, msg amqp.Delivery) error {
	span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.indexProduct")
	defer span.Finish()
	var product domain.Product
	if err := serializer.Unmarshal(msg.Body, &product); err != nil {
		c.log.Errorf("indexProduct serializer.Unmarshal <<<Reject>>> err: %v", tracing.TraceWithErr(span, err))
		// A malformed body will never decode, so don't requeue it (that would loop forever);
		// it goes to a dead-letter exchange if one is configured.
		return msg.Reject(false)
	}
	if err := c.productUseCase.Index(ctx, product); err != nil {
		c.log.Errorf("indexProduct productUseCase.Index <<<Reject>>> err: %v", tracing.TraceWithErr(span, err))
		// Indexing may succeed later (e.g. Elasticsearch is briefly unavailable), so requeue.
		return msg.Reject(true)
	}
	// multiple=false: acknowledge only this delivery, not every earlier unacked one on the channel.
	return msg.Ack(false)
}
func (c *consumer) bulkIndexProduct(ctx context.Context, msg amqp.Delivery) error {
	ctx, span := tracing.StartRabbitConsumerTracerSpan(ctx, msg.Headers, "consumer.bulkIndexProduct")
	defer span.Finish()
	var product domain.Product
	if err := serializer.Unmarshal(msg.Body, &product); err != nil {
		c.log.Errorf("bulkIndexProduct serializer.Unmarshal <<<Reject>>> err: %v", tracing.TraceWithErr(span, err))
		// Poison message: reject without requeueing.
		return msg.Reject(false)
	}
	if err := c.bulkIndexer.Add(ctx, esutil.BulkIndexerItem{
		Index:      c.cfg.ElasticIndexes.ProductsIndex.Name,
		Action:     "index",
		DocumentID: product.ID,
		Body:       bytes.NewReader(msg.Body),
		OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem) {
			c.log.Debugf("bulk indexer onSuccess for index alias: %s", c.cfg.ElasticIndexes.ProductsIndex.Alias)
		},
		OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, err error) {
			if err != nil {
				c.log.Errorf("bulk indexer OnFailure err: %v", err)
			}
		},
	}); err != nil {
		c.log.Errorf("bulkIndexProduct bulkIndexer.Add <<<Reject>>> err: %v", tracing.TraceWithErr(span, err))
		return msg.Reject(true)
	}
	c.log.Infof("consumer <<<ACK>>> bulk indexer add product: %+v", product)
	return msg.Ack(false)
}
The repository has the same methods for indexing and searching.
For the search method, we use a bool query with should multi_match clauses, passing both the original term and the term converted to the opposite keyboard layout.
A good practice with Elasticsearch is to always access indexes through aliases.
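One reason aliases pay off is reindexing: you can build a new index with a changed mapping and then move the alias in a single request to the _aliases endpoint, whose actions Elasticsearch applies atomically. The sketch below only builds that request body; aliasSwapBody and the index names are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// aliasSwapBody builds a POST /_aliases body that moves alias from oldIndex to
// newIndex. All actions in one request are applied atomically, so clients
// querying the alias switch from the old index to the new one in one step.
func aliasSwapBody(alias, oldIndex, newIndex string) ([]byte, error) {
	return json.Marshal(map[string]any{
		"actions": []map[string]any{
			{"remove": map[string]string{"index": oldIndex, "alias": alias}},
			{"add": map[string]string{"index": newIndex, "alias": alias}},
		},
	})
}

func main() {
	body, err := aliasSwapBody("products", "products_v1", "products_v2")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```

With the official client, such a body can be sent via esClient.Indices.UpdateAliases(bytes.NewReader(body)).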
The keyboard layout converter loads a JSON file with the mappings when the application starts, unmarshals it into a map, and exposes one method for converting a word from one layout to the other:
func (k *keyboardLayoutsManager) GetOppositeLayoutWord(originalWord string) string {
	sb := k.sbPool.Get().(*strings.Builder)
	defer k.sbPool.Put(sb)
	sb.Reset()
	// Ranging over a string yields runes, so multi-byte Cyrillic characters are handled correctly.
	for _, c := range originalWord {
		lowerCasedChar := strings.ToLower(string(c))
		if char, ok := k.keyMappings[lowerCasedChar]; ok {
			sb.WriteString(char)
		} else {
			sb.WriteString(lowerCasedChar)
		}
	}
	return sb.String()
}
func (e *esRepository) Index(ctx context.Context, product domain.Product) error {
	span, ctx := opentracing.StartSpanFromContext(ctx, "esRepository.Index")
	defer span.Finish()
	span.LogFields(log.Object("product", product))
	dataBytes, err := serializer.Marshal(&product)
	if err != nil {
		return tracing.TraceWithErr(span, errors.Wrap(err, "serializer.Marshal"))
	}
	response, err := e.esClient.Index(
		e.cfg.ElasticIndexes.ProductsIndex.Alias,
		bytes.NewReader(dataBytes),
		e.esClient.Index.WithPretty(),
		e.esClient.Index.WithHuman(),
		e.esClient.Index.WithTimeout(indexTimeout),
		e.esClient.Index.WithContext(ctx),
		e.esClient.Index.WithDocumentID(product.ID),
	)
	if err != nil {
		return tracing.TraceWithErr(span, errors.Wrap(err, "esClient.Index"))
	}
	defer response.Body.Close()
	if response.IsError() {
		return tracing.TraceWithErr(span, errors.Wrap(errors.New(response.String()), "esClient.Index response error"))
	}
	e.log.Infof("document indexed: %s", response.String())
	return nil
}
func (e *esRepository) Search(ctx context.Context, term string, pagination *utils.Pagination) (*domain.ProductSearch, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "esRepository.Search")
	defer span.Finish()
	span.LogFields(log.String("term", term))
	shouldQuery := map[string]any{
		"query": map[string]any{
			"bool": map[string]any{
				"should": []map[string]any{
					{
						"multi_match": map[string]any{
							"query":  term,
							"fields": searchFields,
						},
					},
					{
						"multi_match": map[string]any{
							"query":  e.keyboardLayoutManager.GetOppositeLayoutWord(term),
							"fields": searchFields,
						},
					},
				},
				// At least one term variant must match; once a filter clause is present,
				// should clauses would otherwise become optional.
				"minimum_should_match": 1,
				// The stock condition only restricts results. Inside "should" it would match
				// every product with a non-negative count_in_stock on its own, bypassing the text match.
				"filter": []map[string]any{
					{
						"range": map[string]any{
							"count_in_stock": map[string]any{
								"gte": 0,
							},
						},
					},
				},
			},
		},
	}
	dataBytes, err := serializer.Marshal(&shouldQuery)
	if err != nil {
		return nil, tracing.TraceWithErr(span, errors.Wrap(err, "serializer.Marshal"))
	}
	response, err := e.esClient.Search(
		e.esClient.Search.WithContext(ctx),
		e.esClient.Search.WithIndex(e.cfg.ElasticIndexes.ProductsIndex.Alias),
		e.esClient.Search.WithBody(bytes.NewReader(dataBytes)),
		e.esClient.Search.WithPretty(),
		e.esClient.Search.WithHuman(),
		e.esClient.Search.WithTimeout(searchTimeout),
		e.esClient.Search.WithSize(pagination.GetSize()),
		e.esClient.Search.WithFrom(pagination.GetOffset()),
	)
	if err != nil {
		return nil, tracing.TraceWithErr(span, errors.Wrap(err, "esClient.Search"))
	}
	defer response.Body.Close()
	if response.IsError() {
		return nil, tracing.TraceWithErr(span, errors.Wrap(errors.New(response.String()), "esClient.Search error"))
	}
	hits := esclient.EsHits[*domain.Product]{}
	if err := serializer.NewDecoder(response.Body).Decode(&hits); err != nil {
		return nil, tracing.TraceWithErr(span, errors.Wrap(err, "serializer.Decode"))
	}
	responseList := make([]*domain.Product, len(hits.Hits.Hits))
	for i, source := range hits.Hits.Hits {
		responseList[i] = source.Source
	}
	e.log.Infof("repository search result responseList: %+v", responseList)
	return &domain.ProductSearch{
		List:               responseList,
		PaginationResponse: utils.NewPaginationResponse(hits.Hits.Total.Value, pagination),
	}, nil
}
For Kubernetes I prefer to use minikube with Helm.
RabbitMQ, Elasticsearch, Jaeger, and Kibana use the same images as the docker-compose file, plus the Prometheus community chart.
For Prometheus to scrape our microservice correctly, we have to add a ServiceMonitor resource for it.
For working with k8s I personally like to use Lens; it has a friendly UI and many useful features.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ .Values.searchMicroserviceName }}
  labels:
    app: {{ .Values.searchMicroserviceName }}
spec:
  replicas: {{ .Values.searchMicroserviceReplicas }}
  template:
    metadata:
      name: {{ .Values.searchMicroserviceName }}
      labels:
        app: {{ .Values.searchMicroserviceName }}
    spec:
      containers:
        - name: {{ .Values.searchMicroserviceName }}
          image: {{.Values.searchMicroserviceImage }}
          imagePullPolicy: Always
          resources:
            requests:
              memory: {{.Values.resources.requests.memory }}
              cpu: {{.Values.resources.requests.cpu }}
            limits:
              memory: {{.Values.resources.limits.memory }}
              cpu: {{.Values.resources.limits.cpu }}
          livenessProbe:
            httpGet:
              path: {{.Values.searchMicroserviceLivenessProbePath }}
              port: {{.Values.searchMicroserviceLivenessProbePort }}
            initialDelaySeconds: {{ .Values.searchMicroserviceInitialDelaySeconds }}
            periodSeconds: {{ .Values.searchMicroservicePeriodSeconds }}
          readinessProbe:
            httpGet:
              path: {{.Values.searchMicroserviceReadinessProbePath }}
              port: {{.Values.searchMicroserviceReadinessProbePort }}
            initialDelaySeconds: {{ .Values.searchMicroserviceInitialDelaySeconds }}
            periodSeconds: {{ .Values.searchMicroservicePeriodSeconds }}
          ports:
            - containerPort: {{.Values.searchMicroserviceHttpPort }}
              name: http
            - containerPort: {{.Values.searchMicroserviceMetricsPort }}
              name: metrics
            - containerPort: {{.Values.searchMicroserviceHealthcheckPort }}
              name: healthcheck
          env:
            - name: JAEGER_HOST_PORT
              value: {{ .Values.jaegerHotPost }}
            - name: ELASTIC_URL
              value: {{ .Values.elasticSearchURL }}
            - name: RABBITMQ_URI
              value: {{ .Values.rabbitMqURI }}
            - name: CONFIG_PATH
              value: "/search-config/search-config.yaml"
          volumeMounts:
            - name: config
              mountPath: "/search-config"
      restartPolicy: Always
      terminationGracePeriodSeconds: {{ .Values.searchMicroserviceTerminationGracePeriodSeconds }}
      volumes:
        - name: config
          configMap:
            name: {{ .Values.searchMicroserviceName }}-config-map
            items:
              - key: search-config.yaml
                path: search-config.yaml
  selector:
    matchLabels:
      app: {{ .Values.searchMicroserviceName }}
---
apiVersion: v1
kind: Service
metadata:
  name: {{ .Values.searchMicroserviceName }}-service
  labels:
    app: {{ .Values.searchMicroserviceName }}
spec:
  type: ClusterIP
  selector:
    app: {{ .Values.searchMicroserviceName }}
  ports:
    - name: http
      port: {{.Values.searchMicroserviceHttpPort }}
      protocol: TCP
    - name: healthcheck
      port: {{.Values.searchMicroserviceHealthcheckPort }}
      protocol: TCP
      targetPort: healthcheck
    - name: metrics
      port: {{.Values.searchMicroserviceMetricsPort }}
      protocol: TCP
      targetPort: metrics
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  labels:
    release: monitoring
  name: {{ .Values.searchMicroserviceName }}-service-monitor
  namespace: default
spec:
  selector:
    matchLabels:
      app: {{ .Values.searchMicroserviceName }}
  endpoints:
    - interval: 10s
      port: metrics
      path: {{.Values.prometheusPath }}
  namespaceSelector:
    matchNames:
      - default
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ .Values.searchMicroserviceName }}-config-map
data:
  search-config.yaml: |
    serviceName: search_microservice
    grpc:
      port: :5001
      development: true
    http:
      port: :{{ .Values.searchMicroserviceHttpPort }}
      development: {{ .Values.http.development }}
      basePath: {{ .Values.http.basePath }}
      productsPath: {{ .Values.http.productsPath }}
      debugErrorsResponse: {{ .Values.http.debugErrorsResponse }}
      ignoreLogUrls: {{ .Values.http.ignoreLogUrls }}
    probes:
      readinessPath: {{ .Values.searchMicroserviceReadinessProbePath }}
      livenessPath: {{ .Values.searchMicroserviceLivenessProbePath }}
      port: :{{ .Values.searchMicroserviceHealthcheckPort }}
      pprof: :6001
      prometheusPath: {{ .Values.prometheusPath }}
      prometheusPort: :{{.Values.searchMicroserviceMetricsPort }}
      checkIntervalSeconds: 10
    logger:
      level: {{ .Values.searchMicroserviceLogging.level }}
      devMode: {{ .Values.searchMicroserviceLogging.devMode }}
      encoder: {{ .Values.searchMicroserviceLogging.encoder }}
    jaeger:
      enable: true
      serviceName: {{ .Values.searchMicroserviceName }}
      hostPort: {{ .Values.jaegerHotPost }}
      logSpans: false
    timeouts:
      postgresInitMilliseconds: 1500
      postgresInitRetryCount: 3
    elasticSearch:
      addresses: [ {{ .Values.elasticSearchURL }} ]
      username: ""
      password: ""
      apiKey: ""
      enableLogging: false
    elasticIndexes:
      products:
        path: {{ .Values.elasticIndexes.products.path }}
        name: {{ .Values.elasticIndexes.products.name }}
        alias: {{ .Values.elasticIndexes.products.alias }}
    rabbitmq:
      uri: {{ .Values.rabbitMqURI }}
    exchangeAndQueueBindings:
      indexProductBinding:
        exchangeName: {{.Values.exchangeAndQueueBindings.indexProductBinding.exchangeName }}
        exchangeKind: {{.Values.exchangeAndQueueBindings.indexProductBinding.exchangeKind }}
        queueName: {{.Values.exchangeAndQueueBindings.indexProductBinding.queueName }}
        bindingKey: {{.Values.exchangeAndQueueBindings.indexProductBinding.bindingKey }}
        concurrency: {{.Values.exchangeAndQueueBindings.indexProductBinding.concurrency }}
        consumer: {{.Values.exchangeAndQueueBindings.indexProductBinding.consumer }}
    bulkIndexer:
      numWorkers: {{ .Values.bulkIndexer.numWorkers }}
      flushBytes: {{ .Values.bulkIndexer.flushBytes }}
      flushIntervalSeconds: {{ .Values.bulkIndexer.flushIntervalSeconds }}
      timeoutMilliseconds: {{ .Values.bulkIndexer.timeoutMilliseconds }}
You can find more details and the source code of the full project here.
Of course, in real-world applications full-text search and business requirements can be much more complicated and may include, for example, machine learning.
I hope this article is useful and helpful. I'll be happy to receive any feedback or questions; feel free to contact me by email or any messenger :)