Go Server-sent events - A guide for sending messages to a specific user/client

hadius

Dương

Posted on May 25, 2023

Go Server-sent events - A guide for sending messages to a specific user/client

The problem

I have been assigned the task of implementing a notification feature for my side project application. Whenever a user comments on or reacts to another user's post, an event is generated. Subsequently, the server sends a message to notify the owner of the post about this event.
TLDR: You can find the final code here

After exploring numerous solutions on the internet, I discovered that we can achieve the goal by using long polling, WebSockets, or server-sent events. However, since data only needs to flow from the server to the client and the notifications must arrive in real time, there is no need for long polling or WebSockets.

Getting started

According to Wikipedia:

Server-Sent Events (SSE) is a server push technology enabling a client to receive automatic updates from a server via an HTTP connection, and describes how servers can initiate data transmission towards clients once an initial client connection has been established.

Talking is cheap, let's get our hands dirty by coding up this notification server.

// main.go
package main

import (
    "fmt"
    "log"
    "net/http"
    "time"
)

func sseHandler(w http.ResponseWriter, r *http.Request) {
    fmt.Println("Client connected")
    w.Header().Set("Access-Control-Allow-Origin", "*") // must have since it enable cors
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    flusher, ok := w.(http.Flusher)
    if !ok {
        fmt.Println("Could not init http.Flusher")
    }

    for {
        select {
        case <-r.Context().Done():
            fmt.Println("Connection closed")
            return
        default:
            fmt.Println("case message... sending message")
            fmt.Fprintf(w, "data: Ping\n\n")
            flusher.Flush()
            time.Sleep(5 * time.Second)
        }
    }
}

// main registers the SSE endpoint and serves it on port 3500.
func main() {
    mux := http.NewServeMux()
    mux.HandleFunc("/sse", sseHandler)

    // ListenAndServe only returns on failure, so any result is fatal.
    if err := http.ListenAndServe(":3500", mux); err != nil {
        log.Fatal(err)
    }
}
Enter fullscreen mode Exit fullscreen mode

You can conduct a quick test using the curl command

curl http://localhost:3500/sse
Enter fullscreen mode Exit fullscreen mode

You should see something similar to this:

data: Ping

data: Ping
...
Enter fullscreen mode Exit fullscreen mode

In the code above, we define an HTTP handler, sseHandler, that handles requests to the /sse endpoint. Inside the handler, we set the appropriate headers for SSE and send an event to the client every 5 seconds.
The headers play a vital role since the client relies on them to establish the connection. To gain a better understanding of their importance, you can experiment by disabling one of them and observing how it impacts the client connection (spoiler alert: it seems that Content-Type: text/event-stream affects the connection establishment).

For the client-side, we will generate a front-end project utilizing a framework of your choice or even pure/vanilla JavaScript.

npx degit solidjs/templates/ts sse-client
cd sse-client
# yarn, npm i, pnpm i to install dependencies
Enter fullscreen mode Exit fullscreen mode
import { Component, createEffect, createSignal, onCleanup } from "solid-js";

/**
 * Root component: opens an SSE stream to the local server, logs every message
 * it receives, and closes the stream when the component is disposed.
 */
const App: Component = () => {
  const [source, setSource] = createSignal<EventSource | undefined>();

  const logMessage = (e: MessageEvent): void => {
    console.log({ e });
  };

  createEffect(() => {
    const stream = new EventSource("http://localhost:3500/sse");
    stream.onmessage = logMessage;
    setSource(stream);
  });

  onCleanup(() => {
    const stream = source();
    if (stream !== undefined) {
      stream.close();
    }
  });

  return <main>Server-sent events</main>;
};

export default App;
Enter fullscreen mode Exit fullscreen mode

Visit the url http://localhost:3000, upon doing so, you should see something similar to the provided GIF:

Console gif

That is it: we have successfully created the SSE endpoint and established a connection to the server using JavaScript. However, you might ask, "Okay, but how can I implement notifications with this?" Do you remember our requirement?

Whenever a user comments on or reacts to another user's post, an event is generated. Subsequently, the server sends a message to notify the owner of the post about this event.

We have already implemented the "server sends a message to notify" part. Therefore, only one task remains: dispatching a message when a user triggers an event. We will implement this by leveraging Go channels.

Flushing messages upon triggering an event

We need an effective way for functions and goroutines to communicate, and this is where Go channels shine. Our plan is to create a channel for each SSE connection and keep it in a global store. Additionally, we will introduce a new endpoint, /time, to trigger an event. When a user calls this endpoint, we will broadcast the message to all the channels in the store.

// CHANNEL_STORE holds a pointer to the message channel of every open SSE connection.
var CHANNEL_STORE = make([]*chan string, 0)

// removeChannel drops ch from CHANNEL_STORE by swapping it with the last entry
// and shrinking the slice by one. It does nothing when ch is not in the store.
func removeChannel(ch *chan string) {
    last := len(CHANNEL_STORE) - 1

    idx := -1
    for i := range CHANNEL_STORE {
        if CHANNEL_STORE[i] == ch {
            idx = i
        }
    }
    if idx < 0 {
        return
    }

    // Order is irrelevant, so overwrite the slot with the tail element.
    CHANNEL_STORE[idx] = CHANNEL_STORE[last]
    CHANNEL_STORE = CHANNEL_STORE[:last]
    fmt.Println("Connection remains: ", len(CHANNEL_STORE))
}

// broadcast sends msg to every channel currently in CHANNEL_STORE, one after
// another. Each send blocks until that connection's handler receives it.
func broadcast(msg string) {
    targets := CHANNEL_STORE // iterate over the store as it is right now
    for i := 0; i < len(targets); i++ {
        *targets[i] <- msg
    }
}

// getTime broadcasts the current wall-clock time (HH:MM:SS) to all SSE clients.
func getTime(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Access-Control-Allow-Origin", "*")
    broadcast(time.Now().Format("15:04:05"))
}

// sseHandler registers a message channel for the connection in CHANNEL_STORE
// and forwards every message received on it to the client as an SSE "data"
// event until the request context is cancelled.
func sseHandler(w http.ResponseWriter, r *http.Request) {
    // Verify streaming support before registering anything. Calling Flush on
    // a nil http.Flusher would panic on the first message.
    flusher, ok := w.(http.Flusher)
    if !ok {
        fmt.Println("Could not init http.Flusher")
        http.Error(w, "streaming unsupported", http.StatusInternalServerError)
        return
    }

    ch := make(chan string)
    CHANNEL_STORE = append(CHANNEL_STORE, &ch)

    // Only unregister the channel; do not close it. broadcast may still hold
    // a reference to it, and sending on a closed channel panics.
    defer func() {
        removeChannel(&ch)
        fmt.Println("Client closed connection")
    }()

    fmt.Println("Client connected: ", len(CHANNEL_STORE))
    w.Header().Set("Access-Control-Allow-Origin", "*")
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    for {
        select {
        case message := <-ch:
            fmt.Println("case message... sending message")
            fmt.Println(message)
            fmt.Fprintf(w, "data: %s\n\n", message)
            flusher.Flush()
        case <-r.Context().Done():
            // The deferred cleanup logs the disconnect.
            return
        }
    }
}

// main wires the SSE stream and the /time trigger, then serves on port 3500.
func main() {
    mux := http.NewServeMux()
    mux.HandleFunc("/sse", sseHandler)
    mux.HandleFunc("/time", getTime)

    // ListenAndServe only returns on failure, so any result is fatal.
    if err := http.ListenAndServe(":3500", mux); err != nil {
        log.Fatal(err)
    }
}
Enter fullscreen mode Exit fullscreen mode

Last but not least, we need to update our client code.

import { Component, createEffect, createSignal, onCleanup } from "solid-js";

/**
 * Root component: subscribes to the SSE stream, shows the latest time pushed
 * by the server, and offers a button that asks the server to broadcast the
 * current time.
 */
const App: Component = () => {
  const [eventSource, setEventSource] = createSignal<EventSource | undefined>();
  const [time, setTime] = createSignal<string>("");

  createEffect(() => {
    const ev = new EventSource("http://localhost:3500/sse");
    ev.onmessage = (e) => {
      console.log({ e });

      setTime(e.data);
    };

    setEventSource(ev);
  });

  /** Calls /time so the server broadcasts the current time to the open streams. */
  async function handleGetTime(): Promise<void> {
    try {
      const res = await fetch("http://localhost:3500/time");
      if (res.status !== 200) {
        console.log("Could not connect to the server");
      } else {
        console.log("OK");
      }
    } catch (err: unknown) {
      // fetch rejects (rather than resolving with a status) when the server
      // is unreachable; without this the click ends in an unhandled rejection.
      console.log("Could not connect to the server", err);
    }
  }

  onCleanup(() => {
    eventSource()?.close();
  });

  return (
    <main>
      Time: {time()}
      <button onClick={handleGetTime}>Get time</button>
    </main>
  );
};

export default App;
Enter fullscreen mode Exit fullscreen mode

Your result should resemble this GIF:

End result

As you can see, I have opened two browser tabs in the demo, and upon triggering the /time endpoint, both tabs receive the updated time immediately. This is achieved by broadcasting the message to all channels. We can further enhance the implementation by utilizing a Go map, with the user identity as the key and a slice of channels as the corresponding value, enabling targeted message broadcasts.

// main.go
type SSEConn struct {
    mu      sync.Mutex
    clients map[string][]chan string
}

func NewSSEConn() *SSEConn {
    return &SSEConn{clients: make(map[string][]chan string)}
}

func (p *SSEConn) addClient(id string) *chan string {
    p.mu.Lock()
    defer func() {
        fmt.Println("Clients in add: ", p.clients)
        for k, v := range p.clients {
            fmt.Printf("Key: %s, value: %d\n", k, len(v))
            fmt.Println("Channels from id=", id, v)
        }
        p.mu.Unlock()
    }()

    c, ok := p.clients[id]
    if !ok {
        client := []chan string{make(chan string)}
        p.clients[id] = client
        return &client[0]
    }

    newCh := make(chan string)
    p.clients[id] = append(c, newCh)
    return &newCh
}

func (p *SSEConn) removeClient(id string, conn chan string) {
    p.mu.Lock()
    defer func() {
        fmt.Println("Clients in remove: ", p.clients)
        for k, v := range p.clients {
            fmt.Printf("Key: %s, value: %d", k, len(v))
        }
        p.mu.Unlock()
    }()

    c, ok := p.clients[id]
    if !ok {
        return
    }

    pos := -1

    for i, ch := range c {
        if ch == conn {
            pos = i
        }
    }

    if pos == -1 {
        return
    }

    close(c[pos])
    c = append(c[:pos], c[pos+1:]...)
    if pos == 0 {
        delete(p.clients, id)
    }
}

func (p *SSEConn) broadcast(id string, data, event string) {
    p.mu.Lock()
    defer p.mu.Unlock()

    c, ok := p.clients[id]
    if !ok {
        return
    }

    for _, ch := range c {
        ch <- fmt.Sprintf("event: %s\ndata: %s\n\n", event, data)
    }
}
Enter fullscreen mode Exit fullscreen mode

In the provided code snippet, I define the SSEConn struct and implement methods such as addClient, removeClient and broadcast. The SSEConn consists of two fields: clients and mu (Mutex). The mu field plays a crucial role in preventing race conditions; you can read more about that here and here. The clients field stores the clients' IDs along with their corresponding channels. Also, it is important to remember to update the handler implementation to accept the user ID.

// sseConn is the package-level connection registry used by the HTTP handlers.
var sseConn = NewSSEConn()

// getTime sends the current wall-clock time (HH:MM:SS) as a "timeEvent" to the
// connections registered under the id that follows "/time/" in the URL path.
func getTime(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Access-Control-Allow-Origin", "*")

    const prefix = "/time/"
    clientID := strings.TrimPrefix(r.URL.Path, prefix)
    now := time.Now().Format("15:04:05")
    sseConn.broadcast(clientID, now, "timeEvent")
}

// sseHandler registers the connection under the id that follows "/sse/" in the
// URL path and relays every frame queued for it to the client until the
// request context is cancelled.
func sseHandler(w http.ResponseWriter, r *http.Request) {
    // Verify streaming support before registering the client. Calling Flush
    // on a nil http.Flusher would panic on the first message.
    flusher, ok := w.(http.Flusher)
    if !ok {
        fmt.Println("Could not init http.Flusher")
        http.Error(w, "streaming unsupported", http.StatusInternalServerError)
        return
    }

    id := strings.TrimPrefix(r.URL.Path, "/sse/")
    ch := sseConn.addClient(id)
    defer sseConn.removeClient(id, *ch)

    w.Header().Set("Access-Control-Allow-Origin", "*")
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    for {
        select {
        case message := <-*ch:
            fmt.Println("case message... sending message")
            fmt.Println(message)
            // message is already a complete SSE frame. Write it verbatim:
            // using it as a format string would mangle any '%' in the payload.
            fmt.Fprint(w, message)
            flusher.Flush()
        case <-r.Context().Done():
            fmt.Println("Client closed connection")
            return
        }
    }
}

// main wires the per-client SSE stream and /time trigger, then serves on port 3500.
func main() {
    mux := http.NewServeMux()
    mux.HandleFunc("/sse/", sseHandler)
    mux.HandleFunc("/time/", getTime)

    // ListenAndServe only returns on failure, so any result is fatal.
    if err := http.ListenAndServe(":3500", mux); err != nil {
        log.Fatal(err)
    }
}
Enter fullscreen mode Exit fullscreen mode

Final client code:

import {
  Component,
  createEffect,
  createSignal,
  JSX,
  onCleanup,
} from "solid-js";

/**
 * Root component: lets the user enter a client id, connect to that id's SSE
 * stream, and ask the server to push the current time to that id.
 */
const App: Component = () => {
  const [eventSource, setEventSource] = createSignal<EventSource | undefined>();
  const [time, setTime] = createSignal("");
  const [id, setId] = createSignal("");

  /** Asks the server to send a "timeEvent" to the streams registered for the id. */
  async function handleGetTime(): Promise<void> {
    try {
      const res = await fetch(
        `http://localhost:3500/time/${encodeURIComponent(id())}`
      );
      if (res.status !== 200) {
        console.log("Could not connect to the server");
      } else {
        console.log("OK");
      }
    } catch (err: unknown) {
      // fetch rejects (rather than resolving with a status) when the server
      // is unreachable; without this the click ends in an unhandled rejection.
      console.log("Could not connect to the server", err);
    }
  }

  function handleChange(e: InputEvent) {
    setId((e.currentTarget as HTMLInputElement).value);
  }

  /** Opens the SSE stream for the current id, replacing any previous stream. */
  function handleConnect() {
    // Close the previous stream first; otherwise every click leaks a
    // connection and the old stream keeps updating the displayed time.
    eventSource()?.close();

    // Encode the id so characters such as "/" or "?" stay inside the path segment.
    const ev = new EventSource(
      `http://localhost:3500/sse/${encodeURIComponent(id())}`
    );
    ev.addEventListener("timeEvent", (e) => {
      console.log({ event: e.type });
      console.log({ data: e.data });

      setTime(e.data);
    });

    setEventSource(ev);
  }

  onCleanup(() => {
    eventSource()?.close();
  });

  return (
    <main>
      Time: {time()}
      <button onClick={handleGetTime}>Get time</button>
      <input type="text" onInput={handleChange} value={id()} />
      <button onClick={handleConnect}>Connect</button>
    </main>
  );
};

export default App;
Enter fullscreen mode Exit fullscreen mode

The final result:

Final result

By the end of this article, you have gained insight about SSE, Go channels, and the implementation of notification functionality using Go. I trust that you have found this article enjoyable and useful. Phew, considering the article's length, I will bring it to a close here.

💖 💪 🙅 🚩
hadius
Dương

Posted on May 25, 2023

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related