Crafting a concurrent queue in Golang
Narasimha Prasanna HN
Posted on December 6, 2020
The queue is one of the most important data structures. Using queues, we can model many computational tasks easily and efficiently. Queues are used everywhere: operating systems use them to schedule processes, and routers use them as buffers to store packets before processing them. Even cloud-native applications composed of microservices and workers use queues to communicate with each other. Queues are extensively used for asynchronous processing.
What is a Job queue?
If you have ever used Node.js, you are already familiar with job queues. The so-called event loop in Node is essentially a job queue that schedules asynchronous operations onto free workers in the libuv thread pool. This pattern is common in many systems. Here is how a queue is used for asynchronous processing; consider the scenario below:
- There will be a separate thread or a group of threads waiting for some kind of input.
- The main thread produces these inputs (or tasks).
In this case, there has to be a medium that acts as glue between the main thread producing inputs and the worker threads running in the background. The medium should be designed so that the input fed in first gets a chance to be processed first, then the next, and any free worker thread can consume an input from it. A queue is the best data structure for this scenario because of its FIFO (first-in, first-out) property, which guarantees ordering. If we can make this data structure shareable between multiple threads, we achieve exactly what we wanted.
This is how a job queue works: the main thread places an input on the queue and continues its operation without waiting for the output. Since the workers wait for input, any free worker pops the input from the queue and processes it. A thread that is busy doing work has no time to pop the next input, so only the free threads get a chance to pop from the queue and become busy. If no thread is free, the inputs simply remain in the queue in the right order.
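For intuition, here is a minimal sketch of this pattern in Go, using a buffered channel as the job queue (a channel is Go's built-in concurrent queue; the rest of this post builds such a queue by hand):

package main

import (
	"fmt"
	"sync"
)

func main() {
	jobs := make(chan int, 8) // the job queue: FIFO and safe to share
	var wg sync.WaitGroup
	// a pool of worker goroutines waiting for input
	for w := 1; w <= 3; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for job := range jobs {
				fmt.Printf("worker %d processed job %d\n", id, job)
			}
		}(w)
	}
	// the main goroutine produces inputs and moves on without waiting
	for j := 0; j < 10; j++ {
		jobs <- j
	}
	close(jobs)
	wg.Wait()
}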
What are the advantages?
Imagine a scenario where the main thread did all the work: it would be much slower, because the main thread cannot process the next job until it finishes the current one. A single thread cannot run on multiple cores at once, which is why it runs sequentially. On a machine with multiple cores this is a poor approach, because all the other cores sit idle without doing any work. With a thread pool, the free threads in our pool run on separate cores and we exploit parallelism: on an 8-core machine we can now process 8 inputs in the time it previously took to process one. The main thread can concentrate on fetching new inputs rather than processing them, because putting an input on the queue takes very little time.
Concurrent queue
Every advantage comes with a disadvantage as well. In the case discussed above, we cannot use a normal queue, because it is not thread-safe. Since multiple threads operate on the same queue, synchronization problems are inevitable, and we need to take care of them. A concurrent queue is a special queue designed to handle exactly these issues: it allows multiple threads to operate on the same queue without any synchronization problems. In this blog we will design one such concurrent queue that can be used by multiple goroutines. The queue we design will have the following properties:
- Can hold any type of data, i.e. the queue is a generic implementation (using the `interface{}` type in Go).
- Expandable in size: it grows and shrinks as required (i.e. no pre-allocated memory).
- Thread-safe by nature, with no synchronization issues.
Let's create a normal queue first
Synchronization issues have nothing to do with how data is stored; we only have to control how data is accessed. This allows us to implement a normal queue first and use it as a storage backend for the concurrent queue.
Here is how we are going to design the queue:
- We will be using a doubly-linked list.
- The queue will have `put` and `pop` functions.
- We insert data at the head of the list and pop from the tail; this is how `put` and `pop` will be implemented.
- We let the queue grow only up to `maxSize`, so we keep track of the current size every time an enqueue or dequeue occurs and take decisions based on it.

Here is an implementation of the queue using a doubly linked list:
//Node storage of queue data
type Node struct {
data interface{}
prev *Node
next *Node
}
//QueueBackend Backend storage of the queue, a doubly linked list
type QueueBackend struct {
//Pointers to root and end
head *Node
tail *Node
//keep track of current size
size uint32
maxSize uint32
}
func (queue *QueueBackend) createNode(data interface{}) *Node {
node := Node{}
node.data = data
node.next = nil
node.prev = nil
return &node
}
func (queue *QueueBackend) put(data interface{}) error {
if queue.size >= queue.maxSize {
err := errors.New("Queue full")
return err
}
if queue.size == 0 {
//new root node
node := queue.createNode(data)
queue.head = node
queue.tail = node
queue.size++
return nil
}
//queue non-empty, insert at head
currentHead := queue.head
newHead := queue.createNode(data)
newHead.next = currentHead
currentHead.prev = newHead
queue.head = newHead
queue.size++
return nil
}
func (queue *QueueBackend) pop() (interface{}, error) {
if queue.size == 0 {
err := errors.New("Queue empty")
return nil, err
}
currentEnd := queue.tail
newEnd := currentEnd.prev
if newEnd != nil {
newEnd.next = nil
}
queue.size--
if queue.size == 0 {
queue.head = nil
queue.tail = nil
}
return currentEnd.data, nil
}
func (queue *QueueBackend) isEmpty() bool {
return queue.size == 0
}
func (queue *QueueBackend) isFull() bool {
return queue.size >= queue.maxSize
}
We have created a `QueueBackend`; this is just a storage layer on top of which the synchronization primitives will be implemented. The code above is simple, and anyone with basic data-structures knowledge can understand it.
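To sanity-check the backend in a single-threaded setting, here is a quick hypothetical snippet (it assumes the `QueueBackend` code above lives in the same package, since its fields and methods are unexported):

package main

import "fmt"

func main() {
	backend := &QueueBackend{maxSize: 4}
	_ = backend.put("first")
	_ = backend.put("second")

	item, _ := backend.pop()
	fmt.Println(item)              // "first": FIFO order, oldest entry pops first
	fmt.Println(backend.isEmpty()) // false: "second" is still queued
}

With the storage layer behaving as expected, we can now add the synchronization primitives.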
Adding concurrency support
We are going to create a `ConcurrentQueue` type which uses the `QueueBackend` type internally. The `ConcurrentQueue` type is defined as shown below:
//ConcurrentQueue concurrent queue
type ConcurrentQueue struct {
//mutex lock
lock *sync.Mutex
//empty and full locks
notEmpty *sync.Cond
notFull *sync.Cond
//queue storage backend
backend *QueueBackend
}
Here, `backend` is a pointer variable which holds the address of the queue storage layer we created before. Now let us see what the other members actually mean.
- `lock`: a member pointer of type `sync.Mutex`. A mutex is a synchronization primitive; it is simply a lock, which can be either locked or unlocked. If the mutex is locked, other threads wait until it is unlocked, which ensures only one thread can access the queue at a time. Here is how a thread operates on the queue (sketched below):
  a. The thread first locks the mutex; this way it gains exclusive control over the queue, preventing other threads from accessing it at the same time.
  b. It then calls `enqueue` or `dequeue` on the queue, and once that is done it unlocks the mutex, allowing other waiting threads to access the queue.
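In Go, that lock/unlock discipline looks roughly like this (a generic sketch, not the final queue code):

var mu sync.Mutex

func operateOnQueue() {
	mu.Lock()         // gain exclusive access; blocks while another goroutine holds the lock
	defer mu.Unlock() // release on return so waiting goroutines can proceed
	// ... enqueue or dequeue on the shared queue here ...
}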
This is enough to provide basic synchronization for the queue, but we actually need more. Once a thread gains control over the queue, it pops the data if any is present; if no data is present, it has to wait. One way to implement this wait is a while loop that keeps checking the queue for data. This is not so cool, because it spins in an infinite loop burning CPU cycles (the core usage goes to 100%). We need a mechanism that suspends the thread until data becomes available, and condition variables (`sync.Cond`) give us exactly that. We have defined two such conditions, `notEmpty` and `notFull`. `notEmpty` means the queue is not empty and has some data, so anyone can pop. `notFull` means the queue is not full and the main thread can still push data into it. If the queue is not in the `notEmpty` state, the consumers (worker threads) wait. Similarly, if the `notFull` condition is not met, the producer (main thread) waits for space to free up in the queue. Now let us see how `enqueue` is implemented:
func (c *ConcurrentQueue) enqueue(data interface{}) error {
c.lock.Lock()
for c.backend.isFull() {
//wait until the queue is not full
c.notFull.Wait()
}
//insert
err := c.backend.put(data)
//signal notEmpty
c.notEmpty.Signal()
c.lock.Unlock()
return err
}
The `enqueue` function first locks the queue to gain exclusive control, waiting if another thread holds the lock. Next it checks whether the queue is full; if so, it waits until it is not full (in the line `c.notFull.Wait()`; note that `Wait()` releases the mutex while the thread is suspended and re-acquires it before returning, so other threads can still use the queue in the meantime). After waiting, the new input is pushed into the queue by calling `put` on the queue backend. The queue is now certainly not empty, so `c.notEmpty.Signal()` is called; this notifies a thread waiting on `notEmpty` that a new entry has been inserted and the queue is no longer empty. Finally the lock is released, allowing other threads to operate on the queue.
Now, let us see how `dequeue` is implemented on the consumer's side:
func (c *ConcurrentQueue) dequeue() (interface{}, error) {
c.lock.Lock()
for c.backend.isEmpty() {
c.notEmpty.Wait()
}
data, err := c.backend.pop()
//signal notFull
c.notFull.Signal()
c.lock.Unlock()
return data, err
}
The consumer tries to lock the queue for reading; if it is already locked, it waits for the thread currently accessing the queue to release the lock. Next, it checks whether the queue is empty; if so, it waits on the `notEmpty` condition variable (in the line `c.notEmpty.Wait()`). Once the queue is non-empty (remember `c.notEmpty.Signal()` in enqueue), it pops an entry and notifies the producer that there is space in the queue again, so it can push new input. This is done by calling `c.notFull.Signal()`.
That is it: we have implemented the basic `enqueue` and `dequeue` methods with synchronization support. We also implement a small helper called `getSize` which simply returns the size. Even this is done under the lock, because it might return a wrong value if another thread pushes or pops data at the same time.
func (c *ConcurrentQueue) getSize() uint32 {
c.lock.Lock()
size := c.backend.size
c.lock.Unlock()
return size
}
To wrap this entire thing up, we create a function called `NewConcurrentQueue` which creates an instance of the `ConcurrentQueue` type and initializes it with proper values. Here we also initialize the mutex and both condition variables. Its code is below:
//NewConcurrentQueue Creates a new queue
func NewConcurrentQueue(maxSize uint32) *ConcurrentQueue {
queue := ConcurrentQueue{}
//init mutexes
queue.lock = &sync.Mutex{}
queue.notFull = sync.NewCond(queue.lock)
queue.notEmpty = sync.NewCond(queue.lock)
//init backend
queue.backend = &QueueBackend{}
queue.backend.size = 0
queue.backend.head = nil
queue.backend.tail = nil
queue.backend.maxSize = maxSize
return &queue
}
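Here is a minimal sketch of how the finished queue might be driven, with one producer and a pool of consumers (a hypothetical `main`, assuming it sits in the same package as the queue code, with `fmt` and `sync` imported):

func main() {
	queue := NewConcurrentQueue(16)

	var wg sync.WaitGroup
	// a pool of 4 workers, each consuming 5 jobs (20 in total)
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < 5; i++ {
				job, err := queue.dequeue() // blocks while the queue is empty
				if err == nil {
					fmt.Printf("worker %d got job %v\n", id, job)
				}
			}
		}(w)
	}

	// the producer; enqueue blocks whenever the queue holds 16 items
	for j := 0; j < 20; j++ {
		_ = queue.enqueue(j)
	}
	wg.Wait()
}

Each worker sleeps inside `dequeue` until the producer signals `notEmpty`, and the producer sleeps inside `enqueue` whenever the queue is full, so no goroutine ever busy-waits.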
I was able to create a simple HTTP worker pool using this queue implementation as a job queue. You can check out the complete source code here: httppool
Thanks for reading this!