Circuit breakers (in Go)
Homayoon Alimohammadi
Posted on August 23, 2023
Circuit breakers are a basic yet vital part of a system, especially if you are dealing with a microservice architecture. Recently one of my colleagues was wondering what these ‘proxy-like’ pieces of code do, and while explaining it to him I realized that although I roughly know what a circuit breaker is at a high level, I don’t really know what goes on under the hood. So I decided to dig deeper and share what I’ve found. In this article we will learn about circuit breakers at a high level and walk through a basic implementation of the circuit breaker pattern in Go.
Introduction
Imagine “Service A” is trying to contact “Service B” (as the server) via an RPC. From the standpoint of Service A, it looks roughly like this:
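package service_A

import (
	"context"
	"log"
)

// callServiceB makes a plain RPC to Service B and logs the outcome.
// serviceB and Request stand for the RPC client and its request type.
func callServiceB(req Request) {
	resp, err := serviceB.SomeMethod(context.Background(), req)
	if err != nil {
		log.Printf("got error: %s", err.Error())
		return
	}
	log.Printf("got response: %s", resp)
}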
Well, this is probably the most basic way to make a call to another service, but there are a number of things that can (and will) go wrong. Let’s say we did not receive any response from Service B. What are the possible reasons for that? One of them (which is not necessarily our concern right now) is that, since we’re mostly dealing with unreliable network connections, it’s quite possible that the request was never processed by Service B. In this case we don’t rely solely on TCP retransmission; most RPC frameworks provide their own retry mechanisms, so this is mostly handled in the application layer. But the lack of a response from the server does not always mean there was a network issue; maybe there is something wrong with Service B itself.
In general, services (even the most reliable ones) tend to get slow or unavailable from time to time. This might result in an error in response to some (or all) of the incoming calls. If a service is really busy processing lots of requests, we might want to limit our request rate to that service, or even stop sending any requests for a while, letting it take a breath and do whatever it was doing in peace.
In another case, let’s say the service was unavailable for a while and our previously failed requests (let’s say 100 requests) are being retried one after another. If we don’t limit the number of our future requests to that service and make an additional 100 requests, then when the service eventually recovers it will face 200 requests at once, which may well cause it to crash again immediately.
Although in the real world most RPCs are made with a specific timeout and won’t be retried afterwards, it is still a good practice not to flood a service with requests while it has problems responding.
Another benefit of this limiting mechanism is that we don’t bother making any request to that service (for a while), hence no time-consuming I/O. This is definitely faster than waiting for a while and then receiving a failed result (like a 5xx status code).
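To put a rough shape on that cost, here is a minimal sketch that compares waiting out a timeout against being refused up front. Everything in it (slowService, callWithTimeout, callFailFast, errRefused) is a hypothetical stand-in and not part of the implementation developed later in this article.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errRefused is a hypothetical error for a locally refused request.
var errRefused = errors.New("refused: circuit is open")

// slowService stands in for an unresponsive destination:
// it only returns once the context is done.
func slowService(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// callWithTimeout calls the unresponsive service and waits for the whole
// timeout before it learns that the call failed.
func callWithTimeout(timeout time.Duration) (time.Duration, error) {
	start := time.Now()
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	err := slowService(ctx)
	return time.Since(start), err
}

// callFailFast models a refused request: no I/O, no waiting.
func callFailFast() (time.Duration, error) {
	start := time.Now()
	return time.Since(start), errRefused
}

func main() {
	d, err := callWithTimeout(100 * time.Millisecond)
	fmt.Printf("with timeout: %v after about %v\n", err, d.Round(10*time.Millisecond))

	d, err = callFailFast()
	fmt.Printf("fail fast:    %v after %v\n", err, d)
}
```

The first call costs the full timeout on every attempt, while the second costs practically nothing, which is the saving described above.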
Circuit Breakers
In simple terms, an RPC between two services is like drawing 2 straight lines between the two. One line sends the request from service A to service B, and the other returns the response from service B to service A. But in order to implement that “limiting” policy, we need to have some kind of a middle-man which decides whether to direct a request to the destination or not.
This middle-man, proxy (not a network proxy) or wrapper is either going to keep the circuit (or connection) between the two services “Closed”, or stop one from calling the other, hence “Opening” the circuit.
The main idea behind a circuit breaker is as follows:
By default the circuit is in the Closed state and you are allowed to make calls freely to the destination. After a certain number of failed responses from the destination (a threshold, let’s say 5), it stops you from making any further requests for a while (a back-off time, like 30 seconds), during which the circuit is considered Open. After that interval is over, it goes into a Half-Open state. The next request determines whether we end up in the Closed state or go back to the Open state: if it succeeds, the circuit is closed, but if it fails we are back in the Open state and forced to wait for another back-off interval.
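The transitions described above can be sketched as a small pure function. This is only an illustration of the state machine: the names (State, event, next) are hypothetical, the back-off timer is reduced to a boolean, and it is not the implementation developed in the rest of this article.

```go
package main

import "fmt"

// State is a hypothetical enum used only in this sketch.
type State int

const (
	Closed State = iota
	Open
	HalfOpen
)

func (s State) String() string {
	return [...]string{"closed", "open", "half-open"}[s]
}

// event describes what happened since the last transition.
type event struct {
	failed       bool // the last request failed
	backoffEnded bool // the back-off interval has elapsed
}

// next returns the following state and the updated failure count.
func next(s State, fails, threshold int, e event) (State, int) {
	switch s {
	case Closed:
		if !e.failed {
			return Closed, fails
		}
		fails++
		if fails >= threshold {
			return Open, fails
		}
		return Closed, fails
	case Open:
		if e.backoffEnded {
			return HalfOpen, 0
		}
		return Open, fails
	case HalfOpen:
		if e.failed {
			return Open, fails
		}
		return Closed, 0
	}
	return s, fails
}

func main() {
	s, fails := Closed, 0

	// Two failures trip a breaker whose threshold is 2.
	s, fails = next(s, fails, 2, event{failed: true})
	s, fails = next(s, fails, 2, event{failed: true})
	fmt.Println(s) // open

	// The back-off interval ends.
	s, fails = next(s, fails, 2, event{backoffEnded: true})
	fmt.Println(s) // half-open

	// The trial request succeeds.
	s, fails = next(s, fails, 2, event{})
	fmt.Println(s, fails) // closed 0
}
```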
Let’s see how a simple circuit breaker is implemented in Go.
Implementation
All the code you see below can be found in this GitHub repo as well.
From the standpoint of the service that uses a circuit breaker to make requests to another service, making the RPC is a bit different:
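package service_A

// callServiceB makes the same RPC as before, but routes it through the
// circuit breaker.
func callServiceB(cb circuitbreaker.Circuitbreaker, req Request) {
	r, err := cb.Execute(func() (interface{}, error) {
		return serviceB.SomeMethod(context.Background(), req)
	})
	if err != nil {
		log.Printf("got error: %s", err.Error())
		return
	}

	// Execute returns interface{}, so assert the concrete response type.
	resp, ok := r.(serviceB.Response)
	if !ok {
		log.Println("type assertion failed")
		return
	}
	log.Printf("got response: %s", resp)
}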
Let’s take a look at what cb.Execute does:
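package circuitbreaker

// Execute runs req through the breaker: a pre-request check, the request
// itself, then post-request bookkeeping.
func (cb *circuitbreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
	// Refuse immediately if the circuit is open.
	err := cb.doPreRequest()
	if err != nil {
		return nil, err
	}

	res, err := req()

	// Record the outcome; this may change the circuit state.
	err = cb.doPostRequest(err)
	if err != nil {
		return nil, err
	}

	return res, nil
}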
The actual request happens at res, err := req(). Before it, there should be a pre-request step, doPreRequest:
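func (cb *circuitbreaker) doPreRequest() error {
	// cb.state is also written by doPostRequest and openWatcher, so read it
	// under the same mutex (a field of the full struct shown further down).
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	if cb.state == open {
		return ErrRefuse
	}
	return nil
}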
This checks whether the current state is open. If the circuit is open, it simply returns an error indicating that the request is refused. Let’s look at the structure of our circuit breaker so far:
type State string

const (
	open     State = "open"
	closed   State = "closed"
	halfOpen State = "half-open"
)

type circuitbreaker struct {
	state State
}
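ErrRefuse is returned by doPreRequest above, but its definition is not shown in this article. A minimal sketch, assuming it is a plain sentinel error (the message text and the call and describe helpers are made up for illustration), shows how a caller could tell “the breaker refused” apart from “the destination failed” with errors.Is:

```go
package main

import (
	"errors"
	"fmt"
)

// ErrRefuse is assumed to be a sentinel error; the message is hypothetical.
var ErrRefuse = errors.New("circuit breaker: request refused, circuit is open")

// call stands in for cb.Execute: it refuses when the circuit is open,
// otherwise it returns whatever the wrapped request returned.
func call(circuitOpen bool, req func() error) error {
	if circuitOpen {
		return ErrRefuse
	}
	return req()
}

// describe classifies the error a caller got back.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrRefuse):
		return "refused locally, no I/O was attempted"
	default:
		return "destination failed: " + err.Error()
	}
}

func main() {
	failing := func() error { return errors.New("503 from service B") }

	fmt.Println(describe(call(false, failing)))
	fmt.Println(describe(call(true, failing)))
}
```

Keeping the refusal distinguishable is useful for callers that want to serve a fallback when the circuit is open instead of logging it as a destination error.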
Fine, what about the post-request step, doPostRequest?
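func (cb *circuitbreaker) doPostRequest(err error) error {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	if err == nil {
		if cb.policy == MaxConsecutiveFails {
			cb.fails = 0
		}
		// A late success that arrives while the circuit is open must not
		// close it; openWatcher is still timing the open interval.
		if cb.state != open {
			cb.state = closed
		}
		return nil
	}

	// A late failure while already open: openWatcher is sleeping and will
	// need the mutex afterwards, so signalling again here would deadlock.
	if cb.state == open {
		return err
	}

	if cb.state == halfOpen {
		cb.state = open
		cb.openChannel <- struct{}{}
		return err
	}

	cb.fails++
	if cb.failsExcceededThreshod() {
		cb.state = open
		cb.openChannel <- struct{}{}
	}

	return err
}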
This function introduces a bunch of new fields and methods. Let’s describe the logic behind it and then expand our circuit breaker structure.
mutex makes sure read-modify-write cycles are done safely in case there are concurrent attempts to modify the circuit breaker state.
err is the actual error that the destination returned. If it is nil, reset the fail counter where the policy calls for it, close the circuit (this is what ends a half-open trial) and go on. fails is the number of failed requests in the current closed state. MaxConsecutiveFails is a policy under which the circuit breaker must open the circuit after n consecutive fails, which is why a success resets the counter under that policy.
If there was an error and we were in the half-open state, open the circuit and go on. The cb.openChannel <- struct{}{} triggers the waiting interval.
If there was an error and we were not in the half-open state, simply increment the number of failed attempts in the current state.
Check whether the number of failed attempts has reached the threshold of your policy. If so, open the circuit and trigger the waiting interval.
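To make the difference between counting all fails and counting only consecutive fails concrete, here is a small standalone sketch that replays a sequence of results and reports which request would open the circuit. The tripsAfter function is hypothetical; it only mirrors the counting rule described above, not the full breaker.

```go
package main

import "fmt"

// tripsAfter replays results (true = success) and returns the 1-based index
// of the request that makes the fail count reach threshold, or 0 if the
// circuit never opens. With consecutive set, a success resets the count.
func tripsAfter(results []bool, threshold int, consecutive bool) int {
	fails := 0
	for i, ok := range results {
		if ok {
			if consecutive {
				fails = 0
			}
			continue
		}
		fails++
		if fails >= threshold {
			return i + 1
		}
	}
	return 0
}

func main() {
	// fail, fail, ok, fail, ok, fail, fail, fail
	results := []bool{false, false, true, false, true, false, false, false}

	fmt.Println(tripsAfter(results, 3, false)) // 4: the third fail overall
	fmt.Println(tripsAfter(results, 3, true))  // 8: the third fail in a row
}
```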
Let’s take a look at the complete structure of our circuit breaker:
type Policy int
type State string

const (
	// MaxFails specifies the maximum non-consecutive fails which are allowed
	// in the "Closed" state before the state is changed to "Open".
	MaxFails Policy = iota
	// MaxConsecutiveFails specifies the maximum consecutive fails which are allowed
	// in the "Closed" state before the state is changed to "Open".
	MaxConsecutiveFails
)

const (
	open     State = "open"
	closed   State = "closed"
	halfOpen State = "half-open"
)

type circuitbreaker struct {
	policy              Policy
	maxFails            uint64
	maxConsecutiveFails uint64
	openInterval        time.Duration

	// fails is the number of failed requests for the current "Closed" state.
	// It is reset by openWatcher when the circuit moves to half-open, and on
	// every success under the MaxConsecutiveFails policy.
	fails uint64

	// current state of the circuit
	state State

	// openChannel handles the event transfer mechanism for the open state
	openChannel chan struct{}

	mutex sync.Mutex
}
And the helper functions we’ve seen so far:
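// failsExcceededThreshod reports whether the fail counter has reached the
// threshold of the active policy.
func (cb *circuitbreaker) failsExcceededThreshod() bool {
	switch cb.policy {
	case MaxConsecutiveFails:
		return cb.fails >= cb.maxConsecutiveFails
	case MaxFails:
		return cb.fails >= cb.maxFails
	default:
		return false
	}
}

// openWatcher runs in its own goroutine. Each signal on openChannel starts
// one open interval, after which the circuit becomes half-open.
func (cb *circuitbreaker) openWatcher() {
	for range cb.openChannel {
		time.Sleep(cb.openInterval)

		cb.mutex.Lock()
		cb.state = halfOpen
		cb.fails = 0
		cb.mutex.Unlock()
	}
}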
The openWatcher simply listens on the openChannel and, upon receiving a signal, sleeps for the openInterval, then changes the state to half-open and resets the number of fails. The cycle then repeats. But when is the openWatcher started? Right when we initialize our circuit breaker:
type ExtraOptions struct {
	// Policy determines how the fails should be incremented
	Policy Policy
	// MaxFails specifies the maximum non-consecutive fails which are allowed
	// in the "Closed" state before the state is changed to "Open".
	MaxFails *uint64
	// MaxConsecutiveFails specifies the maximum consecutive fails which are allowed
	// in the "Closed" state before the state is changed to "Open".
	MaxConsecutiveFails *uint64
	// OpenInterval is how long the circuit stays open before a half-open trial.
	OpenInterval *time.Duration
}

// New builds a circuit breaker, filling in defaults for any option left nil,
// and starts the goroutine that times the open interval.
func New(opts ...ExtraOptions) Circuitbreaker {
	var opt ExtraOptions
	if len(opts) > 0 {
		opt = opts[0]
	}

	if opt.MaxFails == nil {
		opt.MaxFails = literal.ToPointer(uint64(5))
	}
	if opt.MaxConsecutiveFails == nil {
		opt.MaxConsecutiveFails = literal.ToPointer(uint64(5))
	}
	if opt.OpenInterval == nil {
		opt.OpenInterval = literal.ToPointer(5 * time.Second)
	}

	cb := &circuitbreaker{
		policy:              opt.Policy,
		maxFails:            *opt.MaxFails,
		maxConsecutiveFails: *opt.MaxConsecutiveFails,
		openInterval:        *opt.OpenInterval,
		// Start in the closed state explicitly instead of the empty string.
		state:       closed,
		openChannel: make(chan struct{}),
	}

	go cb.openWatcher()

	return cb
}
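New returns a Circuitbreaker and fills its defaults with literal.ToPointer, neither of which is shown in this article. A plausible sketch, assuming Circuitbreaker is a one-method interface matching Execute and ToPointer is a small generic helper (both are guesses; the real definitions live in the repo), could look like this. The passthrough type is hypothetical and exists only to show the interface being satisfied.

```go
package main

import (
	"fmt"
	"time"
)

// Circuitbreaker is assumed to expose only Execute, matching the method
// implemented on *circuitbreaker in the article.
type Circuitbreaker interface {
	Execute(req func() (interface{}, error)) (interface{}, error)
}

// ToPointer returns a pointer to a copy of v (requires Go 1.18+ generics).
// It makes it possible to write optional fields inline, e.g. ToPointer(uint64(5)).
func ToPointer[T any](v T) *T {
	return &v
}

// passthrough is a trivial implementation that never opens the circuit.
type passthrough struct{}

func (passthrough) Execute(req func() (interface{}, error)) (interface{}, error) {
	return req()
}

func main() {
	var cb Circuitbreaker = passthrough{}
	res, err := cb.Execute(func() (interface{}, error) { return "pong", nil })
	fmt.Println(res, err)

	interval := ToPointer(5 * time.Second)
	maxFails := ToPointer(uint64(5))
	fmt.Println(*interval, *maxFails)
}
```

Pointer fields in ExtraOptions let New distinguish “not set” (nil) from an explicit zero value, which is why a helper of this kind is convenient.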
Let’s try what we’ve written in the simplest way possible:
package main

import (
	"errors"
	"fmt"
	"log"
	"math/rand"
	"sync"
	"time"
	// plus the circuitbreaker and literal packages from the repo
)

func main() {
	cbOpts := circuitbreaker.ExtraOptions{
		Policy:              circuitbreaker.MaxFails,
		MaxFails:            literal.ToPointer(uint64(5)),
		MaxConsecutiveFails: literal.ToPointer(uint64(5)),
		OpenInterval:        literal.ToPointer(160 * time.Millisecond),
	}
	cb := circuitbreaker.New(cbOpts)

	wg := &sync.WaitGroup{}
	// Fire 99 requests, one every 10ms, each in its own goroutine.
	for i := 1; i < 100; i++ {
		wg.Add(1)
		go makeServiceCall(i, cb, wg)
		time.Sleep(10 * time.Millisecond)
	}

	log.Println("sent all the requests")
	wg.Wait()
	log.Println("got all the responses, exiting.")
}

// serviceMethod simulates a destination that fails about 30% of the time.
func serviceMethod(id int) (string, error) {
	if val := rand.Float64(); val <= 0.3 {
		return "", errors.New("failed")
	}
	return fmt.Sprintf("[id: %d] done.", id), nil
}

func makeServiceCall(id int, cb circuitbreaker.Circuitbreaker, wg *sync.WaitGroup) {
	defer wg.Done()

	resp, err := cb.Execute(func() (interface{}, error) {
		return serviceMethod(id)
	})
	if err != nil {
		log.Printf("[id %d] got err: %s", id, err.Error())
	} else {
		log.Printf("[id %d] success: %s", id, resp)
	}
}
Feel free to experiment with different values for Policy, MaxFails, MaxConsecutiveFails and OpenInterval.
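Because the demo above fails at random, the open, half-open and closed phases are not always easy to spot in the logs. The following standalone sketch scripts the failures instead. Note that it is not the breaker from this article: miniBreaker is a hypothetical, simplified variant that replaces the watcher goroutine with a timestamp check and always counts consecutive fails, so it only illustrates the observable behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var (
	errRefused = errors.New("refused: circuit is open")
	errDown    = errors.New("service down")
)

// miniBreaker opens after maxFails consecutive fails and remembers when it
// opened; the first call after openInterval acts as the half-open trial.
type miniBreaker struct {
	mu           sync.Mutex
	fails        int
	maxFails     int
	openedAt     time.Time // zero value means the circuit is closed
	openInterval time.Duration
}

func (b *miniBreaker) Execute(req func() error) error {
	b.mu.Lock()
	if !b.openedAt.IsZero() && time.Since(b.openedAt) < b.openInterval {
		b.mu.Unlock()
		return errRefused // still inside the open interval
	}
	b.mu.Unlock()

	err := req()

	b.mu.Lock()
	defer b.mu.Unlock()
	if err == nil {
		b.fails = 0
		b.openedAt = time.Time{} // close the circuit
		return nil
	}
	b.fails++
	if b.fails >= b.maxFails || !b.openedAt.IsZero() {
		b.openedAt = time.Now() // open, or re-open after a failed trial
	}
	return err
}

func main() {
	b := &miniBreaker{maxFails: 2, openInterval: 200 * time.Millisecond}

	healthy := false
	service := func() error {
		if healthy {
			return nil
		}
		return errDown
	}

	fmt.Println(b.Execute(service)) // first fail, circuit stays closed
	fmt.Println(b.Execute(service)) // second fail, circuit opens
	fmt.Println(b.Execute(service)) // refused without calling the service

	healthy = true
	time.Sleep(250 * time.Millisecond) // wait out the open interval
	fmt.Println(b.Execute(service))    // trial succeeds, circuit closes
}
```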
Conclusion
So far we’ve seen how circuit breakers play a vital role in reliability and performance. Without circuit breakers the client might waste lots of time trying to connect to an unavailable service, waiting for the whole timeout period each time. The target service, on the other hand, might be bombarded with dozens of requests right after recovering from downtime or a disaster, leading to another unfortunate outage. We’ve also walked through a simple implementation of the circuit breaker pattern in Go. I’d love to know your opinion on this subject, as well as any other comments or suggestions you might have. I learn a lot from reading your comments, corrections and any additional information you provide.