Handling with arbitrary channels by reflect.Select

hgsgtk

Kazuki Higashiguchi

Posted on December 28, 2021

Key takeaways

  • A select statement chooses which of a set of operations will proceed, which lets you wait on multiple channel operations.
  • reflect.Select allows us to handle an arbitrary number of channels

Select statements

A select statement chooses which of a set of possible send or receive operations will proceed, which lets you wait on multiple channel operations at once. For example, the following code receives from three channels in a single loop.

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    done := make(chan struct{})

    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- 1
    }()

    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- 2
    }()

    go func() {
        time.Sleep(3 * time.Second)
        done <- struct{}{}
    }()

Loop:
    for {
        select {
        case <-done:
            fmt.Println("quit")
            break Loop
        case num := <-ch1:
            fmt.Printf("received: %d\n", num)
        case num := <-ch2:
            fmt.Printf("received: %d\n", num)
        }
    }
}

The pattern of combining a for statement with a select statement is described as "The for-select Loop" in the book "Concurrency in Go" by Katherine Cox-Buday.

for { // Either loop infinitely or range over something
    select {
    // Do some work with channels
    }
}

Aside: labeled break

This is not the main topic, but Loop: is a labeled statement. A labeled statement may be the target of a goto, break, or continue statement. The first example needs the label because a plain break inside a select terminates only the select statement, not the enclosing for loop. For example, the following code does not work as expected: it loops forever.

for {
    select {
    default:
        break
    }
}

The idiomatic Go way to handle this situation is to label the loop and break on the label.

L:
    for {
        select {
        default:
            break L
        }
    }
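To see the difference between the two forms, here is a minimal sketch that counts loop iterations. The function names countUnlabeled and countLabeled are made up for this illustration, and the unlabeled version needs an explicit iteration limit because its break never leaves the loop.

```go
package main

import "fmt"

// countUnlabeled runs a for-select loop with a plain break.
// The break only leaves the select, so the loop keeps running
// until the limit check returns from the function.
func countUnlabeled(limit int) int {
    iterations := 0
    for {
        if iterations == limit {
            return iterations
        }
        iterations++
        select {
        default:
            break // leaves the select only; the for loop continues
        }
    }
}

// countLabeled uses a labeled break, which leaves the for loop
// on the first pass.
func countLabeled() int {
    iterations := 0
L:
    for {
        iterations++
        select {
        default:
            break L // leaves the for loop
        }
    }
    return iterations
}

func main() {
    fmt.Println("unlabeled:", countUnlabeled(3)) // unlabeled: 3
    fmt.Println("labeled:", countLabeled())      // labeled: 1
}
```

The unlabeled version runs until the limit stops it, while the labeled version exits after a single iteration.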

Communicate with arbitrary channels

In the above example, the main function listens for transmissions to three channels: ch1, ch2, and done.

select {
case <-done:
    fmt.Println("quit")
    break Loop
case num := <-ch1:
    fmt.Printf("received: %d\n", num)
case num := <-ch2:
    fmt.Printf("received: %d\n", num)
}

However, the cases of a select statement are fixed when the code is written, so this does not allow us to receive from a variable number of channels. For example, a server may hold multiple connections and need to select the one that matches some condition.

reflect.Select

One option to achieve this is, as the title says, to use the reflect package, specifically its Select function. It was added in Go 1.1.

func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

Select executes a select operation described by the list of cases whose type is reflect.SelectCase. Like the Go select statement, it blocks until at least one of the cases can proceed, makes a uniform pseudo-random choice, and then executes that case.

The first return value, chosen, is the index of the chosen case. If that case was a receive operation, the second and third return values are meaningful: recv is the value received, and recvOK is a boolean indicating whether the value corresponds to a send on the channel. When the channel is closed, recv is a zero value and recvOK is false.
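As a rough sketch of how recvOK can be used, the following code receives from several channels until all of them are closed. It uses the reflect.SelectCase type that is described just below. The helper name drain is an assumption made for this example, and it relies on the documented behavior that a receive case whose Chan is the zero Value is ignored.

```go
package main

import (
    "fmt"
    "reflect"
)

// drain is a hypothetical helper. It receives from every channel
// until all of them are closed and returns the received values.
func drain(chs []chan int) []int {
    cases := make([]reflect.SelectCase, len(chs))
    for i, ch := range chs {
        cases[i] = reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(ch),
        }
    }

    var got []int
    for open := len(cases); open > 0; {
        chosen, recv, recvOK := reflect.Select(cases)
        if !recvOK {
            // The channel is closed, so recv is only a zero value.
            // Setting Chan to the zero Value makes Select ignore
            // this case from now on.
            cases[chosen].Chan = reflect.Value{}
            open--
            continue
        }
        got = append(got, int(recv.Int()))
    }
    return got
}

func main() {
    a, b := make(chan int, 2), make(chan int, 1)
    a <- 1
    a <- 2
    b <- 3
    close(a)
    close(b)

    // The order between channels is random, but all three values arrive.
    fmt.Println(len(drain([]chan int{a, b}))) // 3
}
```

Without the recvOK check, a closed channel would be chosen again and again, and the loop would keep appending zero values.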

reflect.SelectCase describes a single case in a select operation.

type SelectCase struct {
    Dir  SelectDir // direction of case
    Chan Value     // channel to use (for send or receive)
    Send Value     // value to send (for send)
}

The kind of case depends on the field Dir whose type is reflect.SelectDir. It indicates the communication direction.

type SelectDir int

const (
    _             SelectDir = iota
    SelectSend              // case Chan <- Send
    SelectRecv              // case <-Chan:
    SelectDefault           // default
)

If Dir is SelectSend, the case represents a send operation. If Dir is SelectRecv, the case represents a receive operation. If Dir is SelectDefault, the case represents a default case. For example, the following code receives from whichever channel is ready in a dynamically generated list of channels.

package main

import (
    "fmt"
    "reflect"
)

func main() {
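    // Generate the list of channels
    chs := make([]<-chan int, 0)
    for i := 0; i < 4; i++ {
        ch := make(chan int)
        // Pass i as an argument so each goroutine sends its own value
        // (before Go 1.22 the loop variable is shared across iterations)
        go func(v int) { ch <- v }(i)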
        chs = append(chs, ch)
    }

    // Convert the channels into a list of cases (SelectCase)
    cases := make([]reflect.SelectCase, len(chs))
    for i, ch := range chs {
        cases[i] = reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(ch),
            // If Dir is SelectRecv, Send must be a zero Value
            // Send: ...
        }
    }

    // Wait until one of the channels delivers a value (reflect.Select)
    for i := 0; i < 4; i++ {
        chosen, recv, ok := reflect.Select(cases)
        if ok {
            fmt.Printf("chosen: %d, recv: %v\n", chosen, recv)
        }
    }
}
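The example above only uses SelectRecv. As a hedged sketch of the other two directions, the following code tries a non-blocking send on a dynamic list of channels by combining SelectSend cases with one SelectDefault case. The helper name trySend and its return convention (-1 when no channel could accept the value) are assumptions for this illustration.

```go
package main

import (
    "fmt"
    "reflect"
)

// trySend is a hypothetical helper. It tries to send v on any of the
// given channels without blocking and returns the index of the channel
// that accepted the value, or -1 if none could.
func trySend(chs []chan int, v int) int {
    cases := make([]reflect.SelectCase, 0, len(chs)+1)
    for _, ch := range chs {
        cases = append(cases, reflect.SelectCase{
            Dir:  reflect.SelectSend,
            Chan: reflect.ValueOf(ch),
            Send: reflect.ValueOf(v), // the value to send
        })
    }
    // For SelectDefault, both Chan and Send must be zero Values.
    cases = append(cases, reflect.SelectCase{Dir: reflect.SelectDefault})

    chosen, _, _ := reflect.Select(cases)
    if chosen == len(chs) {
        return -1 // the default case ran: every send would block
    }
    return chosen
}

func main() {
    full := make(chan int, 1)
    full <- 0 // fill the buffer so that a send would block
    free := make(chan int, 1)

    fmt.Println(trySend([]chan int{full, free}, 42)) // 1
    fmt.Println(trySend([]chan int{full, free}, 7))  // -1
    fmt.Println(<-free)                              // 42
}
```

The first call succeeds on the channel with free buffer space. The second call finds both buffers full, so the default case is chosen instead of blocking.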

Wait and select an idle connection from connection pools

Let's use reflect.Select to pick an idle connection from a set of connection pools. The design overview is like this:

[Figure: the design overview]

package main

import (
    "errors"
    "fmt"
    "log"
    "reflect"
)

type Conn struct {
    ID string
    // It may contain some kind of connection, such as a database or WebSocket connection.
}

type Pool struct {
    idle chan *Conn
}

type Dispatcher struct {
    // Dispatcher knows the list of connections.
    pools []*Pool
}

func (s *Dispatcher) Select() (*Conn, error) {
    // Convert the list of cases
    cases := make([]reflect.SelectCase, len(s.pools))
    for i, pool := range s.pools {
        cases[i] = reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(pool.idle),
        }
    }
    log.Print("dispatcher waiting any idle connection...")
    _, recv, ok := reflect.Select(cases)
    if !ok {
        // ok is false when the chosen idle channel has been closed
        return nil, errors.New("failed to select a case from connection pool")
    }
    conn, ok := recv.Interface().(*Conn)
    if !ok {
        return nil, errors.New("the type of received connection is invalid")
    }

    return conn, nil
}

func main() {
    // Prepare empty pools
    pools := make([]*Pool, 0)
    for i := 0; i < 4; i++ {
        p := new(Pool)
        p.idle = make(chan *Conn)
        pools = append(pools, p)
    }

    d := Dispatcher{pools: pools}

    // Notify that a connection has become idle
    go func() {
        for i, pool := range pools {
            c := &Conn{ID: fmt.Sprintf("%d", i)}
            pool.idle <- c
        }
    }()

    // Wait and select an idle connection from pools
    for i := 0; i < 4; i++ {
        selected, err := d.Select()
        if err != nil {
            fmt.Printf("err: %v\n", err)
            // selected is nil on error, so skip the print below
            continue
        }
        fmt.Printf("selected Connection ID: %s\n", selected.ID)
    }
}

The example above implements the following flow.

  • Prepare four pools. Each pool has a channel named idle.
  • The main goroutine waits on the idle channels with reflect.Select.
  • Another goroutine notifies each idle channel of an idle connection.

Running the program prints the following output.

$ go run main.go
2021/12/28 09:24:09 dispatcher waiting any idle connection...
selected Connection ID: 0
2021/12/28 09:24:09 dispatcher waiting any idle connection...
selected Connection ID: 1
2021/12/28 09:24:09 dispatcher waiting any idle connection...
selected Connection ID: 2
2021/12/28 09:24:09 dispatcher waiting any idle connection...
selected Connection ID: 3

Conclusion

reflect.Select allows us to handle an arbitrary number of channels. This article explained the basic knowledge needed to use this coding pattern and introduced some example code.
