How to handle concurrency in Clojure with core.async

j0suetm

J0sueTM

Posted on January 15, 2024

How to handle concurrency in Clojure with core.async

Table of Contents

Introduction

Hey, how you doing? This article was written right after I had to painstakingly read the clojure.core.async source code in order to finish a task. So, I hope I save you from the same fate as I had 😄.

Also, since I tried to create content that can be used on both Clojure and ClojureScript systems, I opted for a more generic option core.async. There are other libraries out there, like the java virtual Threads, which can, in certain use cases, be a better suit than the one presented here.

If you're a visual learner and prefer video, I recommend this talk by Rich Hikey, the creator of Clojure, that goes a bit deeper into the inner concepts that went into the design of this tool.

Anyhow, I hope this may help you. Feel free to contact me in any case.

The building blocks

Before working our way into Concurrency de-facto in Clojure, I want to make sure that everyone is one the same basis. If you already know what is concurrency, then you can skip this section.

Channels

Channels are one of the main pillars in the clojure.core.async lib and most other concurrency libraries out there; you can think of them as tunnels where things get into from one side, go through, and then get out from the other side.

A tunner symbolazing a channel

Messages

The thing that goes through a channel is called a message, or an event. Messages can be anything really, as long as it can be produced and sent through the channel, it's good to go!

Producers and consumers

The thing that creates the message and puts it into the channel is called a producer, and the thing that receives the message on the other end, is called a consumer.

A fedex package, symbolizing the process of procuding, delivering and consuming


That's really all there is to it, a channel is theoretically a place where messages sent by producers are received by consumers.

If you're a student and know a bit about CS, its also valid to know that most Channels implementation are built upon the Queue Data Structure, i.e. FIFO.

Clojure Implementation

First of all, I'd like to note that there are multiple ways of working concurrently in Clojure/Script, based on your needs. In this article, I'll tackle the clojure.core.async lib. Its available on both JVM and Javascript ecosystems, and can be used to build powerfull asynchronous applications.

I made this article with JVM in mind, so please be aware that since there are small (but important) differences between the JVM and JS implementations, your walkthrought using Cljs might be slightly different from the one shown here.

Javascript applications are single-threaded, so blockeable actions aren't going to work (they'll actually not even transpile). Examples that might not work on JS will be signalized.

Include It!

You can include the library like this:



(:require '[clojure.core.async :as async :refer :all])


Enter fullscreen mode Exit fullscreen mode

or you can be more specific and include only the used items:



(:require '[clojure.core.async :refer [chan <!!]])


Enter fullscreen mode Exit fullscreen mode

or you can use the namespace instead of including unique items:



(:require '[clojure.core.async] :as async)

;; example
(async/chan)


Enter fullscreen mode Exit fullscreen mode

Behold! the channel

First thing first, a channel needs to be initialized in order for the async functionalities to be able to work:



(def my-channel (chan))


Enter fullscreen mode Exit fullscreen mode

A channel will close automatically, but you can close it manually:



(let [my-channel (chan)]
  ;; do powerfull asynchronous computations
  (close! my-channel))


Enter fullscreen mode Exit fullscreen mode

Also, a channel doesn't necessarily needs to begin unbuffered, (without any messages). You can create a channel that already has messages in it:



(def my-buffered-channel (chan 10))


Enter fullscreen mode Exit fullscreen mode

Put and Take

As stated before, a channel is usually a Queue, where its operations follow the order FIFO (First In First Out, where the First inserted element will be the first to be taken out as well).

Non CS people can assimilate this to the boring tuesday FastFood queue: The first person to ask for a burger will be the first to receive it, and consequently, the last one (the timid guy) to ask for his sandwich will be the last one to receive this.

A queue symbolazing the queue data structure

In core.async, we use the operations named PUT AND TAKE to do the operations of inserting and removing. They assume the forms > and < to signalize the operation, respectively. Not only that, but the symbols are actually >! and >!! for PUT, and <! and <!! for TAKE. We'll learn the differences in a second, but I hope you got the visual idea of the these symbols.

If you didn't, think of the PUT as a spear poking the channel, hoping it will open in order to PUT something in there. The TAKE, on the other hand, is like a spear already inside the channel, poking it from the inside, hoping it will open in order for something to get out of there, to be TAKEN from the channel.

A spear symbolizing the put and take operations

Ordinary Threads

This entire section might not work in JS

When creating a channel, any raw operation will be a fake asynchronous operation, because the only thread owning it is the main thread, meaning it will still block the main thread (which will consequently block the other tasks of the application).

To circunvent that, it's necessary to create a thread that doesn't impact the main one. There're two ways of doing that. The first one is to literally open a child thread:



(let [people-channel (chan)]
  (thread (>!! c "josue"))
  (println "the first person to enter was " (<!! c)))
;; => the first person to enter was josue


Enter fullscreen mode Exit fullscreen mode

Did you note the >!! put operation there? The meaning of the double ! is that that operation is blocking. i.e. The thread it is in will be blocked. The same applies to the <!! take operation after that. It means that the thread its in (in this case the main one) will be blocked until the operation is finished.

Go blocks

Now, usually, any asynchronous operation should take the minimum amount of overhead on top on the main thread. And whats that operation? The final taking one. Its a crucial step for communication with the rest of the modules of your application, so we technically let it take its time. However, any other operation, putting or taking, as long as its not the final retrieval, should be done in a non-blocking manner.

Theres where our go blocks enter the scene. They're a less explict version of thread that handles the IOC (inversion of control).



(let [people-channel (chan)]
  (go (>! people-channel "josue")
  (go (>! people-channel "maria"))
  (println (<!! (go (<! people-channel))))))
;; => josue


Enter fullscreen mode Exit fullscreen mode

Now, comparing to the ordinary threads, can You see that the operations inside the go block have only a single !? That's because they signalize that. Also, pay attention to how, on the last block of code, we take the content from the people-channel, and then do a blocking take <!! again. This is necessary in order to retrieve data from an asynchronous channel (this time created by go) back to the main thread.

Alts

Lets suppose we're creating a chat application. First things first, I think about creating a channel for the josue user:



(let [josue-channel (chan)]
  (while true
    (let [message (<!! (go (<! josue-channel)))]
      (when message
        (println "josue sent a message: " message)))))


Enter fullscreen mode Exit fullscreen mode

Hey, that looks nice. But wait, I just received an email from my client, and he's asking me to add 2500 more channels, they're marketing geniuses and the userbase grew from a weird nerd that uses the app every night to a quarter thousand people. I can't add all of those channels manually, there must be a better way to do that, right? Please tell me there is.

Of course there is, my friend, with Alts. They are a powerfull tool in core.async that lets you work with waiting for messages on multiples channels on the same time.

Now, with that feature in mind, our code can support as many channels as possible (as long as our memory doesn't explode 😄), and look as simple as it has ever looked:



(defn create-channels [people-count]
  (let [channels (repeatedly people-count chan)]
    (go
      (let [[message channel] (alts! channels)]
        (println "New message to " channel ": " message)))))

(create-channels 2500)


Enter fullscreen mode Exit fullscreen mode

That's all folks! It's always a great day to wake up and learn something new. And look, you just did that! I'm proud of you!

Thanks for taking the time to read my words, I'm really grateful.

Sincerely yours, Josué Teodoro Moreira | teodoro.josue@pm.me

References

💖 💪 🙅 🚩
j0suetm
J0sueTM

Posted on January 15, 2024

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related