Avoid duplicated requests with singleflight

renatosuero

Renato Suero

Posted on May 24, 2020

Avoid duplicated requests with singleflight

Do you have any endpoints which need to process a lot of things, it consumes third-party data, it's slow, etc... And to help these endpoints receive a lot of simultaneous requests (something you load on your home page for all users and the content is the same, for example some statistics).

Each time that endpoint is called your eyes are filled with tears. That is about to change :) I'll tell you how.
We will use the package singleflight.In words of the package:

"Package singleflight provides a duplicate function call suppression mechanism."

The idea of the package is, you create a key to identify the request and when there're other requests with the same key, it will wait for the answer which is processing for another request. When the request returns the result, it will share with the other requests were waiting for the result, therefore avoiding multiples requests/slow process.

Let's see the code, I've created an API to see the package running, you can see the code [here]. (https://github.com/renatosuero/devto-posts/tree/master/singleflight)

I've created an HTTP service that will consume data that comes from an external API.

package main

import (
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "time"
)

func main() {
    var requestGroup singleflight.Group
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        fmt.Println("calling the endpoint")
        response, err := http.Get("https://jsonplaceholder.typicode.com/photos")

        if err != nil {
            fmt.Print(err.Error())
            os.Exit(1)
        }

        responseData, err := ioutil.ReadAll(response.Body)
        if err != nil {
            log.Fatal(err)
        }
        time.Sleep(2 * time.Second)
        w.Write(responseData)
    })
Enter fullscreen mode Exit fullscreen mode

Accessing http://127.0.0.1:3000/ it willl call the API jsonplaceholder, in order to become interesting I've added 2 seconds for sleeping to simulate the process is slowlest.

Let's use vegeta, the goal is to execute a lot of requests to see singleflight shine. I defined to execute 10 requests per second and the duration will be 1 second.

echo "GET http://localhost:3000/" | vegeta attack -duration=1s -rate=10 | tee results.bin | vegeta report
Enter fullscreen mode Exit fullscreen mode

Here you can see the result of Vegeta and the output of our service:
Alt Text

As we can see, all requests called the external API.

Let's see singleflight shine now, we'll use the same configurations to run Vegeta again.

In this code I added a new endpoint /singleflight, in the function requestGroup.Do, I defined the key as s*singleflight*, now the request will verify if there's a processing running (using that key) , if there's a key, it won't call and it'll wait for the result of the call it is processing.
I added a print in the terminal to indicate when the request waits for result and uses the shared result.

// Because the pacage isn't a starndard library until now, you have to add him in your go path,go mod, vendor,etc..
import "golang.org/x/sync/singleflight"
var requestGroup singleflight.Group
//para este endpoint funcionar você precisa importar o pacote singleflight e criar essa variável(eu sei global e tal, mas para este post é suficiente).
    http.HandleFunc("/singleflight", func(w http.ResponseWriter, r *http.Request) {
        res, err, shared := requestGroup.Do("singleflight", func() (interface{}, error) {
            fmt.Println("calling the endpoint")
            response, err := http.Get("https://jsonplaceholder.typicode.com/photos")

            if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return nil, err
            }
            responseData, err := ioutil.ReadAll(response.Body)
            if err != nil {
                log.Fatal(err)
            }
            time.Sleep(2 * time.Second)
            return string(responseData), err
        })

        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        result := res.(string)
        fmt.Println("shared = ", shared)
        fmt.Fprintf(w, "%q", result)
    })
Enter fullscreen mode Exit fullscreen mode

Vegeta again

echo "GET http://localhost:3000/" | vegeta attack -duration=1s -rate=10 | tee results.bin | vegeta report
Enter fullscreen mode Exit fullscreen mode

Alt Text

I recommend you execute the code+vegeta and see it executing in your machine.
In the first endpoint, you'll see all the requests are executed and the log shows the calls.
In the second one, you'll see one request, and suddenly 10 true indicating all requests used the shared result.

That's an amazing resource, let's think of endpoints we have a heavy/slow process, or you pay by external requests. In the latter, not only it helps reducing processing power, but also saves you money by avoiding duplicated calls .
Another thing, I'm using an HTTP service, but it could be anything, for example it could be a database query, a worker, etc...

Well, that's it folks. I hope it helps you as it helped me knowing this cool package, I'd like to thank Henrique to show me that, and André Mota for your valuable review.
The package has an option to "forget" the key and another one that the result returns by channels. You can for example, define a timeout, after X time you defined, cancel the key or by channel.

💖 💪 🙅 🚩
renatosuero
Renato Suero

Posted on May 24, 2020

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related