Golang - Trabalhando com Canais, um passo além do básico.

lfsmariz

Luiz Filipe S Mariz

Posted on July 19, 2022

Golang - Trabalhando com Canais, um passo além do básico.

Uma breve história...

Estava eu ajudando um colega com uma tarefa envolvendo tarefas assíncronas com golang. "Qual a melhor forma de fazer "x" coisa?". Nesse momento me vi diante de uma vastidão de possibilidades de resolver aquele problema. Como explicar então, de forma satisfatória, sem transformar a próxima meia hora num monólogo chato?

A ideia de compilar soluções

Havia lido tempos atrás essa solução que o autor também não atribuía a si, novidade na programação (só que não!). O artigo era o seguinte MULTIPLEXING CHANNELS IN GO, aconselho que leiam se quiserem aprofundar em alguns outros tópicos como canais.

A seguir apresentarei de forma bem direta a ideia de multiplexação.

O conceito:

Estamos falando da possibilidade de agregar um conjunto de canais à um único canal. Dessa forma podemos convergir os dados à um único ponto que será a retomada da sincronicidade do nosso código.

Um multiplexador (abreviação: MUX), por vezes denominado pelos anglicismos multiplexer ou multiplex, é um dispositivo que seleciona as informações de duas ou mais fontes de dados num único canal. São utilizados em situações onde o custo de implementação de canais separados para cada fonte de dados é maior que o custo e a inconveniência de utilizar as funções de multiplexação/demultiplexação. - Wikipedia. Repare nas imagens.


Multiplexação com golang


Multiplexador em redes

Vamos ao código

Seguiremos os seguintes passos:

  1. Criaremos uma função de espalhamento/divergência responsável por distribuir as Tasks para vários Workers.
  2. Criaremos a função do Worker que será responsável por executar a tarefa e encaminhar os resultados para o canal multiplexação (Result)
  3. Criaremos a Função de agregação responsável por receber os resultados e retornar ao fluxo síncrono da execução.

consideraremos para esse cenário as seguintes premissas:

// Tipo gerado para cada task poderíamos ter uma struct aqui
type Task int

// Tipo gerado para cada result poderíamos ter uma struct aqui
type Result int

// Função que representa um tempo de execução qualquer 
// para finalização da tarefa do worker
func asyncSimulation(t Task) int {
    time.Sleep(1 * time.Second)
    return (int(t) * int(t))
}
Enter fullscreen mode Exit fullscreen mode

0 - Nossa Função main:

func main() {
        // Trabalharemos com 10 tasks para serem completadas
    tasks := taskGenerator(1, 10)
    results := make(chan Result)
        // Esse wait group servirá para controlar o fechamento
        // do canal de result e evitar deadlocks 
    wg := &sync.WaitGroup{}

        // 5 workers serão utilizados nesse exemplo
    for i := 0; i < 5; i++ {
        wg.Add(1)
        worker(tasks, results, wg)
    }

        // Verificamos o momento que todos os workers 
        // terminarem de trabalhar
    verifyEnd(results, wg)

        // Recebemos todos os resultados no canal result
        // e imprimimos á medida que são resolvidos
    resultAggregator(results)
}
Enter fullscreen mode Exit fullscreen mode

1 - A função de espalhamento:

Essa é a função responsável por alimentar o canal de tasks com as tasks que serão executadas.
Como não definimos o tamanho do buffer do canal é necessário que a atribuição das tasks se faça numa goroutine para não gerarmos um deadlock.

func taskGenerator(start int, end int) <-chan Task {
    tasks := make(chan Task)

    go func() {
        for i := start; i < end; i++ {
            tasks <- Task(i)
        }

        close(tasks)
    }()

    return tasks
}
Enter fullscreen mode Exit fullscreen mode

2 - Trabalhando com mais de um Worker:

Para trabalharmos com mais de um worker precisamos também gerenciar o fechamento do canal de results, para isso definimos um wait group para os workers

func worker(tasks <-chan Task, result chan<- Result, wg *sync.WaitGroup) {

    go func() {
        for task := range tasks {
            result <- Result(asyncSimulation(task))
        }
        wg.Done()
    }()
}

func verifyEnd(results chan<- Result, wg *sync.WaitGroup) {
    go func() {
        wg.Wait()
        close(results)
    }()
}
Enter fullscreen mode Exit fullscreen mode

3 - A função de agregação:

Utilizamos essa função para receber todos os dados repassados para o canal de results e posteriormente somamos e mostramos eles na tela

func resultAggregator(res <-chan Result) {
    sum := 0
    totalResults := 0

    for res := range res {
        fmt.Printf("received result %v\n", res)
        sum += int(res)
        totalResults += 1
    }

    fmt.Printf("total os squares received: %d\n", totalResults)
    fmt.Printf("sum of squares: %d", sum)
}
Enter fullscreen mode Exit fullscreen mode

Análise do resultado

// Após 1 segundo recebemos os primeiros 5 resultados
// tempo gasto pelos 5 workers para processar as primeiras
// 5 tasks
received result 16
received result 9
received result 4
received result 0
received result 1
// Após mais 1 segundo recebemos os próximos 5 resultados
// tempo gasto pelos 5 workers para processar as últimas
// 5 tasks
received result 36
received result 25
received result 49
received result 64
received result 81
// impressão das demais informações do agregador
total os squares received: 10
sum of squares: 285
Enter fullscreen mode Exit fullscreen mode

Expandindo a aplicabilidade:

Esse conceito de multiplexação é apenas um dentre vários que podemos utilizar para garantir o trabalho de forma assíncrona para solução de diversos problemas. Sua aplicação vai desde requisições para APIs externas até execução de algoritmos custosos em segmentos menores.


Fontes

💖 💪 🙅 🚩
lfsmariz
Luiz Filipe S Mariz

Posted on July 19, 2022

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related