Golang - Trabalhando com Canais, um passo além do básico.
Luiz Filipe S Mariz
Posted on July 19, 2022
Uma breve história...
Estava eu ajudando um colega com uma tarefa envolvendo tarefas assíncronas com golang. "Qual a melhor forma de fazer "x" coisa?". Nesse momento me vi diante de uma vastidão de possibilidades de resolver aquele problema. Como explicar então, de forma satisfatória, sem transformar a próxima meia hora num monólogo chato?
A ideia de compilar soluções
Havia lido tempos atrás essa solução que o autor também não atribuía a si, novidade na programação (só que não!). O artigo era o seguinte MULTIPLEXING CHANNELS IN GO, aconselho que leiam se quiserem aprofundar em alguns outros tópicos como canais.
A seguir apresentarei de forma bem direta a ideia de multiplexação.
O conceito:
Estamos falando da possibilidade de agregar um conjunto de canais à um único canal. Dessa forma podemos convergir os dados à um único ponto que será a retomada da sincronicidade do nosso código.
Um multiplexador (abreviação: MUX), por vezes denominado pelos anglicismos multiplexer ou multiplex, é um dispositivo que seleciona as informações de duas ou mais fontes de dados num único canal. São utilizados em situações onde o custo de implementação de canais separados para cada fonte de dados é maior que o custo e a inconveniência de utilizar as funções de multiplexação/demultiplexação. - Wikipedia. Repare nas imagens.
Vamos ao código
Seguiremos os seguintes passos:
- Criaremos uma função de espalhamento/divergência responsável por distribuir as Tasks para vários Workers.
- Criaremos a função do Worker que será responsável por executar a tarefa e encaminhar os resultados para o canal multiplexação (Result)
- Criaremos a Função de agregação responsável por receber os resultados e retornar ao fluxo síncrono da execução.
consideraremos para esse cenário as seguintes premissas:
// Tipo gerado para cada task poderíamos ter uma struct aqui
type Task int
// Tipo gerado para cada result poderíamos ter uma struct aqui
type Result int
// Função que representa um tempo de execução qualquer
// para finalização da tarefa do worker
func asyncSimulation(t Task) int {
time.Sleep(1 * time.Second)
return (int(t) * int(t))
}
0 - Nossa Função main:
func main() {
// Trabalharemos com 10 tasks para serem completadas
tasks := taskGenerator(1, 10)
results := make(chan Result)
// Esse wait group servirá para controlar o fechamento
// do canal de result e evitar deadlocks
wg := &sync.WaitGroup{}
// 5 workers serão utilizados nesse exemplo
for i := 0; i < 5; i++ {
wg.Add(1)
worker(tasks, results, wg)
}
// Verificamos o momento que todos os workers
// terminarem de trabalhar
verifyEnd(results, wg)
// Recebemos todos os resultados no canal result
// e imprimimos á medida que são resolvidos
resultAggregator(results)
}
1 - A função de espalhamento:
Essa é a função responsável por alimentar o canal de tasks com as tasks que serão executadas.
Como não definimos o tamanho do buffer do canal é necessário que a atribuição das tasks se faça numa goroutine para não gerarmos um deadlock.
func taskGenerator(start int, end int) <-chan Task {
tasks := make(chan Task)
go func() {
for i := start; i < end; i++ {
tasks <- Task(i)
}
close(tasks)
}()
return tasks
}
2 - Trabalhando com mais de um Worker:
Para trabalharmos com mais de um worker precisamos também gerenciar o fechamento do canal de results, para isso definimos um wait group para os workers
func worker(tasks <-chan Task, result chan<- Result, wg *sync.WaitGroup) {
go func() {
for task := range tasks {
result <- Result(asyncSimulation(task))
}
wg.Done()
}()
}
func verifyEnd(results chan<- Result, wg *sync.WaitGroup) {
go func() {
wg.Wait()
close(results)
}()
}
3 - A função de agregação:
Utilizamos essa função para receber todos os dados repassados para o canal de results e posteriormente somamos e mostramos eles na tela
func resultAggregator(res <-chan Result) {
sum := 0
totalResults := 0
for res := range res {
fmt.Printf("received result %v\n", res)
sum += int(res)
totalResults += 1
}
fmt.Printf("total os squares received: %d\n", totalResults)
fmt.Printf("sum of squares: %d", sum)
}
Análise do resultado
// Após 1 segundo recebemos os primeiros 5 resultados
// tempo gasto pelos 5 workers para processar as primeiras
// 5 tasks
received result 16
received result 9
received result 4
received result 0
received result 1
// Após mais 1 segundo recebemos os próximos 5 resultados
// tempo gasto pelos 5 workers para processar as últimas
// 5 tasks
received result 36
received result 25
received result 49
received result 64
received result 81
// impressão das demais informações do agregador
total os squares received: 10
sum of squares: 285
Expandindo a aplicabilidade:
Esse conceito de multiplexação é apenas um dentre vários que podemos utilizar para garantir o trabalho de forma assíncrona para solução de diversos problemas. Sua aplicação vai desde requisições para APIs externas até execução de algoritmos custosos em segmentos menores.
Fontes
MULTIPLEXING CHANNELS IN GO - https://katcipis.github.io/blog/mux-channels-go/
My Go Resolutions for 2017 - https://research.swtch.com/go2017
Multiplexador - https://pt.wikipedia.org/wiki/Multiplexador
Posted on July 19, 2022
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.