Handling arbitrary channels with reflect.Select
Kazuki Higashiguchi
Posted on December 28, 2021
Key takeaways
- A select statement chooses which of a set of operations will proceed, which lets you wait on multiple channel operations.
- reflect.Select allows us to handle an arbitrary number of channels
Select statements
A select statement chooses which of a set of possible send or receive operations will proceed, which lets you wait on multiple channel operations at once. For example, the following program receives from three channels in a single loop.
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
done := make(chan struct{})
go func() {
time.Sleep(1 * time.Second)
ch1 <- 1
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- 2
}()
go func() {
time.Sleep(3 * time.Second)
done <- struct{}{}
}()
Loop:
for {
select {
case <-done:
fmt.Println("quit")
break Loop
case num := <-ch1:
fmt.Printf("received: %d\n", num)
case num := <-ch2:
fmt.Printf("received: %d\n", num)
}
}
}
The pattern of combining a for statement with a select statement is described in the book "Concurrency in Go" by Katherine Cox-Buday as "The for-select Loop".
for { // Either loop infinitely or range over something
select {
// Do some work with channels
}
}
Aside: labeled break
This is not the main topic, but Loop: is a labeled statement. A labeled statement may be the target of a goto, break, or continue statement. The example above needs to break out of the for loop from inside the select. A plain break does not work as expected there, because it terminates only the select statement:
for {
select {
default:
break
}
}
The idiomatic Go approach in this situation is to label the loop and break to that label.
L:
for {
select {
default:
break L
}
}
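To make the difference concrete, here is a small sketch that counts loop iterations for each form. The function names plainBreak and labeledBreak and the limit parameter are made up for this illustration; the limit exists only so that the plain-break version terminates.

```go
package main

import "fmt"

// plainBreak counts how many times the loop body runs when a bare break
// is used inside select. The break leaves only the select statement, so
// the loop keeps running until n reaches limit.
func plainBreak(limit int) int {
	n := 0
	for n < limit {
		n++
		select {
		default:
			break // exits the select, not the for loop
		}
	}
	return n
}

// labeledBreak does the same with a labeled break, which leaves the
// for loop on the first iteration.
func labeledBreak(limit int) int {
	n := 0
Loop:
	for n < limit {
		n++
		select {
		default:
			break Loop // exits the for loop
		}
	}
	return n
}

func main() {
	fmt.Println(plainBreak(5))   // prints 5
	fmt.Println(labeledBreak(5)) // prints 1
}
```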
Communicate with arbitrary channels
In the above example, the main function listens for transmissions on three channels: ch1, ch2, and done.
select {
case <-done:
fmt.Println("quit")
break Loop
case num := <-ch1:
fmt.Printf("received: %d\n", num)
case num := <-ch2:
fmt.Printf("received: %d\n", num)
}
However, a select statement fixes its set of cases at compile time, so it cannot wait on a number of channels that is only known at run time. For example, a server may hold a varying number of connections and need to pick whichever one becomes ready.
reflect.Select
One option to achieve this is, as the title says, to use the reflect package, specifically the Select function. It was added in Go 1.1.
func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)
Select executes a select operation described by the list of cases, whose element type is reflect.SelectCase. Like the Go select statement, it blocks until at least one of the cases can proceed, makes a uniform pseudo-random choice, and then executes that case.
The first return value, chosen, is the index of the chosen case. If that case was a receive operation, the second and third values are also meaningful: recv is the value received, and recvOK is a boolean indicating whether the value corresponds to a send on the channel. When the channel is closed, recv is a zero value and recvOK is false.
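The closed-channel behavior is easy to miss, so here is a minimal sketch of it. The helper recvOnce is a name invented for this illustration; reflect.SelectCase and reflect.SelectRecv are part of the reflect package and are described in the following paragraphs.

```go
package main

import (
	"fmt"
	"reflect"
)

// recvOnce (hypothetical helper) runs a single reflect.Select over one
// receive case and reports the received int and whether it came from a
// real send on the channel.
func recvOnce(ch <-chan int) (value int, ok bool) {
	cases := []reflect.SelectCase{
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)},
	}
	_, recv, recvOK := reflect.Select(cases)
	// On a closed channel recv holds the zero value of the element type,
	// so recv.Int() is 0 and recvOK is false.
	return int(recv.Int()), recvOK
}

func main() {
	ch := make(chan int, 1)
	ch <- 42
	close(ch)

	v, ok := recvOnce(ch)
	fmt.Println(v, ok) // prints "42 true": the buffered value is delivered first

	v, ok = recvOnce(ch)
	fmt.Println(v, ok) // prints "0 false": the channel is drained and closed
}
```

A closed channel is always ready to receive, so a loop around reflect.Select would likely want to drop a case once its recvOK comes back false. Otherwise that case can be chosen again and again.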
reflect.SelectCase describes a single case in a select operation.
type SelectCase struct {
Dir SelectDir // direction of case
Chan Value // channel to use (for send or receive)
Send Value // value to send (for send)
}
The kind of case depends on the Dir field, whose type is reflect.SelectDir. It indicates the communication direction.
type SelectDir int
const (
SelectSend SelectDir // case Chan <- Send
SelectRecv // case <-Chan:
SelectDefault // default
)
If Dir is SelectSend, the case represents a send operation. If Dir is SelectRecv, the case represents a receive operation. If Dir is SelectDefault, the case represents a default case. For example, the following code receives from whichever channel in a dynamically generated list is ready, choosing at random when several are.
package main
import (
"fmt"
"reflect"
)
func main() {
// Generate the list of channels
chs := make([]<-chan int, 0)
for i := 0; i < 4; i++ {
ch := make(chan int)
// Pass i as an argument so each goroutine sends its own value
go func(v int) { ch <- v }(i)
chs = append(chs, ch)
}
}
// Convert the channels into a list of cases (SelectCase)
cases := make([]reflect.SelectCase, len(chs))
for i, ch := range chs {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
// If Dir is SelectRecv, Send must be a zero Value
// Send: ...
}
}
// Wait until the channel receives the value (reflect.Select)
for i := 0; i < 4; i++ {
chosen, recv, ok := reflect.Select(cases)
if ok {
fmt.Printf("chosen: %d, recv: %v\n", chosen, recv)
}
}
}
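The example above only uses SelectRecv. As a rough sketch of the other two directions, the following program builds one SelectSend case per channel plus a SelectDefault case, which together act as a non-blocking send. The helper name trySend and its -1 return convention are assumptions made for this illustration, not part of the reflect package.

```go
package main

import (
	"fmt"
	"reflect"
)

// trySend (hypothetical helper) attempts a non-blocking send of v on one
// of the channels in chs that is ready. It returns the index of the
// channel that accepted the value, or -1 if no channel was ready.
func trySend(chs []chan int, v int) int {
	cases := make([]reflect.SelectCase, 0, len(chs)+1)
	for _, ch := range chs {
		cases = append(cases, reflect.SelectCase{
			Dir:  reflect.SelectSend,
			Chan: reflect.ValueOf(ch),
			Send: reflect.ValueOf(v),
		})
	}
	// Default case: Chan and Send must both be zero Values.
	cases = append(cases, reflect.SelectCase{Dir: reflect.SelectDefault})

	chosen, _, _ := reflect.Select(cases)
	if chosen == len(chs) {
		return -1 // the default case was chosen
	}
	return chosen
}

func main() {
	full := make(chan int, 1)
	full <- 0 // the buffer is now full, so a send would block
	free := make(chan int, 1)

	chs := []chan int{full, free}
	fmt.Println(trySend(chs, 10)) // prints 1: only chs[1] has room
	fmt.Println(trySend(chs, 20)) // prints -1: both buffers are full
	fmt.Println(<-free)           // prints 10
}
```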
Wait and select an idle connection from connection pools
Let's use reflect.Select to pick one idle connection from a set of connection pools. The design overview is like this:
package main
import (
"errors"
"fmt"
"log"
"reflect"
)
type Conn struct {
ID string
// It may hold some kind of connection, such as a database or WebSocket connection.
}
type Pool struct {
idle chan *Conn
}
type Dispatcher struct {
// Dispatcher knows the list of connections.
pools []*Pool
}
func (s *Dispatcher) Select() (*Conn, error) {
// Convert the list of cases
cases := make([]reflect.SelectCase, len(s.pools))
for i, pool := range s.pools {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(pool.idle),
}
}
log.Print("dispatcher waiting any idle connection...")
_, recv, ok := reflect.Select(cases)
if !ok {
return nil, errors.New("failed to select a case from connection pool")
}
conn, ok := recv.Interface().(*Conn)
if !ok {
return nil, errors.New("the type of received connection is invalid")
}
return conn, nil
}
func main() {
// Prepare empty pools
pools := make([]*Pool, 0)
for i := 0; i < 4; i++ {
p := new(Pool)
p.idle = make(chan *Conn)
pools = append(pools, p)
}
d := Dispatcher{pools: pools}
// Notify the dispatcher as each connection becomes idle
go func() {
for i, pool := range pools {
c := &Conn{ID: fmt.Sprintf("%d", i)}
pool.idle <- c
}
}()
// Wait and select an idle connection from pools
for i := 0; i < 4; i++ {
selected, err := d.Select()
if err != nil {
fmt.Printf("err: %v\n", err)
// selected is nil on error, so skip the line below
continue
}
fmt.Printf("selected Connection ID: %s\n", selected.ID)
}
}
In the example above, the following flow is implemented.
- Prepare four pools. Each pool has a channel named idle.
- The main goroutine waits on the idle channels with reflect.Select.
- Another goroutine sends each idle connection on its pool's channel.
$ go run main.go
2021/12/28 09:24:09 dispatcher waiting any idle connection...
selected Connection ID: 0
2021/12/28 09:24:09 dispatcher waiting any idle connection...
selected Connection ID: 1
2021/12/28 09:24:09 dispatcher waiting any idle connection...
selected Connection ID: 2
2021/12/28 09:24:09 dispatcher waiting any idle connection...
selected Connection ID: 3
Conclusion
reflect.Select allows us to handle an arbitrary number of channels. This article explained the basic knowledge needed to use this coding pattern and introduced some example code.