π¬ Asynq: simple, reliable & efficient distributed task queue for your next Go project
Vic ShΓ³stak
Posted on April 6, 2021
Introduction
Hi, DEV friends! π It's time to share a great find that you must try in your next project. I'm talking about simple, reliable and efficient distributed task queue written on Go and called Asynq.
I already have experience using Asynq in production on one of my work projects (microservice for sending scheduled messages to subscribers of Telegram bot). After using it successfully, I wanted to tell you more about it so you can appreciate it too!
All right, let's get started! π
π Table of contents
- What is Asynq?
- The project we will create
- Full code of this project
- Let's write some code
- Asynq web UI
- Asynq CLI
What is Asynq?
Follow official Asynq GitHub page:
Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started. [...] Task queues are used as a mechanism to distribute work across multiple machines. A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling.
Asynq is developed and maintained by Ken Hibino, who works as a software engineer at Google. So you can be sure of the quality of the code.
Most awesome queueing tasks features are:
- Guaranteed at least one execution of a task
- Scheduling of tasks
- Durability since tasks are written to Redis
- Retries of failed tasks
- Automatic recovery of tasks in the event of a worker crash
- Weighted priority queues
- Strict priority queues
- Low latency to add a task since writes are fast in Redis
- De-duplication of tasks using unique option
- Allow timeout and deadline per task
- Flexible handler interface with support for middlewares
- Ability to pause queue to stop processing tasks from the queue
- Periodic Tasks
Built-in scaling tools:
- Support Redis Cluster for automatic sharding and high availability
- Support Redis Sentinels for high availability
And, of course, useful tools for admins:
- Web UI to inspect and remote-control queues and tasks
- CLI to inspect and remote-control queues and tasks
π― By the way, I created an official logo for Asynq.
The project we will create
I would like to show with a simple example how you can easily work with Asynq in your Golang project. Suppose we have the task of sending a welcome email as soon as the user registers and, after a while, sending another reminder email to the user about something.
Here are the points we will stick to:
- Three queues for tasks with different priorities;
- Generating multiple tasks of different types at once and for different queues on the client;
- Separation into task handlers and payloads;
Full code of this project
For clarity, you can download the full example and run it on your machine. Especially for you, I put it on GitHub:
koddr / tutorial-go-asynq
π Tutorial: Asynq. Simple, reliable & efficient distributed task queue for your next Go project.
Let's write some code
Okay! We'll move on to the most interesting part, the writing of the code. I have supplied the code examples with detailed comments, so I won't dwell on them too much in the text of the article.
π₯ Please look at the code!
Creating tasks payloads
Let's define the payloads of our tasks. Let's create two types to send a message to Email: a welcome message (comes right away) and a reminder message (comes after a while).
// ./tasks/payloads.go
package tasks
import (
"time"
"github.com/hibiken/asynq"
)
const (
// TypeWelcomeEmail is a name of the task type
// for sending a welcome email.
TypeWelcomeEmail = "email:welcome"
// TypeReminderEmail is a name of the task type
// for sending a reminder email.
TypeReminderEmail = "email:reminder"
)
// NewWelcomeEmailTask task payload for a new welcome email.
func NewWelcomeEmailTask(id int) *asynq.Task {
// Specify task payload.
payload := map[string]interface{}{
"user_id": id, // set user ID
}
// Return a new task with given type and payload.
return asynq.NewTask(TypeWelcomeEmail, payload)
}
// NewReminderEmailTask task payload for a reminder email.
func NewReminderEmailTask(id int, ts time.Time) *asynq.Task {
// Specify task payload.
payload := map[string]interface{}{
"user_id": id, // set user ID
"sent_in": ts.String(), // set time to sending
}
// Return a new task with given type and payload.
return asynq.NewTask(TypeReminderEmail, payload)
}
Creating tasks handlers
Task handlers are our business logic, which is responsible for the specific behavior of tasks when triggered. To keep it simple, I will display a normal message in the console of the Asynq worker server.
// ./tasks/handlers.go
package tasks
import (
"context"
"fmt"
"github.com/hibiken/asynq"
)
// HandleWelcomeEmailTask handler for welcome email task.
func HandleWelcomeEmailTask(c context.Context, t *asynq.Task) error {
// Get user ID from given task.
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
// Dummy message to the worker's output.
fmt.Printf("Send Welcome Email to User ID %d\n", id)
return nil
}
// HandleReminderEmailTask for reminder email task.
func HandleReminderEmailTask(c context.Context, t *asynq.Task) error {
// Get int with the user ID from the given task.
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
// Get string with the sent time from the given task.
time, err := t.Payload.GetString("sent_in")
if err != nil {
return err
}
// Dummy message to the worker's output.
fmt.Printf("Send Reminder Email to User ID %d\n", id)
fmt.Printf("Reason: time is up (%v)\n", time)
return nil
}
Creating Asynq worker server
The central part of our project. It is this component that will be responsible for the logic of sending messages and queuing them (if we need it).
// ./worker/server.go
package main
import (
"log"
"tutorial-go-asynq/tasks"
"github.com/hibiken/asynq"
)
func main() {
// Create and configuring Redis connection.
redisConnection := asynq.RedisClientOpt{
Addr: "localhost:6379", // Redis server address
}
// Create and configuring Asynq worker server.
worker := asynq.NewServer(redisConnection, asynq.Config{
// Specify how many concurrent workers to use.
Concurrency: 10,
// Specify multiple queues with different priority.
Queues: map[string]int{
"critical": 6, // processed 60% of the time
"default": 3, // processed 30% of the time
"low": 1, // processed 10% of the time
},
})
// Create a new task's mux instance.
mux := asynq.NewServeMux()
// Define a task handler for the welcome email task.
mux.HandleFunc(
tasks.TypeWelcomeEmail, // task type
tasks.HandleWelcomeEmailTask, // handler function
)
// Define a task handler for the reminder email task.
mux.HandleFunc(
tasks.TypeReminderEmail, // task type
tasks.HandleReminderEmailTask, // handler function
)
// Run worker server.
if err := worker.Run(mux); err != nil {
log.Fatal(err)
}
}
And my favorite part of Asynq. If your application is going to grow, you will definitely want to make a more scalable system and Asynq can help you with that perfectly, because:
- You can create a personal Asynq worker server for each queue;
- Each Asynq worker server can be configured with any number of concurrent active workers to use;
- Next, you can use a simple Docker Compose solution to automatically start the right amount of each Asynq worker server replicas, when needed;
And if that's not enough anyway, you can easily start a Redis Cluster using Asynq built-in adapter... literally in a couple of minutes!
βοΈ I won't describe the configuration process, since the Wiki page of the project has detailed instructions.
Creating Asynq client
The client part can be anything, really. The main thing is that it can create new tasks and send them to the queue:
// ./client/main.go
package main
import (
"log"
"math/rand"
"time"
"tutorial-go-asynq/tasks"
"github.com/hibiken/asynq"
)
func main() {
// Create a new Redis connection for the client.
redisConnection := asynq.RedisClientOpt{
Addr: "localhost:6379", // Redis server address
}
// Create a new Asynq client.
client := asynq.NewClient(redisConnection)
defer client.Close()
// Infinite loop to create tasks as Asynq client.
for {
// Generate a random user ID.
userID := rand.Intn(1000) + 10
// Set a delay duration to 2 minutes.
delay := 2 * time.Minute
// Define tasks.
task1 := tasks.NewWelcomeEmailTask(userID)
task2 := tasks.NewReminderEmailTask(userID, time.Now().Add(delay))
// Process the task immediately in critical queue.
if _, err := client.Enqueue(
task1, // task payload
asynq.Queue("critical"), // set queue for task
); err != nil {
log.Fatal(err)
}
// Process the task 2 minutes later in low queue.
if _, err := client.Enqueue(
task2, // task payload
asynq.Queue("low"), // set queue for task
asynq.ProcessIn(delay), // set time to process task
); err != nil {
log.Fatal(err)
}
}
}
Asynq web UI
Unfortunately, at the moment, the Asyncq web UI installation is only available by copying binary from a releases page or running from a Docker container, like this:
# Pull the latest image
docker pull hibiken/asynqmon
# Run Asynqmon
docker run --rm \
--name asynqmon \
-p 8080:8080 \
hibiken/asynqmon
π Author of Asyncq and myself are already working on simplifying this process for many platforms in a future version (hopefully
v1.0.0
). If you have a desire to help, you are welcome!
In the meantime, this is Asynqmon repository:
The installed and running web UI will look like this:
List of servers and workers
Shows Asynq worker server statistics with detailed information about queues and active workers:
Supports mass operations with tasks in a convenient tabular list with information on a given queue:
Redis server information
It graphically displays all the necessary information about memory usage, server uptime, the number of connected clients at the moment and much more:
By the way, a full list of your Redis server configuration can be found at INFO Command Output section at the bottom of this page (data will be updated automatically).
Adaptive dark theme
Oh, yes! The future has arrived. A dark theme with an adaptive mode for the most fashionable Asynq users π
Asynq CLI
Install the Asynq CLI tool by running the following command:
go get -u github.com/hibiken/asynq/tools/asynq
To see the current state of the queues and their statistics:
asynq stats
Photos and videos by
- Vic ShΓ³stak https://shostak.dev
- Ken Hibino https://github.com/hibiken
P.S.
If you want more articles (like this) on this blog, then post a comment below and subscribe to me. Thanks! π»
βοΈ You can support me on Boosty, both on a permanent and on a one-time basis. All proceeds from this way will go to support my OSS projects and will energize me to create new products and articles for the community.
And of course, you can help me make developers' lives even better! Just connect to one of my projects as a contributor. It's easy!
My main projects that need your help (and stars) π
- π₯ gowebly: A next-generation CLI tool that makes it easy to create amazing web applications with Go on the backend, using htmx, hyperscript or Alpine.js and the most popular CSS frameworks on the frontend.
- β¨ create-go-app: Create a new production-ready project with Go backend, frontend and deploy automation by running one CLI command.
Posted on April 6, 2021
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.
Related
September 27, 2021