the_Aristo
Posted on February 9, 2024
WebRTC is a browser technology designed to transfer streaming data between browsers or applications using point-to-point transmission technology.
WebRTC has been supported by most browsers for a long time, so ignoring the technology would be unwise. With that in mind, I decided to write an SFU server in Go as a pet project.
About WebRTC
Here I will briefly go over the basics of how WebRTC works; for those who are interested in going a little deeper, I'll leave the link here.
Before two peers can establish an RTCPeerConnection, they describe themselves to each other using SDP (Session Description Protocol). An SDP message is a list of key-value lines and is essentially a description of a single peer (the name speaks for itself).
Example SessionDescription
v=0
o=- 0 0 IN IP4 127.0.0.1
s=-
c=IN IP4 127.0.0.1
t=0 0
m=audio 4000 RTP/AVP 111
a=rtpmap:111 OPUS/48000/2
m=video 4002 RTP/AVP 96
a=rtpmap:96 VP8/90000
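To make the key-value structure visible, here is a minimal sketch that splits the example above into its lines and lists the media sections. The names parseSDP, mediaKinds and exampleSDP are made up for this illustration, and the parser does not validate the SDP grammar; a real server would rely on the SDP handling of its WebRTC library.

```go
package main

import (
	"fmt"
	"strings"
)

// exampleSDP is the session description shown above.
const exampleSDP = `v=0
o=- 0 0 IN IP4 127.0.0.1
s=-
c=IN IP4 127.0.0.1
t=0 0
m=audio 4000 RTP/AVP 111
a=rtpmap:111 OPUS/48000/2
m=video 4002 RTP/AVP 96
a=rtpmap:96 VP8/90000`

// sdpLine is one "<key>=<value>" line of a session description.
type sdpLine struct {
	Key   string
	Value string
}

// parseSDP splits a session description into key/value lines.
// It is a teaching aid only and skips anything that is not "<letter>=<value>".
func parseSDP(sdp string) []sdpLine {
	var lines []sdpLine
	for _, raw := range strings.Split(sdp, "\n") {
		key, value, ok := strings.Cut(strings.TrimSpace(raw), "=")
		if !ok || len(key) != 1 {
			continue
		}
		lines = append(lines, sdpLine{Key: key, Value: value})
	}
	return lines
}

// mediaKinds returns the media type of every "m=" section, in order.
func mediaKinds(lines []sdpLine) []string {
	var kinds []string
	for _, l := range lines {
		if fields := strings.Fields(l.Value); l.Key == "m" && len(fields) > 0 {
			kinds = append(kinds, fields[0])
		}
	}
	return kinds
}

func main() {
	lines := parseSDP(exampleSDP)
	// prints: 9 lines, media sections: [audio video]
	fmt.Println(len(lines), "lines, media sections:", mediaKinds(lines))
}
```

Each "m=" line opens a media section, and the "a=" lines that follow it describe that section, here the codec for the payload type.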
Once a peer has generated its SessionDescription (hereinafter simply SD), it sends it to the other peer in the form of an offer.
Here the question immediately arises: "Where should I send it?" And the question is absolutely correct: a peer by itself cannot know the IP address of another peer. This is where STUN/TURN and signal servers come to our aid.
Signal is the server that peers connect to for SD sharing. We'll look at it in more detail a little later.
A STUN/TURN server solves a different problem. Our devices often access the Internet via Wi-Fi, so they sit behind the NAT of the router and do not have a public IP address. A STUN server lets a device discover the public address that its NAT exposes (STUN messages are recognizable by their "magic cookie" field). When a direct connection is still impossible, a TURN server relays the traffic between the peers.
So, we created an SD, found out our IP, and even sent our SD as an offer to the signal server. Next, the second peer stores the SD we sent as its remote description, generates its own SD and sends it back through the signal server in the form of an answer. The first peer stores the received answer as its remote description.
In addition to exchanging media information (discussed above in offer/answer and SDP), nodes must exchange network connection information. Each piece of this information is known as an ICE candidate; it describes one way in which a node can be reached (directly or through a TURN server). ICE candidates are also shared between peers via the signal server.
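As an illustration of what a candidate carries, the sketch below reads the leading fields of a candidate line: transport, address, port and type. The type tells how the address was obtained: host is a local interface, srflx is a public address learned through STUN, relay is an address allocated on a TURN server. The function name parseCandidate and all sample values are made up for this example, and optional trailing attributes are ignored.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// candidate holds the leading fields of an ICE candidate line.
type candidate struct {
	Transport string // "udp" or "tcp"
	Address   string
	Port      int
	Type      string // host, srflx, prflx or relay
}

// parseCandidate reads the fixed leading fields of a candidate attribute:
// foundation, component, transport, priority, address, port, "typ", type.
func parseCandidate(line string) (candidate, error) {
	fields := strings.Fields(strings.TrimPrefix(line, "candidate:"))
	if len(fields) < 8 || fields[6] != "typ" {
		return candidate{}, fmt.Errorf("malformed candidate: %q", line)
	}
	port, err := strconv.Atoi(fields[5])
	if err != nil {
		return candidate{}, fmt.Errorf("bad port in candidate: %w", err)
	}
	return candidate{
		Transport: strings.ToLower(fields[2]),
		Address:   fields[4],
		Port:      port,
		Type:      fields[7],
	}, nil
}

func main() {
	// Made-up sample candidates: one direct, one via STUN, one via TURN.
	samples := []string{
		"candidate:1 1 udp 2122260223 192.168.1.10 54321 typ host",
		"candidate:2 1 udp 1686052607 203.0.113.7 61000 typ srflx raddr 192.168.1.10 rport 54321",
		"candidate:3 1 udp 41885439 198.51.100.20 3478 typ relay raddr 203.0.113.7 rport 61000",
	}
	for _, line := range samples {
		c, err := parseCandidate(line)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("%-5s %s:%d over %s\n", c.Type, c.Address, c.Port, c.Transport)
	}
}
```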
And now, after all the dancing with the tambourine, we can finally establish our RTCPeerConnection.
Why do we need an SFU?
A Selective Forwarding Unit comes to the rescue when the number of peers in one session reaches seven or more. Let's look at an example:
We have 7 peers, each of which sends a video and an audio track. In a peer-to-peer (mesh) setup we get 6+5+4+3+2+1=21 PeerConnections. The number of tracks is 4 times greater, i.e. 84 (each connection carries a video and an audio track in both directions).
This is a very inefficient use of resources. Therefore, we move from a fully connected topology to a star topology:
Now we have 7 PeerConnections, where each peer has 2 outgoing and 12 incoming tracks. Although WebRTC was originally intended for browsers to communicate directly, it is in conjunction with an SFU that we get the opportunity to truly experience the power of this technology.
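The arithmetic behind both topologies fits in a few lines. This is only a sketch of the counting; the function names are made up for the example, and it assumes that every peer publishes exactly one audio and one video track.

```go
package main

import "fmt"

// meshConnections returns the number of PeerConnections in a full mesh of n peers.
func meshConnections(n int) int { return n * (n - 1) / 2 }

// meshTracks counts the tracks in the mesh: every connection carries one audio
// and one video track in each direction, i.e. 4 tracks.
func meshTracks(n int) int { return 4 * meshConnections(n) }

// sfuConnections returns the number of PeerConnections when every peer talks
// only to the SFU.
func sfuConnections(n int) int { return n }

// sfuTracksPerPeer returns the outgoing and incoming track counts of one peer:
// it sends its own audio and video and receives those of the n-1 other peers.
func sfuTracksPerPeer(n int) (outgoing, incoming int) { return 2, 2 * (n - 1) }

func main() {
	n := 7
	out, in := sfuTracksPerPeer(n)
	fmt.Println("mesh connections:", meshConnections(n))       // 21
	fmt.Println("mesh tracks:", meshTracks(n))                 // 84
	fmt.Println("SFU connections:", sfuConnections(n))         // 7
	fmt.Println("SFU tracks per peer:", out, "out,", in, "in") // 2 out, 12 in
}
```

The mesh grows quadratically with the number of peers, while the number of connections to the SFU grows linearly.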
It should also be noted that there is another connection option: the MCU (Multipoint Control Unit). In that case our server is also responsible for mixing all outgoing tracks for each peer into a single MediaStream. However, the user can then no longer interact with the individual streams.
Let me give you an example: Zoom and its ability to mute and move the tiles with users' videos. It is precisely because each tile is a separate MediaStream that you can interact with it. If we had implemented an MCU, we would not receive many video tiles, but a single block of video that the MCU has already put together for us. Thus, by increasing the load on the server several times, we also take away many options from the user. Yes, the load on the client then practically disappears, but do these advantages outweigh the disadvantages? I think not.
Let's start
Now we begin to bring the project to life. First, let's decide on the structure of our SFU.
Our server will consist of two parts: Signal & Coordinator. The first ensures the exchange of SDs & ICE candidates, the second controls the incoming and outgoing media streams.
Peer
Peer is the basic structure and represents a user:
type Peer struct {
id string
connection *webrtc.PeerConnection
streams map[string]*webrtc.TrackRemote
mutex sync.RWMutex
socket *websocket.Conn
}
Everything is simple so far: the struct contains a socket, the connection itself and the tracks coming from the user.
Now let's describe the behavior of our peer:
type PeerInterface interface {
SetSocket(ws_conn *websocket.Conn)
AddRemoteTrack(track *webrtc.TrackRemote)
RemoveRemoteTrack(track *webrtc.TrackRemote)
SetPeerConnection(conn *webrtc.PeerConnection)
ReactOnOffer(offer_str string) (webrtc.SessionDescription, error)
ReactOnAnswer(answer_str string) error
}
So far everything is standard as well: several methods that set the values of the Peer fields. The only methods worth a closer look are ReactOnOffer and ReactOnAnswer:
func (peer *Peer) ReactOnOffer(offer_str string) (webrtc.SessionDescription, error) {
peer.mutex.Lock()
defer peer.mutex.Unlock()
offer := webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: offer_str,
}
err := peer.connection.SetRemoteDescription(offer)
if err != nil {
fmt.Println(err)
return offer, err
}
fmt.Println("Remote Description was set for peer ", peer.id)
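answer, err := peer.connection.CreateAnswer(nil)
if err != nil {
return offer, err
}
fmt.Println("Answer was created in peer ", peer.id)
// Check the error instead of discarding it: if SetLocalDescription fails, the answer must not be sent
if err = peer.connection.SetLocalDescription(answer); err != nil {
return offer, err
}
fmt.Println("Local Description was set for peer ", peer.id)
return answer, nil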
}
When we receive an Offer from another peer via the signal server, we need to save the incoming SD. For us it is the remote SD, so SetRemoteDescription() does the job. Next, as described in the theoretical part, we need to send an Answer so that the other peer can set its RemoteDescription too, but before that we save the answer as our own LocalDescription.
func (peer *Peer) ReactOnAnswer(answer_str string) error {
peer.mutex.Lock()
defer peer.mutex.Unlock()
answer := webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: answer_str,
}
err := peer.connection.SetRemoteDescription(answer)
if err != nil {
fmt.Println(err)
return err
}
return nil
}
The situation is identical, it’s just that now we are on the other side of the barricades. We sent an Offer, received an Answer, and set the RemoteDescription.
Room
Let's go one level up. The Room structure represents one video and/or audio conference session.
type Room struct {
id string
mutex sync.RWMutex
peers map[string]*Peer
tracks map[string]*webrtc.TrackLocalStaticRTP
}
It holds all peers in a given room and stores the outgoing tracks in the tracks field so that they can be forwarded to the other peers.
Let's describe the behavior:
type RoomInterface interface {
JoinRoom(id string)
AddPeer(peer *Peer)
RemovePeer(peer_id string)
AddTrack(track *webrtc.TrackRemote) *webrtc.TrackLocalStaticRTP
RemoveTrack(track *webrtc.TrackLocalStaticRTP)
SendAnswer(message webrtc.SessionDescription, peer_id string)
SendOffer(message webrtc.SessionDescription, peer_id string)
SendICE(message *webrtc.ICECandidate, peer_id string)
Signal()
}
These are mostly standard functions, let's go over them:
- AddPeer(peer *Peer) - adds a user to the room.peers field
- RemovePeer(peer_id string) - removes a user from the room, if there is one
- AddTrack(track *webrtc.TrackRemote) - creates a local track for the incoming one and stores it in room.tracks
- RemoveTrack - removes the track from room.tracks accordingly
- SendAnswer, SendOffer, SendICE - send the Answer, Offer & ICE candidate, respectively, to all users in the room, except for the peer with id=peer_id.
Now let's move on to the fun part: room.Signal()
func (room *Room) Signal() {
room.mutex.Lock()
defer room.mutex.Unlock()
attemptSync := func() (again bool) {
for _, peer := range room.peers {
// 1) Check if peer is already closed
if peer.connection.ConnectionState() == webrtc.PeerConnectionStateClosed {
fmt.Println("Peer with peer_id", peer.id, "was disconnected")
room.RemovePeer(peer.id)
return true
}
// 2)
existingSenders := map[string]bool{}
for _, sender := range peer.connection.GetSenders() {
if sender.Track() == nil {
continue
}
// 3)
existingSenders[sender.Track().ID()] = true
// If we have a RTPSender that doesn't map to a existing track remove and signal
if _, ok := room.tracks[sender.Track().ID()]; !ok {
if err := peer.connection.RemoveTrack(sender); err != nil {
fmt.Println("Cannot remove track", sender.Track().ID(), ":", err)
return true
}
}
}
// 4) Don't receive videos we are sending, make sure we don't have loopback
for _, receiver := range peer.connection.GetReceivers() {
if receiver.Track() == nil {
continue
}
existingSenders[receiver.Track().ID()] = true
}
// 5) Add all track we aren't sending yet to the PeerConnection
for trackID := range room.tracks {
if _, ok := existingSenders[trackID]; !ok {
if _, err := peer.connection.AddTrack(room.tracks[trackID]); err == nil {
fmt.Println("New track is being sent to peer", peer.id)
return true
} else {
fmt.Println(err)
}
}
}
// 6)
if peer.connection.PendingLocalDescription() != nil {
fmt.Println(peer.connection.PendingLocalDescription())
offer, err := peer.connection.CreateOffer(&webrtc.OfferOptions{
OfferAnswerOptions: webrtc.OfferAnswerOptions{},
ICERestart: true,
})
if err != nil {
fmt.Println("Error in CreateOffer: ", err)
return true
}
if err = peer.connection.SetLocalDescription(offer); err != nil {
fmt.Println("Offer: ", offer)
fmt.Println("Cannot set LocalDescription: ", err)
return false
}
offerString, err := json.Marshal(offer)
if err != nil {
fmt.Println("Marshalling failed: ", err)
return true
}
if err = peer.socket.WriteJSON(&WsMessage{
Event: "offer",
Data: string(offerString),
}); err != nil {
fmt.Println("Cannot write message in WsMessage: ", err)
return true
}
}
}
return
}
// 7)
for syncAttempt := 0; ; syncAttempt++ {
if syncAttempt == 25 {
// Release the lock and attempt a sync in 3 seconds. We might be blocking a RemoveTrack or AddTrack
go func() {
time.Sleep(time.Second * 3)
room.Signal()
}()
return
}
if !attemptSync() {
fmt.Println("Signalling finished")
break
}
}
}
This function contains most of the logic for Room. Let's take a closer look:
- We check each peer in the room. If its connection is closed, we remove the peer.
- We get all senders of this peer's connection. A sender is a track that we send to this peer.
- We check whether any sender is "stuck", that is, whether it still forwards a track that is no longer in room.tracks because its owner has already left the room. Such senders are removed.
- We add the tracks coming from this peer (its receivers) to the existingSenders variable. This is done so that at the next stage we do not send a peer's own track back to it.
- Now the existingSenders variable contains all the tracks that the peer is already receiving or sending. All we have to do is add the room tracks that are not in it yet.
- Here we check the PendingLocalDescription. This needs some clarification. Each PeerConnection has two LocalDescription states: Current & Pending. Current is the description that was agreed on in the last completed Offer/Answer exchange. Pending is a description that has been set locally but is still being negotiated; it takes effect (i.e. Current becomes equal to Pending, and Pending is reset to nil) only when the Offer/Answer exchange completes. The code checks Pending for nil: if PendingLocalDescription == nil, it does nothing for this peer, otherwise it creates an Offer and sends it to this peer. In the context of WebRTC, this is called renegotiation, you can read about it here.
- When updating client states, we may encounter various errors. For example, adding new tracks does not happen instantly and can cause conflicts. Therefore, after 25 failed attempts room.Signal() releases the lock and restarts itself with a delay of 3 seconds.
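The relationship between the Pending and Current local descriptions can be shown with a toy model. This is a sketch of the behaviour described in the WebRTC specification, not Pion's implementation; the negotiator type and its methods are made up for the illustration, and descriptions are reduced to plain strings.

```go
package main

import (
	"errors"
	"fmt"
)

// negotiator is a toy model of how a peer connection tracks its local description.
type negotiator struct {
	pendingLocal string // local offer that has not been answered yet ("" = none)
	currentLocal string // description from the last completed offer/answer exchange
}

// setLocalOffer records a new local offer; it stays pending until it is answered.
func (n *negotiator) setLocalOffer(sdp string) {
	n.pendingLocal = sdp
}

// setRemoteAnswer completes the exchange: the pending description becomes
// the current one and the pending slot is cleared.
func (n *negotiator) setRemoteAnswer() error {
	if n.pendingLocal == "" {
		return errors.New("no local offer is pending")
	}
	n.currentLocal = n.pendingLocal
	n.pendingLocal = ""
	return nil
}

func main() {
	var n negotiator
	n.setLocalOffer("offer-1")
	fmt.Printf("after offer:  pending=%q current=%q\n", n.pendingLocal, n.currentLocal)
	if err := n.setRemoteAnswer(); err != nil {
		fmt.Println("unexpected:", err)
		return
	}
	fmt.Printf("after answer: pending=%q current=%q\n", n.pendingLocal, n.currentLocal)
}
```

In this model a non-empty pending description means that an offer is still waiting for its answer.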
Coordinator
Now we move on to the main structure that controls the behavior of all rooms.
type Coordinator struct {
sessioins map[string]*Room
}
As usual, we first describe its behavior with an interface:
type CoordinatorInterface interface {
CreateRoom(id string)
RemoveRoom(id string)
AddUserToRoom(self_id string, room_id string, socket *websocket.Conn)
RemoveUserFromRoom(self_id string, room_id string, socket *websocket.Conn)
ShowSessions()
ObtainEvent(message WsMessage, socket *websocket.Conn)
}
Let's go through the methods:
- CreateRoom() & RemoveRoom() - creates and deletes a room respectively
- ShowSessions() - displays all currently active rooms
- RemoveUserFromRoom() - removes a user from a room
- AddUserToRoom() - adds a user and configures PeerConnection
- ObtainEvent() is the link to our Signal server. When initializing the server, we create a Coordinator structure and process all incoming events with this method
Let's look at the AddUserToRoom() code:
func (coordinator *Coordinator) AddUserToRoom(self_id string, room_id string, socket *websocket.Conn) {
// 1)
if _, ok := coordinator.sessioins[room_id]; !ok {
fmt.Println("New Room was created: ", room_id)
coordinator.CreateRoom(room_id)
}
if room, ok := coordinator.sessioins[room_id]; ok {
// 2) Add Peer to Room
room.AddPeer(newPeer(self_id))
fmt.Println("Peer ", self_id, "was added to room ", room_id)
if peer, ok := room.peers[self_id]; ok {
// 3) Set socket connection to Peer
peer.SetSocket(socket)
// 4) Create Peer Connection
conn, err := webrtc.NewPeerConnection(webrtc.Configuration{})
if err != nil {
// Without a connection the peer is unusable, so stop here instead of storing a nil pointer
fmt.Println("Failed to create peer connection: ", err)
return
}
peer.SetPeerConnection(conn)
fmt.Println("Peer connection was created")
// 5) Accept one audio and one video track incoming
for _, typ := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
if _, err := peer.connection.AddTransceiverFromKind(typ, webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
}); err != nil {
log.Print(err)
return
}
}
// 6) If PeerConnection is closed remove it from global list
peer.connection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
switch p {
case webrtc.PeerConnectionStateFailed:
if err := peer.connection.Close(); err != nil {
log.Print(err)
}
case webrtc.PeerConnectionStateClosed:
room.Signal()
default:
}
})
// 7) When peer connection is getting the ICE -> send ICE to client
peer.connection.OnICECandidate(func(i *webrtc.ICECandidate) {
if i == nil {
fmt.Println("ICE gathering complete")
return
}
fmt.Println("Ice: ", i)
room.SendICE(i, self_id)
})
// 8)
peer.connection.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
fmt.Println("Track added from peer: ", self_id)
defer room.Signal()
// Create a track to fan out our incoming video to all peers
trackLocal := room.AddTrack(t)
defer room.RemoveTrack(trackLocal)
defer fmt.Println("Track", trackLocal, "was removed")
buf := make([]byte, 1500)
for {
i, _, err := t.Read(buf)
if err != nil {
return
}
if _, err = trackLocal.Write(buf[:i]); err != nil {
return
}
}
})
}
}
}
Let's go through the points:
- We check whether the room the user wants to connect to exists. If it does not, we create it.
- Add a Peer to the room.
- Set up the socket and the RTCPeerConnection in Peer (points 3 and 4 in the code).
- We accept the tracks that come from the new user: one audio and one video transceiver in receive-only mode.
- We close a failed connection and signal the room when the user's connection is closed.
- Here we implement the exchange of ICE candidates. If we receive nil as our ICE candidate, then ICE gathering is complete. Otherwise, we send the newly found ICE candidate to another peer.
- When a new track appears, we add it to the room, signal the room and forward the incoming packets until the track ends.
Trickle ICE
In Pion you can find an implementation of the so-called Trickle ICE. The essence of this approach is that the client does not wait for ICE gathering to finish before exchanging Offer/Answer. Instead, it sends and receives ICE candidates in parallel, as soon as they become available. This is what we do in the OnICECandidate handler of our peer connection.
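The difference between the two approaches can be simulated without a network. In this sketch gather plays the role of the ICE agent and follows the same contract as OnICECandidate: one call per candidate, then a call with nil when gathering is finished. All names and the candidate labels are made up for the illustration.

```go
package main

import "fmt"

// gather simulates an ICE agent: it reports each candidate through onCandidate
// as soon as it is found and reports nil once gathering has finished.
func gather(found []string, onCandidate func(c *string)) {
	for i := range found {
		onCandidate(&found[i])
	}
	onCandidate(nil)
}

// sendTrickle forwards every candidate in its own signaling message.
func sendTrickle(found []string) [][]string {
	var messages [][]string
	gather(found, func(c *string) {
		if c == nil {
			return // gathering finished, nothing left to send
		}
		messages = append(messages, []string{*c})
	})
	return messages
}

// sendAllAtOnce waits for the end of gathering and sends a single message.
func sendAllAtOnce(found []string) [][]string {
	var batch []string
	var messages [][]string
	gather(found, func(c *string) {
		if c == nil {
			messages = append(messages, batch)
			return
		}
		batch = append(batch, *c)
	})
	return messages
}

func main() {
	found := []string{"host", "srflx", "relay"}
	fmt.Println(len(sendTrickle(found)), "messages with Trickle ICE")
	fmt.Println(len(sendAllAtOnce(found)), "message without it")
}
```

With trickling, the first candidate reaches the other side while the slower ones are still being gathered, so the connection check can start earlier.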
Now it's time for ObtainEvent:
func (coordinator *Coordinator) ObtainEvent(message WsMessage, socket *websocket.Conn) {
wsMessage := message
switch wsMessage.Event {
case "joinRoom":
go func() {
m, ok := message.Data.(map[string]any)
if ok {
// Comma-ok assertions avoid a panic on malformed client input
self_id, ok1 := m["self_id"].(string)
room_id, ok2 := m["room_id"].(string)
if ok1 && ok2 {
coordinator.AddUserToRoom(self_id, room_id, socket)
}
}
}()
case "leaveRoom":
go func() {
m, ok := message.Data.(map[string]any)
if ok {
// Comma-ok assertions avoid a panic on malformed client input
self_id, ok1 := m["self_id"].(string)
room_id, ok2 := m["room_id"].(string)
if ok1 && ok2 {
coordinator.RemoveUserFromRoom(self_id, room_id)
}
}
}()
case "offer":
go func() {
m, ok := message.Data.(map[string]any)
if ok {
self_id, _ := m["self_id"].(string)
room_id, _ := m["room_id"].(string)
offer2 := m["offer"].(map[string]any)
if room, ok := coordinator.sessioins[room_id]; ok {
if peer, ok := room.peers[self_id]; ok {
answer, err2 := peer.ReactOnOffer(offer2["sdp"].(string))
if err2 != nil {
fmt.Println(err2)
return
}
room.SendAnswer(answer, self_id)
}
}
}
}()
case "answer":
go func() {
m, ok := message.Data.(map[string]any)
if ok {
self_id, _ := m["self_id"].(string)
room_id, _ := m["room_id"].(string)
offer2 := m["answer"].(map[string]any)
if room, ok := coordinator.sessioins[room_id]; ok {
if peer, ok := room.peers[self_id]; ok {
err := peer.ReactOnAnswer(offer2["sdp"].(string))
if err != nil {
fmt.Println(err)
return
}
}
}
}
}()
case "ice-candidate":
go func() {
m, ok := message.Data.(map[string]any)
if ok {
self_id, _ := m["self_id"].(string)
room_id, _ := m["room_id"].(string)
candidate := m["candidate"].(map[string]any)
i_candidate := candidate["candidate"].(string)
sdp_mid := candidate["sdpMid"].(string)
sdp_m_line_index := uint16(candidate["sdpMLineIndex"].(float64))
var username_fragment string
if candidate["usernameFragment"] != nil {
username_fragment = candidate["usernameFragment"].(string)
} else {
username_fragment = ""
}
init := webrtc.ICECandidateInit{
Candidate: i_candidate,
SDPMid: &sdp_mid,
SDPMLineIndex: &sdp_m_line_index,
UsernameFragment: &username_fragment,
}
if room, ok := coordinator.sessioins[room_id]; ok {
if peer, ok := room.peers[self_id]; ok {
if err := peer.connection.AddICECandidate(init); err != nil {
log.Println(err)
return
}
fmt.Println("ICE-CANDIDATE added for peer", peer.id)
fmt.Println(peer.connection.ICEConnectionState())
fmt.Println(peer.connection.ICEGatheringState())
}
}
} else {
fmt.Println(m)
fmt.Println("nach")
}
}()
default:
fmt.Println("DEFAULT")
fmt.Println(wsMessage)
}
return
}
Here we check the value of the Event field and call the appropriate method. As input we receive a WsMessage, which is a common envelope for the various message types.
Signal
Now the final element of our server, a socket that will receive messages and send them to Coordinator.ObtainEvent()
// websockets listener
func (ws *WsServer) wsInit(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
fmt.Printf("Client failed to connect: %s\n", err)
return
}
// Close is deferred only after the error check, because conn is nil when Upgrade fails
defer conn.Close()
fmt.Println("Client connected successfully")
message := types.WsMessage{}
for {
messageType, bmessage, err := conn.ReadMessage()
fmt.Println(bmessage)
if err != nil {
fmt.Println(err)
return
}
if messageType == websocket.CloseMessage {
break
}
err = json.Unmarshal(bmessage, &message)
if err != nil {
fmt.Println("DROP")
fmt.Println(message.Data)
fmt.Println(err)
return
}
ws.coordinator.ObtainEvent(message, conn)
}
}
Here we convert incoming messages to WsMessage and give them to our coordinator
Conclusion
As a result, we have a starting version of an SFU. As stated in WebRTC for the Curious:
Building a simple SFU can be done in a weekend. Building a good SFU that can handle all types of clients is never ending. Tuning the Congestion Control, Error Correction and Performance is a never ending task.
Therefore, this is only the first version of a simple SFU server. There is still a lot ahead that can improve this server and the quality of video/audio conferences. If you like this article, I will not delay the release of the next ones.
The source code can be found in this repository.
I would also like to hear your wishes and tips for the development of the SFU, since I implemented it while learning both WebRTC and Go from scratch.
Finally, I want to thank Erased-sh for helping me understand WebRTC.
References
https://developer.mozilla.org/ru/docs/Web/API/WebRTC_API/Protocols
https://webrtcforthecurious.com