Go Channel Patterns - Fan Out Bounded

b0r

Posted on December 14, 2021

To improve my Go programming skills and become a better Go engineer, I recently purchased an excellent on-demand training course from Ardan Labs. The materials are created by Bill Kennedy, an expert Go engineer.

I have decided to record my process of learning how to write more idiomatic code that follows Go best practices and design philosophies.

This series of posts will describe channel patterns used for orchestration/signaling in Go via goroutines.

Fan Out Bounded Pattern

The main idea behind the Fan Out Bounded pattern is to have a limited number of goroutines do the work.

We have:

  • a fixed number of worker goroutines
  • a manager goroutine that creates/reads the work and sends it to the worker goroutines
  • a buffered channel that provides signaling semantics
    • used to notify worker goroutines about available work

Example

In the Fan Out Bounded pattern, a fixed number of employees (worker goroutines) do the work.

We also have a manager (the main goroutine) that generates work or takes it from a predefined list. The manager notifies employees about work via the communication channel ch, and each employee receives work from that same channel.

The communication channel ch is buffered, so it can hold a limited amount of work "in the queue". Once ch is full, the manager can't send new work until an employee takes some work from the queue.
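The "full queue" behaviour can be made visible with a small sketch. It assumes a hypothetical helper, trySend, which is not part of the pattern: it uses select with a default case so that a send that would block is reported instead of waiting. The manager in the pattern itself uses a plain blocking send.

```go
package main

import "fmt"

// trySend is a hypothetical helper for illustration only. It attempts a
// non-blocking send and reports whether the buffer accepted the value.
func trySend(ch chan string, work string) bool {
	select {
	case ch <- work:
		return true
	default:
		// The buffer is full; a plain `ch <- work` would block here.
		return false
	}
}

func main() {
	// Buffered channel with room for two pieces of work.
	ch := make(chan string, 2)

	fmt.Println(trySend(ch, "paper1")) // true: buffer has room
	fmt.Println(trySend(ch, "paper2")) // true: buffer is now full
	fmt.Println(trySend(ch, "paper3")) // false: a blocking send would wait

	<-ch // an employee takes one piece of work from the queue

	fmt.Println(trySend(ch, "paper3")) // true: one slot is free again
}
```

With a regular send, the third call would simply pause the manager until an employee received from ch, which is what keeps the amount of queued work bounded.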

Once there is no more work for the employees, the manager closes the communication channel ch.

Once each employee (worker goroutine) has processed the remaining work, it notifies the manager, and they all go home.
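The shutdown step relies on how Go treats a closed channel: values already in the buffer can still be received, a for-range loop ends once the channel is closed and empty, and any later receive returns the zero value immediately. A minimal sketch, assuming a hypothetical helper named drain:

```go
package main

import "fmt"

// drain is a hypothetical helper for illustration. It receives until ch is
// closed and empty, and returns everything it received.
func drain(ch <-chan string) []string {
	var got []string
	for p := range ch {
		got = append(got, p)
	}
	return got
}

func main() {
	ch := make(chan string, 2)
	ch <- "paper1"
	ch <- "paper2"

	// Closing does not discard buffered work.
	close(ch)

	fmt.Println(drain(ch)) // [paper1 paper2]

	// A receive on a closed, empty channel returns the zero value and false.
	v, ok := <-ch
	fmt.Printf("%q %v\n", v, ok) // "" false
}
```

This is why the manager can close the channel right after submitting the last piece of work without losing anything that is still queued.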

Use Case

A good use case for this pattern is batch processing, where there is a known amount of work to do but only a limited number of executors. Each executor processes multiple units of work.

Feel free to try the example below on the Go Playground.

package main

import (
    "fmt"
    "sync"
)

func main() {
    // 5 pieces of work to do
    work := []string{"paper1", "paper2", "paper3", "paper4", "paper5"}

    // number of worker goroutines that will process the work
    // e.g. fixed number of employees
    // g := runtime.NumCPU()
    g := 2

    // use a WaitGroup to orchestrate the work
    // e.g. each employee takes one seat in the office to do the work
    //      in this case we have two seats taken
    var wg sync.WaitGroup
    wg.Add(g)

    // make a buffered channel of type string which provides signaling semantics
    // e.g. manager uses this channel to notify employees about available work
    //      if buffer is full, manager can't send new work
    ch := make(chan string, g)

    // create and launch worker goroutines
    for e := 0; e < g; e++ {
        go func(emp int) {
            // execute this statement (defer) when this function/goroutine terminates
            // decrement waitGroup when there is no more work to do
            // do this once for-range loop is over and channel is closed
            // e.g. employee goes home
            defer wg.Done()

            // for-range loop used to check for new work on communication channel `ch`
            for p := range ch {
                fmt.Printf("employee %d : received signal : %s\n", emp, p)
            }

            // printed when communication channel is closed
            fmt.Printf("employee %d : received shutdown signal\n", emp)
        }(e)
    }

    // range over the collection of work, one value at a time
    for _, wrk := range work {
        // signal/send work into channel
        // start getting goroutines busy doing work
        // e.g. manager sends work to employee via buffered communication channel
        //      if buffer is full, this operation blocks
        ch <- wrk
    }

    // once last piece of work is submitted, close the channel
    // worker goroutines will process everything from the buffer
    close(ch)

    // guarantee point, wait for all worker goroutines to finish the work
    // e.g. manager waits for all employees to go home before closing the office
    wg.Wait()
}

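Result

Output from one run (the order varies between runs because goroutine scheduling is not deterministic):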

go run main.go

employee 1 : received signal : paper1
employee 1 : received signal : paper3
employee 1 : received signal : paper4
employee 1 : received signal : paper5
employee 1 : received shutdown signal
employee 0 : received signal : paper2
employee 0 : received shutdown signal
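For the batch-processing use case, the workers usually need to hand results back. The following is one possible sketch under stated assumptions, not the course's implementation: the job type and the processBatch function are hypothetical, and upper-casing a string stands in for real processing. Each job carries its index, so every worker writes to a distinct slot of the results slice and no mutex is needed.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// job is a hypothetical type pairing a unit of work with its position
// in the batch.
type job struct {
	idx  int
	name string
}

// processBatch is a hypothetical function that runs a bounded fan out with
// g workers and returns one result per input, in input order.
func processBatch(work []string, g int) []string {
	results := make([]string, len(work))
	ch := make(chan job, g)

	var wg sync.WaitGroup
	wg.Add(g)
	for e := 0; e < g; e++ {
		go func() {
			defer wg.Done()
			for j := range ch {
				// Each index is written by exactly one worker.
				results[j.idx] = strings.ToUpper(j.name)
			}
		}()
	}

	// The manager sends work; this blocks while the buffer is full.
	for i, w := range work {
		ch <- job{idx: i, name: w}
	}
	close(ch)

	// Wait for the workers before reading results.
	wg.Wait()
	return results
}

func main() {
	work := []string{"paper1", "paper2", "paper3", "paper4", "paper5"}
	fmt.Println(processBatch(work, 2)) // [PAPER1 PAPER2 PAPER3 PAPER4 PAPER5]
}
```

Unlike the log lines above, the returned slice is in the same order on every run, because the position of each result is fixed by the job index rather than by which worker finishes first.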

Conclusion

In this article, the Fan Out Bounded channel pattern was described, along with a simple implementation.

Readers are encouraged to check out the excellent Ardan Labs educational materials to learn more.

Resources:

  1. Ardan Labs
  2. Cover image by Igor Mashkov from Pexels
  3. Fan out picture