Implement a timing wheel for millions of concurrent tasks.
Kevin Wan
Posted on April 11, 2022
Some systems hold a large number of delayed in-process tasks. If we use lots of timers to handle the tasks, there will be lots of idle goroutines and a lot of memory consumed. Lots of goroutines also cost more CPU time to schedule.
This article introduces the TimingWheel in go-zero, which allows developers to schedule lots of delayed tasks. As for delayed tasks, two options are usually available.
- Timer: timers are used for one-off tasks. A timer represents a single event in the future. You tell the timer how long you want to wait, and it provides a channel that will be notified at that time.
- TimingWheel: it maintains an array of task groups, and each slot maintains a chain of stored tasks. When execution starts, the timer executes the tasks in one slot at specified intervals.
Option 2 reduces the cost of maintaining tasks: a priority queue needs O(log n) per insertion or removal (O(n log n) for n tasks), while a doubly linked list needs O(1). Executing tasks only requires scanning the tasks in one slot at each tick, which is O(N) in the number of tasks in that slot, without the repeated insertions and removals that a priority queue needs.
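To make the cost difference concrete, here is a minimal sketch of the slot structure. It assumes a hypothetical `wheel` type built on the standard `container/list` and is not go-zero's implementation. For simplicity it only supports delays of at most one full revolution.

```go
package main

import (
	"container/list"
	"fmt"
)

// wheel is a hypothetical, single-goroutine timing wheel used only for illustration.
type wheel struct {
	slots []*list.List // one doubly linked list of task names per slot
	pos   int          // slot handled by the most recent tick
}

func newWheel(numSlots int) *wheel {
	w := &wheel{slots: make([]*list.List, numSlots), pos: numSlots - 1}
	for i := range w.slots {
		w.slots[i] = list.New()
	}
	return w
}

// add schedules name to fire after the given number of ticks.
// It assumes 1 <= ticks <= len(w.slots). PushBack is O(1).
func (w *wheel) add(name string, ticks int) {
	idx := (w.pos + ticks) % len(w.slots)
	w.slots[idx].PushBack(name)
}

// tick advances one slot and returns the tasks that were stored there.
// Only the tasks in that single slot are visited.
func (w *wheel) tick() []string {
	w.pos = (w.pos + 1) % len(w.slots)
	l := w.slots[w.pos]
	var due []string
	for e := l.Front(); e != nil; {
		next := e.Next() // capture before Remove detaches e
		due = append(due, l.Remove(e).(string))
		e = next
	}
	return due
}

func main() {
	w := newWheel(4)
	w.add("a", 1)
	w.add("b", 3)
	w.add("c", 1)
	for i := 1; i <= 3; i++ {
		fmt.Println(i, w.tick())
	}
}
```

Adding a task never reorders anything, which is where the O(1) comes from. The price is precision: a task can only fire on a tick boundary.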
Let's look at our own use of TimingWheel in go-zero:
TimingWheel in cache
Let's start with how the cache in the collection package uses TimingWheel.
timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v interface{}) {
	key, ok := k.(string)
	if !ok {
		return
	}
	cache.Del(key)
})
if err != nil {
	return nil, err
}
cache.timingWheel = timingWheel
This is the initialization of cache, which also initializes a TimingWheel to clean up expired keys.
- interval: the time interval at which tasks are checked
- numSlots: the number of time slots
- execute: the function that processes tasks
The execute function in cache deletes the expired key, and TimingWheel controls when these expiration calls happen.
Initialization
// The initialization of TimingWheel
func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute, ticker timex.Ticker) (
	*TimingWheel, error) {
	tw := &TimingWheel{
		interval: interval, // time frame interval
		ticker: ticker, // the ticker to trigger the execution
		slots: make([]*list.List, numSlots), // the slots to put tasks
		timers: NewSafeMap(), // map to store task with {key, value}
		tickedPos: numSlots - 1, // the previous ticked position
		execute: execute, // execute function
		numSlots: numSlots, // the number of slots
		setChannel: make(chan timingEntry), // the channel to set tasks
		moveChannel: make(chan baseEntry), // the channel to move tasks
		removeChannel: make(chan interface{}), // the channel to remove tasks
		drainChannel: make(chan func(key, value interface{})), // the channel to drain tasks
		stopChannel: make(chan lang.PlaceholderType), // the channel to stop TimingWheel
	}
	// Prepare all the lists stored in the slots
	tw.initSlots()
	// Start the background goroutine; tasks are passed to it through the channels
	go tw.run()
	return tw, nil
}
[Figure: diagram of the "time wheel" in TimingWheel.] The details are explained later with reference to this diagram.
go tw.run() creates a goroutine to do the tick notification.
func (tw *TimingWheel) run() {
	for {
		select {
		// the ticker fires -> onTick() -> scanAndRunTasks()
		case <-tw.ticker.Chan():
			tw.onTick()
		// adding a task sends it to setChannel
		case task := <-tw.setChannel:
			tw.setTask(&task)
		...
		}
	}
}
As you can see, the ticker starts running at initialization and fires once per interval. On each tick, the wheel fetches the tasks from the list in the current slot and hands them to execute.
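The pattern in run() is a single goroutine that owns all state and is driven only through channels. The sketch below isolates that pattern. The `loop` type and its fields are hypothetical stand-ins, and the tick channel is driven by hand instead of by a real ticker so that the example is deterministic.

```go
package main

import "fmt"

// loop is a hypothetical stand-in for TimingWheel: one goroutine owns the
// pending slice, so no mutex is needed.
type loop struct {
	ticks chan struct{} // stands in for ticker.Chan()
	set   chan string   // stands in for setChannel
	stop  chan struct{} // stands in for stopChannel
	fired chan []string // reports what each tick flushed (illustration only)
}

func newLoop() *loop {
	l := &loop{
		ticks: make(chan struct{}),
		set:   make(chan string),
		stop:  make(chan struct{}),
		fired: make(chan []string),
	}
	go l.run()
	return l
}

// run serializes ticks and task submissions in one select loop.
func (l *loop) run() {
	var pending []string
	for {
		select {
		case <-l.ticks:
			l.fired <- pending
			pending = nil
		case task := <-l.set:
			pending = append(pending, task)
		case <-l.stop:
			return
		}
	}
}

func main() {
	l := newLoop()
	l.set <- "k1"
	l.set <- "k2"
	l.ticks <- struct{}{}
	fmt.Println(<-l.fired)
	close(l.stop)
}
```

Because every mutation goes through the select loop, a tick and a task submission can never race with each other.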
Task Operation
The next step is to set the cache key.
func (c *Cache) Set(key string, value interface{}) {
	c.lock.Lock()
	_, ok := c.data[key]
	c.data[key] = value
	c.lruCache.add(key)
	c.lock.Unlock()
	expiry := c.unstableExpiry.AroundDuration(c.expiry)
	if ok {
		c.timingWheel.MoveTimer(key, expiry)
	} else {
		c.timingWheel.SetTimer(key, value, expiry)
	}
}
- Check whether the key exists in the data map.
- If it exists, update its expiry by calling MoveTimer().
- Otherwise, set the key with its expiry by calling SetTimer().
So the use of TimingWheel is clear: developers can add or update tasks according to their needs.
Also, if we read the source code, we will find that SetTimer() and MoveTimer() only send the task to a channel, and the goroutine created for run() continuously takes these task operations out of the channels.
SetTimer() -> setTask()
- task does not exist: getPositionAndCircle -> pushBack to list -> setTimerPosition
- task exists: get from timers -> moveTask()
MoveTimer() -> moveTask()
From the above call chain, one function is reached on every path: moveTask().
func (tw *TimingWheel) moveTask(task baseEntry) {
	// timers: Map => get [positionEntry "pos, task"] by key
	val, ok := tw.timers.Get(task.key)
	if !ok {
		return
	}
	timer := val.(*positionEntry)
	// {delay < interval} => the delay is shorter than one time frame interval,
	// so the task should be executed immediately
	if task.delay < tw.interval {
		threading.GoSafe(func() {
			tw.execute(timer.item.key, timer.item.value)
		})
		return
	}
	// If delay >= interval, calculate the new pos and circle from the delay
	pos, circle := tw.getPositionAndCircle(task.delay)
	if pos >= timer.pos {
		timer.item.circle = circle
		// Record the offset of the move
		timer.item.diff = pos - timer.pos
	} else if circle > 0 {
		// move to the level of (circle-1)
		circle--
		timer.item.circle = circle
		// because it's an array, add numSlots [which is the equivalent of going to the next level]
		timer.item.diff = tw.numSlots + pos - timer.pos
	} else {
		// The new position is ahead of the old one and the task is still on the first level:
		// mark the old task for deletion and requeue it for execution
		timer.item.removed = true
		newItem := &timingEntry{
			baseEntry: task,
			value: timer.item.value,
		}
		tw.slots[pos].PushBack(newItem)
		tw.setTimerPosition(pos, newItem)
	}
}
The above process has the following cases.
- delay < interval: the delay is shorter than a single time step, so the task needs to be executed immediately.
- Otherwise, the delay has changed:
  - new >= old: record <newPos, newCircle, diff>.
  - newCircle > 0: compute diff and convert one circle into slots, so diff + numSlots.
  - If the delay is simply shortened, mark the old task as removed, add it to the list again, and wait for the next round of the loop to execute it.
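The three non-immediate cases can be written as a pure function, which makes the arithmetic easier to check by hand. This is a sketch under stated assumptions: `planMove` and its parameter names are hypothetical and do not exist in go-zero, and the `delay < interval` case is left out.

```go
package main

import "fmt"

// planMove mirrors the three branches of moveTask as a pure function.
// oldPos is the slot the task currently sits in; newPos and circle are the
// freshly computed target. It returns the circle and diff to store on the
// existing entry, or requeue=true when a new entry must be inserted instead.
func planMove(oldPos, newPos, circle, numSlots int) (newCircle, diff int, requeue bool) {
	switch {
	case newPos >= oldPos:
		// Target slot is at or after the current one: keep circle, record the offset.
		return circle, newPos - oldPos, false
	case circle > 0:
		// Target slot is behind: trade one circle for numSlots extra steps.
		return circle - 1, numSlots + newPos - oldPos, false
	default:
		// Target is behind and there is no circle to trade: requeue.
		return 0, 0, true
	}
}

func main() {
	fmt.Println(planMove(3, 7, 1, 16))  // forward move
	fmt.Println(planMove(10, 2, 2, 16)) // backward move with spare circles
	fmt.Println(planMove(10, 2, 0, 16)) // backward move on the first level
}
```

In the first two cases the entry stays where it is and only carries the new circle and diff, so the move itself costs nothing until that slot is scanned.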
Execute
As shown in the initialization, the ticker in run() keeps advancing, and each advance mainly passes the tasks in the current list to the execute func. Let's start with what happens on each tick.
// Called by the ticker once every interval
func (tw *TimingWheel) onTick() {
	// update the current tick position on every execution
	tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
	// Get the chain of tasks stored in the tick position at this time
	l := tw.slots[tw.tickedPos]
	tw.scanAndRunTasks(l)
}
Next is how execute gets called.
func (tw *TimingWheel) scanAndRunTasks(l *list.List) {
	// store the tasks {key, value} that currently need to be executed [the arguments needed by execute, passed in turn to execute]
	var tasks []timingTask
	for e := l.Front(); e != nil; {
		task := e.Value.(*timingEntry)
		// marked for deletion: do the real deletion during the scan and delete the data from the map
		if task.removed {
			next := e.Next()
			l.Remove(e)
			tw.timers.Del(task.key)
			e = next
			continue
		} else if task.circle > 0 {
			// the tick has reached this slot, but the task is not on the first level yet,
			// so it drops one level and stays in place;
			// pos is not modified
			task.circle--
			e = e.Next()
			continue
		} else if task.diff > 0 {
			// a diff was recorded by a move, so the task needs to go into the queue again
			next := e.Next()
			l.Remove(e)
			pos := (tw.tickedPos + task.diff) % tw.numSlots
			tw.slots[pos].PushBack(task)
			tw.setTimerPosition(pos, task)
			task.diff = 0
			e = next
			continue
		}
		// the above cases are all non-executable; tasks that can be executed are added to tasks
		tasks = append(tasks, timingTask{
			key: task.key,
			value: task.value,
		})
		next := e.Next()
		l.Remove(e)
		tw.timers.Del(task.key)
		e = next
	}
	// range over tasks and call execute on each one
	tw.runTasks(tasks)
}
The branches are explained in the comments and can be read together with moveTask() above: the circle decrement and the diff computation are what tie the two functions together.
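The branch order of the scan can be tried out on a single slot. The following is a trimmed-down sketch, assuming a hypothetical `entry` type and a `relocate` callback in place of the wheel's slots and timers map. It uses a `switch` instead of the if/else chain, but keeps the same precedence: removed, then circle, then diff, then execute.

```go
package main

import (
	"container/list"
	"fmt"
)

// entry is a hypothetical, trimmed-down version of timingEntry.
type entry struct {
	key     string
	circle  int  // full revolutions still to wait
	diff    int  // pending slot offset recorded by a move
	removed bool // lazily deleted
}

// scan walks one slot and returns the keys that are due. Entries with a
// pending diff are handed to relocate instead of being executed.
func scan(l *list.List, relocate func(*entry)) []string {
	var due []string
	for e := l.Front(); e != nil; {
		next := e.Next() // capture before any Remove detaches e
		t := e.Value.(*entry)
		switch {
		case t.removed:
			l.Remove(e)
		case t.circle > 0:
			t.circle-- // stays in this slot for another revolution
		case t.diff > 0:
			l.Remove(e)
			relocate(t)
		default:
			l.Remove(e)
			due = append(due, t.key)
		}
		e = next
	}
	return due
}

// demo builds one slot holding each kind of entry and scans it once.
func demo() (due, moved []string, remaining int) {
	l := list.New()
	l.PushBack(&entry{key: "gone", removed: true})
	l.PushBack(&entry{key: "later", circle: 2})
	l.PushBack(&entry{key: "moved", diff: 3})
	l.PushBack(&entry{key: "now"})
	due = scan(l, func(t *entry) { moved = append(moved, t.key) })
	return due, moved, l.Len()
}

func main() {
	fmt.Println(demo())
}
```

After one scan only the entry with remaining circles is left in the slot; everything else has been dropped, relocated, or collected for execution.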
As for the diff calculation, it involves the calculation of pos and circle.
// interval: 4min, d: 60min, numSlots: 16, tickedPos = 15
// steps = 15, pos = 14, circle = 0
func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) {
	steps := int(d / tw.interval)
	pos = (tw.tickedPos + steps) % tw.numSlots
	circle = (steps - 1) / tw.numSlots
	return
}
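The above process can be simplified to the following.
steps = d / interval
pos = (tickedPos + steps) % numSlots
circle = (steps - 1) / numSlots
When tickedPos is numSlots - 1, as it is right after initialization, pos reduces to (steps - 1) % numSlots.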
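The same calculation can be run as a standalone function to check a few inputs. In this sketch the wheel state is passed in explicitly; `positionAndCircle` and its parameter names are illustrative and not part of go-zero.

```go
package main

import (
	"fmt"
	"time"
)

// positionAndCircle is a standalone version of the calculation, with the
// wheel state (tickedPos, numSlots) passed in as arguments.
func positionAndCircle(d, interval time.Duration, tickedPos, numSlots int) (pos, circle int) {
	steps := int(d / interval)
	pos = (tickedPos + steps) % numSlots
	circle = (steps - 1) / numSlots
	return
}

func main() {
	// The example from the comment: 15 steps on a 16-slot wheel.
	fmt.Println(positionAndCircle(60*time.Minute, 4*time.Minute, 15, 16))
	// 60 steps on a 10-slot wheel need several revolutions.
	fmt.Println(positionAndCircle(60*time.Second, time.Second, 9, 10))
	// Exactly one revolution: same slot as tickedPos, but circle stays 0.
	fmt.Println(positionAndCircle(16*time.Second, time.Second, 15, 16))
}
```

The last call shows why circle uses steps - 1: a delay of exactly numSlots steps lands in the slot that was just ticked, and that slot is next visited after one full revolution, so no extra circle is needed.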
Summary
TimingWheel relies on the ticker to drive time forward, taking the tasks out of the doubly linked list in the current time slot and passing them to execute for execution.
Because it is driven by a fixed interval step, some ticks do no useful work for a given task: for a 60s task with interval = 1s, the wheel ticks 59 times as a no-op before the task runs.
To extend the time range, tasks are layered by circle, so the original numSlots can be reused again and again as the ticker loops around circle by circle. Any number of tasks can be put into the fixed-size slots. This design can break the long time limit without creating additional data structures.
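A small simulation shows how circle layering lets a short wheel cover a long delay. It is only a sketch: `ticksUntilFire` is a hypothetical helper that tracks a single task, assumes steps >= 1, and ignores moves and removals.

```go
package main

import "fmt"

// ticksUntilFire simulates a wheel with numSlots slots whose last tick was at
// tickedPos, and counts the ticks until a task scheduled steps ticks ahead runs.
func ticksUntilFire(steps, numSlots, tickedPos int) int {
	pos := (tickedPos + steps) % numSlots
	circle := (steps - 1) / numSlots

	cur := tickedPos
	for ticks := 1; ; ticks++ {
		cur = (cur + 1) % numSlots
		if cur != pos {
			continue // another slot: the task is not touched at all
		}
		if circle > 0 {
			circle-- // right slot, but not yet the right revolution
			continue
		}
		return ticks
	}
}

func main() {
	// A 60s task on a 10-slot wheel with a 1s interval.
	fmt.Println(ticksUntilFire(60, 10, 9))
	// The same delay fits in a single revolution of a 300-slot wheel.
	fmt.Println(ticksUntilFire(60, 300, 299))
}
```

Both wheels fire the task on the same tick. The smaller wheel simply visits the task's slot several times and counts down circle before running it.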
go-zero also contains many other practical toolkits that can improve service performance and development efficiency.
Project address
https://github.com/zeromicro/go-zero
Welcome to use go-zero and star to support us!