Dive into WebRTC, or write an SFU on your own

vthesaint

the_Aristo

Posted on February 9, 2024


WebRTC is a browser technology for transferring streaming data between browsers or applications over peer-to-peer connections.

WebRTC has been supported in most browsers for a long time, so ignoring the technology would be quite stupid. At least that's what I thought, so I decided to write an SFU server in Go as a pet project.

About WebRTC

Here I will briefly go over the basics of how WebRTC works; for those who want to go a little deeper, I'll leave the link here.
To establish an RTCPeerConnection between two peers, WebRTC uses SDP (Session Description Protocol). SDP is made of key=value lines and is essentially a description of a single peer's session (the name speaks for itself): its media, codecs, and network parameters.

Example SessionDescription

v=0
o=- 0 0 IN IP4 127.0.0.1
s=-
c=IN IP4 127.0.0.1
t=0 0
m=audio 4000 RTP/AVP 111
a=rtpmap:111 OPUS/48000/2
m=video 4002 RTP/AVP 96
a=rtpmap:96 VP8/90000
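To make the key=value structure concrete, here is a minimal, stdlib-only Go sketch (not part of the project) that splits an SDP body into typed lines and lists its m= (media) sections. The helper names `parseSDP` and `mediaSections` are hypothetical, and the parser checks nothing beyond the `<type>=` prefix; Pion itself relies on a complete parser from `github.com/pion/sdp`.

```go
package main

import (
	"fmt"
	"strings"
)

// sdpLine is one "<type>=<value>" line of an SDP body.
type sdpLine struct {
	Type  byte
	Value string
}

// parseSDP splits an SDP body into typed lines. Sketch only: it does not
// validate line order or the syntax of the individual fields.
func parseSDP(sdp string) ([]sdpLine, error) {
	var lines []sdpLine
	for _, raw := range strings.Split(sdp, "\n") {
		raw = strings.TrimRight(raw, "\r") // SDP lines officially end with CRLF
		if raw == "" {
			continue
		}
		if len(raw) < 2 || raw[1] != '=' {
			return nil, fmt.Errorf("malformed SDP line %q", raw)
		}
		lines = append(lines, sdpLine{Type: raw[0], Value: raw[2:]})
	}
	return lines, nil
}

// mediaSections returns the value of every m= line, one per media stream.
func mediaSections(lines []sdpLine) []string {
	var media []string
	for _, l := range lines {
		if l.Type == 'm' {
			media = append(media, l.Value)
		}
	}
	return media
}

func main() {
	const example = "v=0\no=- 0 0 IN IP4 127.0.0.1\ns=-\nc=IN IP4 127.0.0.1\nt=0 0\n" +
		"m=audio 4000 RTP/AVP 111\na=rtpmap:111 OPUS/48000/2\n" +
		"m=video 4002 RTP/AVP 96\na=rtpmap:96 VP8/90000\n"
	lines, err := parseSDP(example)
	if err != nil {
		panic(err)
	}
	for _, m := range mediaSections(lines) {
		fmt.Println(m)
	}
}
```

Run on the example above, it prints the two media lines, one for audio on port 4000 and one for video on port 4002.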

Once a peer has generated its SessionDescription (hereinafter simply SD), it sends the SD to the other peer in the form of an offer.

Here the question immediately arises: “Where should I send it?” And it is a fair question: on its own, a peer cannot know the IP address of another peer. This is where TURN and signaling servers come to our aid.

The signaling server is the server that peers connect to in order to exchange SDs. We'll look at it in more detail a little later.

The TURN server solves a different problem. Our devices usually access the Internet through a Wi-Fi router, so they sit behind the router's NAT and do not have a public IP address of their own. A STUN server tells a peer which public address and port its NAT maps it to, and when the peers still cannot reach each other directly, a TURN server relays the traffic between them, so the connection works anyway.

So, we created an SD, found out our public address, and sent our SD as an offer to the signaling server. Next, the second peer sets the SD we sent as its remote description, generates its own SD, and sends it back through the signaling server as an answer. The first peer then sets the received answer as its remote description.
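The offer/answer exchange can be thought of as a small state machine. The Go sketch below is a simplified, hypothetical model of the signaling states a PeerConnection goes through (provisional answers, re-offers and rollback are left out); it only shows which side calls SetLocalDescription and SetRemoteDescription with which kind of description, and in what order.

```go
package main

import (
	"errors"
	"fmt"
)

// SignalingState mirrors a subset of the RTCPeerConnection signaling states.
type SignalingState string

const (
	Stable          SignalingState = "stable"
	HaveLocalOffer  SignalingState = "have-local-offer"
	HaveRemoteOffer SignalingState = "have-remote-offer"
)

// SDPType distinguishes offers from answers.
type SDPType string

const (
	Offer  SDPType = "offer"
	Answer SDPType = "answer"
)

var errWrongState = errors.New("description not allowed in current signaling state")

// negotiator is a toy model of one peer's side of the offer/answer exchange.
type negotiator struct {
	state SignalingState
}

// setLocal models SetLocalDescription.
func (n *negotiator) setLocal(t SDPType) error {
	switch {
	case n.state == Stable && t == Offer:
		n.state = HaveLocalOffer
	case n.state == HaveRemoteOffer && t == Answer:
		n.state = Stable
	default:
		return errWrongState
	}
	return nil
}

// setRemote models SetRemoteDescription.
func (n *negotiator) setRemote(t SDPType) error {
	switch {
	case n.state == Stable && t == Offer:
		n.state = HaveRemoteOffer
	case n.state == HaveLocalOffer && t == Answer:
		n.state = Stable
	default:
		return errWrongState
	}
	return nil
}

func main() {
	caller := &negotiator{state: Stable}
	callee := &negotiator{state: Stable}

	// The caller applies its offer locally and sends it via the signaling server.
	_ = caller.setLocal(Offer)
	// The callee applies the offer as remote, then creates and applies an answer.
	_ = callee.setRemote(Offer)
	_ = callee.setLocal(Answer)
	// The caller applies the answer as remote: both sides are stable again.
	_ = caller.setRemote(Answer)

	fmt.Println(caller.state, callee.state)
}
```

Trying the steps in a different order, for example applying an answer while still in the stable state, returns an error, which is roughly what a real PeerConnection does as well.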

In addition to exchanging media information (the offer/answer and SDP discussed above), peers must exchange network connection information. This information is sent as ICE candidates: each candidate describes one way a peer can be reached, either directly or through a TURN server. ICE candidates are also exchanged via the signaling server.
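Each ICE candidate travels as a single line of text. As an illustration, this hypothetical stdlib-only parser extracts the mandatory fields from the `candidate:...` string that a browser exposes as `RTCIceCandidate.candidate` (the same string the server later reads from the `candidate` field of the client's message). Optional attributes such as `raddr`/`rport` are ignored, so treat it as a sketch rather than a complete implementation of the candidate grammar.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Candidate holds the mandatory fields of an ICE candidate line.
type Candidate struct {
	Foundation string
	Component  int // 1 = RTP, 2 = RTCP
	Protocol   string
	Priority   uint32
	Address    string
	Port       int
	Type       string // host, srflx (found via STUN), prflx or relay (via TURN)
}

// parseCandidate parses "candidate:<foundation> <component> <transport>
// <priority> <address> <port> typ <type> ..."; trailing attributes are ignored.
func parseCandidate(s string) (Candidate, error) {
	fields := strings.Fields(strings.TrimPrefix(s, "candidate:"))
	if len(fields) < 8 || fields[6] != "typ" {
		return Candidate{}, fmt.Errorf("malformed candidate %q", s)
	}
	component, err := strconv.Atoi(fields[1])
	if err != nil {
		return Candidate{}, err
	}
	priority, err := strconv.ParseUint(fields[3], 10, 32)
	if err != nil {
		return Candidate{}, err
	}
	port, err := strconv.Atoi(fields[5])
	if err != nil {
		return Candidate{}, err
	}
	return Candidate{
		Foundation: fields[0],
		Component:  component,
		Protocol:   strings.ToLower(fields[2]),
		Priority:   uint32(priority),
		Address:    fields[4],
		Port:       port,
		Type:       fields[7],
	}, nil
}

func main() {
	c, err := parseCandidate("candidate:1 1 UDP 2130706431 192.168.1.10 54321 typ host")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %s:%d via %s\n", c.Type, c.Address, c.Port, c.Protocol)
}
```

A host candidate like the one above only works inside the same network; srflx and relay candidates are the ones produced with the help of STUN and TURN.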

scheme_1

And now, after all these rituals, we can finally establish our RTCPeerConnection.

Why do we need an SFU?

A Selective Forwarding Unit (SFU) comes to the rescue when the number of peers in one session reaches 7 or more. Let's look at an example:

scheme_2

We have 7 peers, each sending a video and an audio track. With direct p2p connections we get 6+5+4+3+2+1 = 21 PeerConnections, and four times as many tracks, 84, since each connection carries a video and an audio track in both directions.
This is a very inefficient use of resources, so we move from a fully connected (mesh) topology to a star topology:

scheme_3

Now we have 7 PeerConnections, and each peer sends 2 tracks and receives 12 (an audio and a video track from each of the other 6 peers). Although WebRTC was originally intended for browsers to communicate directly, it is in combination with an SFU that we truly get to experience the power of this technology.
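The arithmetic above generalizes to any number of participants. This small Go sketch (the `Topology`, `mesh`, `meshTracks` and `sfu` names are ours, not from the project) assumes every peer publishes the same number of tracks and counts connections and per-peer uploads and downloads for both topologies.

```go
package main

import "fmt"

// Topology summarizes one session layout.
type Topology struct {
	Connections int // PeerConnections in the whole session
	UpPerPeer   int // tracks each peer uploads
	DownPerPeer int // tracks each peer downloads
}

// mesh: every pair of peers has its own PeerConnection, n(n-1)/2 in total,
// and each peer sends its tracks to each of the other n-1 peers separately.
func mesh(n, tracksPerPeer int) Topology {
	return Topology{
		Connections: n * (n - 1) / 2,
		UpPerPeer:   (n - 1) * tracksPerPeer,
		DownPerPeer: (n - 1) * tracksPerPeer,
	}
}

// meshTracks counts tracks over all mesh connections: every connection
// carries tracksPerPeer tracks in each of its two directions.
func meshTracks(n, tracksPerPeer int) int {
	return mesh(n, tracksPerPeer).Connections * 2 * tracksPerPeer
}

// sfu: every peer has a single PeerConnection to the server, uploads its own
// tracks once and downloads the tracks of the other n-1 peers.
func sfu(n, tracksPerPeer int) Topology {
	return Topology{
		Connections: n,
		UpPerPeer:   tracksPerPeer,
		DownPerPeer: (n - 1) * tracksPerPeer,
	}
}

func main() {
	const peers, tracks = 7, 2 // audio + video per peer
	fmt.Printf("mesh: %+v, %d tracks in total\n", mesh(peers, tracks), meshTracks(peers, tracks))
	fmt.Printf("sfu:  %+v\n", sfu(peers, tracks))
}
```

For 7 peers with 2 tracks each it reproduces the numbers above: 21 connections and 84 tracks in the mesh, where each peer uploads 12 tracks, versus 7 connections in the star, where each peer uploads 2 and downloads 12. The upload column is where the mesh hurts most, since every peer has to send its tracks to every other peer separately.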

It should also be noted that there is another option: an MCU (Multipoint Control Unit). In that case, the server is also responsible for mixing all tracks destined for each peer into a single MediaStream. However, the user can then no longer interact with the individual streams.

zoom

Take Zoom as an example, with its ability to mute participants and move their video tiles around. You can interact with each tile precisely because each tile is a separate MediaStream. If we had implemented an MCU, we would not receive many video tiles but a single composite video that the MCU had carefully assembled for us. So by multiplying the load on the server, we take away many capabilities from the user. Yes, the load on the client would practically disappear, but does that advantage outweigh the disadvantages? I think not.

Let's start

Now let's bring the project to life. First, let's decide on the structure of our SFU.

scheme_4

Our server will consist of two parts: the Signal server and the Coordinator. The first handles the exchange of SDs and ICE candidates; the second controls the incoming and outgoing media tracks.

Peer

Peer is a basic structure that represents a single user:

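type Peer struct {
    id         string                         // user id (self_id sent by the client)
    connection *webrtc.PeerConnection         // server side of this user's PeerConnection
    streams    map[string]*webrtc.TrackRemote // tracks published by this user
    mutex      sync.RWMutex                   // serializes operations on this peer
    socket     *websocket.Conn                // signaling WebSocket of this user
}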

So far everything is simple: the struct holds the socket, the PeerConnection itself, and the tracks coming from the user.

Now let's describe the behavior of our peer:

type PeerInterface interface {
    SetSocket(ws_conn *websocket.Conn)
    AddRemoteTrack(track *webrtc.TrackRemote)
    RemoveRemoteTrack(track *webrtc.TrackRemote)
    SetPeerConnection(conn *webrtc.PeerConnection)
    // Signatures match the implementations below, so *Peer satisfies the interface
    ReactOnOffer(offer_str string) (webrtc.SessionDescription, error)
    ReactOnAnswer(answer_str string) error
}

Again, everything is fairly standard: several methods for setting the Peer fields. The only ones worth focusing on are ReactOnOffer and ReactOnAnswer:

func (peer *Peer) ReactOnOffer(offer_str string) (webrtc.SessionDescription, error) {
    peer.mutex.Lock()
    defer peer.mutex.Unlock()

    offer := webrtc.SessionDescription{
        Type: webrtc.SDPTypeOffer,
        SDP:  offer_str,
    }
    // The offer came from the client, so it becomes our remote description
    if err := peer.connection.SetRemoteDescription(offer); err != nil {
        fmt.Println(err)
        return webrtc.SessionDescription{}, err
    }
    fmt.Println("Remote Description was set for peer", peer.id)

    // Create the answer and check the error before using it
    answer, err := peer.connection.CreateAnswer(nil)
    if err != nil {
        return webrtc.SessionDescription{}, err
    }
    fmt.Println("Answer was created in peer", peer.id)

    // Apply the answer locally before it is sent back to the client
    if err := peer.connection.SetLocalDescription(answer); err != nil {
        return webrtc.SessionDescription{}, err
    }
    fmt.Println("Local Description was set for peer", peer.id)
    return answer, nil
}

When the signaling server delivers an Offer from the other side, we need to save the incoming SD. For us it is the remote SD, so SetRemoteDescription() does the job. Next, as described in the theoretical part, we need to send back an Answer so that the other side can set its RemoteDescription too, but before that we create the Answer and save it as our own LocalDescription.

func (peer *Peer) ReactOnAnswer(answer_str string) error {
    peer.mutex.Lock()
    defer peer.mutex.Unlock()
    answer := webrtc.SessionDescription{
        Type: webrtc.SDPTypeAnswer,
        SDP:  answer_str,
    }
    err := peer.connection.SetRemoteDescription(answer)
    if err != nil {
        fmt.Println(err)
        return err
    }
    return nil
}

The logic mirrors ReactOnOffer, only now we are on the other side of the barricades: we sent an Offer, received an Answer, and set it as our RemoteDescription.

Room

Let's move one level up. A Room represents a single video and/or audio conference session.

type Room struct {
    id     string
    mutex  sync.RWMutex
    peers  map[string]*Peer
    tracks map[string]*webrtc.TrackLocalStaticRTP
}

It holds all peers of the room and keeps the tracks to be forwarded in the tracks field, from which they are sent to the other peers.
Let's describe the behavior:

type RoomInterface interface {
    JoinRoom(id string)
    AddPeer(peer *Peer)
    RemovePeer(peer_id string)
    // AddTrack returns the local track it creates; RemoveTrack takes that local track
    AddTrack(track *webrtc.TrackRemote) *webrtc.TrackLocalStaticRTP
    RemoveTrack(track *webrtc.TrackLocalStaticRTP)
    SendAnswer(message webrtc.SessionDescription, peer_id string)
    SendOffer(message webrtc.SessionDescription, peer_id string)
    SendICE(message *webrtc.ICECandidate, peer_id string)
    Signal()
}

Several of these are standard as well, so let's go over them:

  1. AddPeer(peer *Peer) - adds a user to the room.peers field
  2. RemovePeer(peer_id string) - removes a user from the room, if present
  3. AddTrack(track *webrtc.TrackRemote) - creates a local track for an incoming remote track, stores it in room.tracks and returns it
  4. RemoveTrack(track) - removes the track from room.tracks accordingly
  5. SendAnswer, SendOffer, SendICE - send the Answer, Offer and ICE candidate, respectively, to all users in the room except the peer with id=peer_id.

Now let's move on to the fun part: room.Signal()

func (room *Room) Signal() {
    room.mutex.Lock()
    defer room.mutex.Unlock()
    attemptSync := func() (again bool) {
        for _, peer := range room.peers {

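            // 1) Check if peer is already closed
            if peer.connection.ConnectionState() == webrtc.PeerConnectionStateClosed {
                fmt.Println("Peer with peer_id", peer.id, "was disconnected")
                // room.mutex is already held by Signal, so delete from the map directly
                // instead of calling a method that might try to lock it again
                delete(room.peers, peer.id)
                return true
            }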
            // 2) Collect the IDs of all tracks we already send to this peer
            existingSenders := map[string]bool{}
            for _, sender := range peer.connection.GetSenders() {
                if sender.Track() == nil {
                    continue
                }
                // 3) If we have an RTPSender that doesn't map to an existing track, remove it and signal.
                // Save the ID first: after RemoveTrack the sender no longer has a track
                trackID := sender.Track().ID()
                existingSenders[trackID] = true
                if _, ok := room.tracks[trackID]; !ok {
                    if err := peer.connection.RemoveTrack(sender); err != nil {
                        fmt.Println("Cannot remove track", trackID, ":", err)
                        return true
                    }
                    fmt.Println("Track", trackID, "was removed")
                }
            }

            // 4) Don't receive videos we are sending, make sure we don't have loopback
            for _, receiver := range peer.connection.GetReceivers() {
                if receiver.Track() == nil {
                    continue
                }

                existingSenders[receiver.Track().ID()] = true
            }
            // 5) Add all track we aren't sending yet to the PeerConnection
            for trackID := range room.tracks {
                if _, ok := existingSenders[trackID]; !ok {
                    if _, err := peer.connection.AddTrack(room.tracks[trackID]); err == nil {
                        fmt.Println("New track are sending for peer", peer.id)
                        return true
                    } else {
                        fmt.Println(err)
                    }
                }
            }
            // 6) A non-nil PendingLocalDescription means a negotiation with this peer
            // has not completed yet, so send the peer a fresh Offer
            if peer.connection.PendingLocalDescription() != nil {
                offer, err := peer.connection.CreateOffer(&webrtc.OfferOptions{
                    OfferAnswerOptions: webrtc.OfferAnswerOptions{},
                    ICERestart:         true,
                })
                if err != nil {
                    fmt.Println("Error in CreateOffer: ", err)
                    return true
                }
                if err = peer.connection.SetLocalDescription(offer); err != nil {
                    fmt.Println("Offer: ", offer)
                    fmt.Println("Cannot set LocalDescription: ", err)
                    return false
                }

                offerString, err := json.Marshal(offer)
                if err != nil {
                    fmt.Println("Marshalling failed: ", err)
                    return true
                }

                if err = peer.socket.WriteJSON(&WsMessage{
                    Event: "offer",
                    Data:  string(offerString),
                }); err != nil {
                    fmt.Println("Cannot write message in WsMessage: ", err)
                    return true
                }
            }

        }
        return
    }
    // 7)
    for syncAttempt := 0; ; syncAttempt++ {
        if syncAttempt == 25 {
            // Release the lock and attempt a sync in 3 seconds. We might be blocking a RemoveTrack or AddTrack
            go func() {
                time.Sleep(time.Second * 3)
                room.Signal()
            }()
            return
        }

        if !attemptSync() {
            fmt.Println("Signalling finished")
            break
        }
    }
}

This function contains most of the logic for Room. Let's take a closer look:

  1. Check each peer: if its connection is closed, remove it from the room.
  2. We get all senders of this peer's connection. An RTPSender carries a track that the server sends to this peer.
  3. We check for “stuck” tracks, that is, senders whose track is no longer in room.tracks, for example because the user who published it has left the room. Such senders are removed.
  4. We also add the tracks coming from this user (its receivers) to existingSenders, so that at the next step we don't send the user's own tracks back to it.
  5. Now existingSenders contains every track the user is already receiving or sending. All that remains is to add the room's tracks that are not yet part of this feast.
  6. Here we check the PendingLocalDescription. This needs some clarification. Each PeerConnection has two local descriptions: Current and Pending. CurrentLocalDescription is the one agreed on in the last completed Offer/Answer exchange. PendingLocalDescription is set when we apply a new local Offer whose Answer has not arrived yet; once the exchange completes, it becomes Current and Pending is reset to nil. So if PendingLocalDescription is nil, no negotiation is in progress and there is nothing to send; otherwise we create a new Offer (with an ICE restart) and send it to this peer. In WebRTC, exchanging Offer/Answer again on an established connection is called renegotiation.
  7. When updating peer states we may run into transient errors; for example, adding a track does not take effect instantly and can conflict with a negotiation in progress. Most failures and changes make attemptSync return true, and the loop runs it again. After 25 attempts we give up for now, release the lock, and schedule another room.Signal() call in 3 seconds, since we might be blocking a concurrent AddTrack or RemoveTrack.
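Steps 2-5 boil down to a set difference over track IDs. The sketch below restates that logic as a pure Go function over plain strings, so it can be reasoned about (and tested) without Pion; `planSync` and `syncPlan` are hypothetical names. Like the original code, it assumes that a forwarded local track keeps the ID of the remote track it was created from; otherwise comparing sender and receiver IDs would not detect loopback.

```go
package main

import (
	"fmt"
	"sort"
)

// syncPlan is what one attemptSync pass would do for a single peer.
type syncPlan struct {
	Remove []string // sender track IDs that no longer exist in the room
	Add    []string // room track IDs this peer neither sends nor receives yet
}

// planSync mirrors steps 2-5 of Room.Signal:
// senders are tracks the server already sends to the peer,
// receivers are tracks the peer publishes,
// roomTracks are all tracks the room forwards.
func planSync(senders, receivers []string, roomTracks map[string]bool) syncPlan {
	var plan syncPlan
	existing := map[string]bool{}
	for _, id := range senders {
		existing[id] = true
		if !roomTracks[id] {
			plan.Remove = append(plan.Remove, id) // step 3: stale sender
		}
	}
	for _, id := range receivers {
		existing[id] = true // step 4: never send a peer's own track back to it
	}
	for id := range roomTracks {
		if !existing[id] {
			plan.Add = append(plan.Add, id) // step 5: track still missing
		}
	}
	sort.Strings(plan.Add) // map iteration order is random; sort for stable output
	return plan
}

func main() {
	room := map[string]bool{"alice-audio": true, "alice-video": true, "bob-audio": true, "bob-video": true}
	// Bob still receives carol's video, but carol has already left the room.
	plan := planSync([]string{"carol-video"}, []string{"bob-audio", "bob-video"}, room)
	fmt.Println("remove:", plan.Remove)
	fmt.Println("add:", plan.Add)
}
```

For Bob this plans to drop carol-video and add alice-audio and alice-video, while his own two tracks are left out. The real Signal() applies such changes one at a time and restarts the pass after each one, which is why it needs the retry loop from step 7.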

Coordinator

Now we move on to the main structure that controls the behavior of all rooms.

type Coordinator struct {
    sessioins map[string]*Room // active rooms keyed by room id
}

As usual, we start by describing its behavior with an interface:

type CoordinatorInterface interface {
    CreateRoom(id string)
    RemoveRoom(id string)
    AddUserToRoom(self_id string, room_id string, socket *websocket.Conn)
    RemoveUserFromRoom(self_id string, room_id string)
    ShowSessions()
    ObtainEvent(message WsMessage, socket *websocket.Conn)
}

Let's go through the methods:

  1. CreateRoom() & RemoveRoom() - creates and deletes a room respectively
  2. ShowSessions() - displays all currently active rooms
  3. RemoveUserFromRoom() - removes a user from a room
  4. AddUserToRoom() - adds a user and configures PeerConnection
  5. ObtainEvent() is the glue between the Coordinator and our Signal server: when the server starts, it creates a Coordinator, and every incoming event is processed by this method
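As ObtainEvent() will show, every incoming message is handled in its own goroutine, so several goroutines may read and write the room map at the same time, and Go maps are not safe for concurrent use. One way to guard the check-then-create step that AddUserToRoom performs is sketched below; the `mu` field, the `sessions` name and the `getOrCreateRoom` helper are assumptions for illustration, and `Room` is reduced to a stub.

```go
package main

import (
	"fmt"
	"sync"
)

// Room is a stub standing in for the article's Room.
type Room struct {
	id string
}

// Coordinator guards its room map with a mutex.
type Coordinator struct {
	mu       sync.Mutex
	sessions map[string]*Room
}

func NewCoordinator() *Coordinator {
	return &Coordinator{sessions: make(map[string]*Room)}
}

// getOrCreateRoom returns the room with the given id, creating it if needed.
// Checking and inserting under one lock means two users joining a new room
// at the same moment cannot end up in two different Room values.
func (c *Coordinator) getOrCreateRoom(id string) (*Room, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if existing, ok := c.sessions[id]; ok {
		return existing, false
	}
	room := &Room{id: id}
	c.sessions[id] = room
	return room, true
}

func main() {
	c := NewCoordinator()
	var wg sync.WaitGroup
	var mu sync.Mutex
	creations := 0
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, created := c.getOrCreateRoom("demo"); created {
				mu.Lock()
				creations++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	fmt.Println("rooms created:", creations)
}
```

However many goroutines race to join, exactly one of them creates the room; running the same program with the lock removed under `go run -race` would report the data race.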

Let's look at the AddUserToRoom() code:

func (coordinator *Coordinator) AddUserToRoom(self_id string, room_id string, socket *websocket.Conn) {
    // 1)
    if _, ok := coordinator.sessioins[room_id]; !ok {
        fmt.Println("New Room was created: ", room_id)
        coordinator.CreateRoom(room_id)
    }
    if room, ok := coordinator.sessioins[room_id]; ok {
        // 2) Add Peer to Room
        room.AddPeer(newPeer(self_id))
        fmt.Println("Peer ", self_id, "was added to room ", room_id)
        if peer, ok := room.peers[self_id]; ok {
            // 3) Set socket connection to Peer
            peer.SetSocket(socket)

            // 4) Create Peer Connection
            conn, err := webrtc.NewPeerConnection(webrtc.Configuration{})
            if err != nil {
                // Without a connection there is nothing to configure below
                fmt.Println("Failed to establish peer connection:", err)
                return
            }

            peer.SetPeerConnection(conn)
            fmt.Println("Peer connection was established")
            // 5) Accept one audio and one video track incoming
            for _, typ := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
                if _, err := peer.connection.AddTransceiverFromKind(typ, webrtc.RTPTransceiverInit{
                    Direction: webrtc.RTPTransceiverDirectionRecvonly,
                }); err != nil {
                    log.Print(err)
                    return
                }
            }

            // 6) If PeerConnection is closed remove it from global list
            peer.connection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
                switch p {
                case webrtc.PeerConnectionStateFailed:
                    if err := peer.connection.Close(); err != nil {
                        log.Print(err)
                    }
                case webrtc.PeerConnectionStateClosed:
                    room.Signal()
                default:
                }
            })

            // 7) When peer connection is getting the ICE -> send ICE to client
            peer.connection.OnICECandidate(func(i *webrtc.ICECandidate) {
                if i == nil {
                    // A nil candidate means ICE gathering is complete
                    fmt.Println("ICE gathering complete for peer", self_id)
                    return
                }
                fmt.Println("Ice: ", i)
                room.SendICE(i, self_id)
            })
            // 8) Forward every RTP packet of the incoming track to the room's local track
            peer.connection.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
                fmt.Println("Track added from peer: ", self_id)
                defer room.Signal()
                // Create a track to fan out our incoming video to all peers
                trackLocal := room.AddTrack(t)
                defer room.RemoveTrack(trackLocal)
                defer fmt.Println("Track", trackLocal, "was removed")
                buf := make([]byte, 1500)
                for {
                    i, _, err := t.Read(buf)
                    if err != nil {
                        return
                    }

                    if _, err = trackLocal.Write(buf[:i]); err != nil {
                        return
                    }
                }
            })
        }

    }
}

Let's go through the points:

  1. I check whether the room the user wants to join exists. If it doesn't, we create it.
  2. Add a Peer to the room.
  3. Set the socket on the Peer.
  4. Create the RTCPeerConnection and store it in the Peer.
  5. Add two recvonly transceivers, so that we accept one video and one audio track from the new user.
  6. If the connection fails, we close it; once it is closed, we call room.Signal() so that the room removes the peer.
  7. Here we implement the exchange of ICE candidates. If we receive nil instead of an ICE candidate, gathering is complete. Otherwise, we pass the newly gathered ICE candidate on via room.SendICE().
  8. When a new track arrives, we add it to the room and copy its RTP packets into the local track until reading fails; then the track is removed and the room is signaled again.
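The forwarding loop in step 8 writes each packet once, and the local track delivers it to every PeerConnection the track has been added to. Conceptually that is a fan-out. The stdlib-only sketch below models it with channels; the `fanOut` type and its drop-when-full policy are assumptions of the sketch, not how Pion implements TrackLocalStaticRTP. Note the copy in Write: like `buf` in the OnTrack handler, the caller reuses one read buffer for every packet.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut copies every written packet to all current subscribers
// (in the SFU: the other peers in the room).
type fanOut struct {
	mu   sync.RWMutex
	subs map[string]chan []byte
}

func newFanOut() *fanOut {
	return &fanOut{subs: make(map[string]chan []byte)}
}

func (f *fanOut) subscribe(peerID string, buffer int) <-chan []byte {
	f.mu.Lock()
	defer f.mu.Unlock()
	ch := make(chan []byte, buffer)
	f.subs[peerID] = ch
	return ch
}

// unsubscribe closes the peer's channel; it takes the write lock, so it
// never runs while Write is sending.
func (f *fanOut) unsubscribe(peerID string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if ch, ok := f.subs[peerID]; ok {
		close(ch)
		delete(f.subs, peerID)
	}
}

// Write delivers a copy of pkt to each subscriber and returns how many got it.
// A subscriber with a full buffer misses the packet instead of stalling the others.
func (f *fanOut) Write(pkt []byte) (delivered int) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	for _, ch := range f.subs {
		cp := append([]byte(nil), pkt...) // the caller reuses its buffer
		select {
		case ch <- cp:
			delivered++
		default:
		}
	}
	return delivered
}

func main() {
	track := newFanOut()
	bob := track.subscribe("bob", 8)
	carol := track.subscribe("carol", 8)

	buf := make([]byte, 1500) // one reused read buffer, as in the OnTrack loop
	for i := 0; i < 3; i++ {
		n := copy(buf, fmt.Sprintf("rtp-%d", i))
		track.Write(buf[:n])
	}
	track.unsubscribe("bob")
	track.unsubscribe("carol")

	for pkt := range bob {
		fmt.Println("bob got", string(pkt))
	}
	fmt.Println("carol queued", len(carol))
}
```

Both subscribers receive all three packets intact even though the same buffer was overwritten between writes; without the copy they would all see the last packet's bytes.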

Trickle ICE
Pion implements so-called Trickle ICE. With this approach, a peer does not wait until the Offer/Answer exchange has completed; instead, it sends and receives ICE candidates as soon as they become available, in parallel with the exchange. This is what our OnICECandidate handler does.
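One practical consequence of Trickle ICE: a candidate can reach the server before the Offer has been applied, especially since ObtainEvent handles each message in its own goroutine, and a PeerConnection rejects candidates while it has no remote description. A common remedy is to queue early candidates and apply them right after SetRemoteDescription succeeds. A minimal sketch, where `candidateQueue` is hypothetical and `apply` stands in for a call such as `peer.connection.AddICECandidate`:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// candidateQueue buffers trickled ICE candidates that arrive before the
// remote description is set, then applies them in arrival order.
type candidateQueue struct {
	mu        sync.Mutex
	remoteSet bool
	pending   []string
	apply     func(candidate string) error // placeholder for AddICECandidate
}

// Add applies the candidate right away if the remote description is known,
// otherwise it queues the candidate.
func (q *candidateQueue) Add(candidate string) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !q.remoteSet {
		q.pending = append(q.pending, candidate)
		return nil
	}
	return q.apply(candidate)
}

// RemoteDescriptionSet is meant to be called right after SetRemoteDescription
// succeeds; it flushes the queue and reports every failed candidate.
func (q *candidateQueue) RemoteDescriptionSet() error {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.remoteSet = true
	var errs []error
	for _, c := range q.pending {
		if err := q.apply(c); err != nil {
			errs = append(errs, err)
		}
	}
	q.pending = nil
	return errors.Join(errs...) // nil when everything was applied
}

func main() {
	var applied []string
	q := &candidateQueue{apply: func(c string) error {
		applied = append(applied, c)
		return nil
	}}
	_ = q.Add("candidate:1 1 udp 2130706431 192.168.1.10 54321 typ host") // too early: queued
	fmt.Println("applied before remote description:", len(applied))
	_ = q.RemoteDescriptionSet()
	_ = q.Add("candidate:2 1 udp 1677729535 203.0.113.7 61000 typ srflx raddr 192.168.1.10 rport 54321")
	fmt.Println("applied after:", len(applied))
}
```

`errors.Join` requires Go 1.20 or newer. Holding the lock while flushing keeps a late candidate from overtaking the queued ones.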
Now it's time for ObtainEvent:

func (coordinator *Coordinator) ObtainEvent(message WsMessage, socket *websocket.Conn) {
    wsMessage := message
    switch wsMessage.Event {
    case "joinRoom":
        go func() {
            m, ok := message.Data.(map[string]any)
            if ok {
                self_id := m["self_id"].(string)
                room_id := m["room_id"].(string)
                coordinator.AddUserToRoom(self_id, room_id, socket)
            }
        }()
    case "leaveRoom":
        go func() {
            m, ok := message.Data.(map[string]any)
            if ok {
                self_id := m["self_id"].(string)
                room_id := m["room_id"].(string)
                coordinator.RemoveUserFromRoom(self_id, room_id)
            }
        }()
    case "offer":
        go func() {
            m, ok := message.Data.(map[string]any)
            if ok {
                self_id, _ := m["self_id"].(string)
                room_id, _ := m["room_id"].(string)
                offer2 := m["offer"].(map[string]any)
                if room, ok := coordinator.sessioins[room_id]; ok {
                    if peer, ok := room.peers[self_id]; ok {
                        answer, err2 := peer.ReactOnOffer(offer2["sdp"].(string))
                        if err2 != nil {
                            fmt.Println(err2)
                            return
                        }
                        room.SendAnswer(answer, self_id)
                    }
                }
            }
        }()
    case "answer":
        go func() {
            m, ok := message.Data.(map[string]any)
            if ok {
                self_id, _ := m["self_id"].(string)
                room_id, _ := m["room_id"].(string)
                answer := m["answer"].(map[string]any)
                if room, ok := coordinator.sessioins[room_id]; ok {
                    if peer, ok := room.peers[self_id]; ok {
                        err := peer.ReactOnAnswer(answer["sdp"].(string))
                        if err != nil {
                            fmt.Println(err)
                            return
                        }
                    }

                }
            }
        }()
    case "ice-candidate":
        go func() {
            m, ok := message.Data.(map[string]any)
            if ok {
                self_id, _ := m["self_id"].(string)
                room_id, _ := m["room_id"].(string)
                candidate := m["candidate"].(map[string]any)
                i_candidate := candidate["candidate"].(string)
                sdp_mid := candidate["sdpMid"].(string)
                sdp_m_line_index := uint16(candidate["sdpMLineIndex"].(float64))
                // usernameFragment may be null in the client's JSON; fall back to ""
                username_fragment, _ := candidate["usernameFragment"].(string)
                init := webrtc.ICECandidateInit{
                    Candidate:        i_candidate,
                    SDPMid:           &sdp_mid,
                    SDPMLineIndex:    &sdp_m_line_index,
                    UsernameFragment: &username_fragment,
                }
                if room, ok := coordinator.sessioins[room_id]; ok {
                    if peer, ok := room.peers[self_id]; ok {
                        if err := peer.connection.AddICECandidate(init); err != nil {
                            log.Println(err)
                            return
                        }
                        fmt.Println("ICE-CANDIDATE added for peer", peer.id)
                        fmt.Println(peer.connection.ICEConnectionState())
                        fmt.Println(peer.connection.ICEGatheringState())
                    }
                }
            } else {
                fmt.Println("ice-candidate: unexpected payload:", message.Data)
            }
        }()
    default:
        fmt.Println("Unknown event:", wsMessage.Event)
    }
}

Here we switch on the Event field and call the appropriate method. The input is a WsMessage, a common envelope for all kinds of messages: Event names the message type and Data carries its payload.

Signal

Now for the final element of our server: a WebSocket handler that receives messages and passes them to Coordinator.ObtainEvent():

// websockets listener
func (ws *WsServer) wsInit(w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        // Upgrade has already replied to the client with an HTTP error
        fmt.Println("Client connection failed:", err)
        return
    }
    // Close only after a successful upgrade: on failure conn is nil
    defer conn.Close()
    fmt.Println("Client connected successfully")

    for {
        // ReadMessage returns an error (a *websocket.CloseError on a clean close)
        // once the client disconnects, which ends the loop
        _, bmessage, err := conn.ReadMessage()
        if err != nil {
            fmt.Println(err)
            return
        }

        // Decode into a fresh value so no fields leak from the previous message
        message := types.WsMessage{}
        if err := json.Unmarshal(bmessage, &message); err != nil {
            fmt.Println("Dropping malformed message:", err)
            continue
        }
        ws.coordinator.ObtainEvent(message, conn)
    }
}

Here we decode incoming messages into WsMessage and hand them to our coordinator.

Conclusion

As a result, we have a starting version of an SFU. As stated in WebRTC for the Curious:

Building a simple SFU can be done in a weekend. Building a good SFU that can handle all types of clients is never ending. Tuning the Congestion Control, Error Correction and Performance is a never ending task.

So this is only the first version of a simple SFU server. There are still many improvements ahead that can make this server better and raise the quality of video/audio conferences. If you like this article, I won't delay releasing the next ones.
The source code can be found in this repository.
I would also like to hear your suggestions and tips for developing this SFU, since I built it starting from scratch in both WebRTC and Go.

Finally, I want to thank Erased-sh for helping me understand WebRTC.

References

https://developer.mozilla.org/ru/docs/Web/API/WebRTC_API/Protocols
https://webrtcforthecurious.com
