Building Real-Time Communication: Harnessing WebRTC with FastAPI Part 3 - Wrapping Everything Up
Wassaf Shahzasd
Posted on April 7, 2024
Welcome to the last part of my series, where we build a Google Meet clone using FastAPI and WebRTC. If you haven't read the previous article you can read it here.
In the previous tutorial
Our project directory looked something like this.
.
|-- main.py
|-- requirements.py
|-- static
|-- |-- home.css
|-- |-- home.js
|-- templates
|-- |-- main.html
|-- |-- home.html
|-- .env
Create a new file called manager.py on the same level as main.py. Here we will create a WebSocket manager which allows users to join, leave, and message in a chat room.
from fastapi.websockets import WebSocket

class SignalManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    @property
    def is_empty(self):
        return len(self.active_connections) == 0

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_json(message)

    async def broadcast(self, message: dict, websocket: WebSocket):
        for connection in self.active_connections:
            if connection != websocket:
                await connection.send_json(message)
The SignalManager class is responsible for accepting WebSocket connections and sending broadcast and/or personal messages. The active_connections attribute holds every connection that is currently open. The broadcast function sends a message to every WebSocket in the room except the one that sent it.
Chat Room Manager Class
Since there can be multiple rooms with multiple connections, now create a MeetingManager class just below the SignalManager class.
class MeetingManager:
    def __init__(self) -> None:
        self.rooms: dict[str, SignalManager] = {}

    async def join(self, id: str, websocket: WebSocket):
        if id in self.rooms:
            await self.rooms[id].connect(websocket)
        else:
            self.rooms[id] = SignalManager()
            await self.rooms[id].connect(websocket)
        await self.rooms[id].broadcast({"type":"USER_JOIN"}, websocket)

    def leave(self, id: str, websocket: WebSocket):
        self.rooms[id].disconnect(websocket)
        if self.rooms[id].is_empty:
            del self.rooms[id]
This class is responsible for managing the chat rooms and for allowing users to join and leave a room.
One point of interest is scalability. The rooms live in the memory of a single server process, so this manager only works while every user of a room is connected to the same process. To scale WebSockets across several workers or servers, the messages need to go through a shared backend such as Redis.
Plugging everything together
Open up main.py, import the MeetingManager class from manager.py, and create a manager instance.
from manager import MeetingManager
# Named meeting_manager because the WebSocket endpoint refers to it by that name.
meeting_manager = MeetingManager()
Let's create a function which allows a WebSocket to join a room.
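# WebSocket and WebSocketDisconnect must be imported at the top of main.py.
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/{client_id}")
async def connect_websocket(websocket: WebSocket, client_id: str):
    await meeting_manager.join(client_id, websocket)
    try:
        while True:
            # Relay every signaling message to the other sockets in the room.
            data = await websocket.receive_json()
            await meeting_manager.rooms[client_id].broadcast(data, websocket)
    except WebSocketDisconnect:
        meeting_manager.leave(client_id, websocket)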
The above code accepts a WebSocket connection and then loops until the WebSocket disconnects. Inside the loop it waits to receive a message and broadcasts it to everyone else in the room. Note that the client_id path parameter is really the room name.
Like I explained previously, we will be using WebSockets as a signaling server to send both SDP offers and ICE candidates.
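To make the signaling traffic a little more concrete, here is a rough sketch of the four message shapes that travel over the socket in this tutorial, together with a small check for them. The field names (type, message, candidate) come from the snippets in this article; isSignalingMessage itself is a hypothetical helper and not part of the tutorial's code.

```javascript
// Hypothetical helper: returns true when a parsed message matches one of the
// four shapes exchanged in this tutorial.
//   { type: "USER_JOIN" }
//   { type: "OFFER",  message: <session description> }
//   { type: "ANSWER", message: <session description> }
//   { type: "candidate", candidate: <ICE candidate> }
const isSignalingMessage = (msg) => {
  if (msg === null || typeof msg !== "object") return false;
  switch (msg.type) {
    case "USER_JOIN":
      return true;
    case "OFFER":
    case "ANSWER":
      return typeof msg.message === "object" && msg.message !== null;
    case "candidate":
      return typeof msg.candidate === "object" && msg.candidate !== null;
    default:
      return false;
  }
};
```

A check like this could run right after JSON.parse on the client, so that unexpected messages are dropped before they reach the WebRTC code.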
Let's add the URL for the video page.
@app.get("/room/{roomName}")
def get_video(request: Request, roomName: str):
    return templates.TemplateResponse(request=request, name="index.html")
This is the URL which home.js redirects to after you enter the meeting room name. Don't worry about index.html, we will create that later.
Business on the front, Party on the back
Now let's create index.html in the templates folder. This HTML file is responsible for rendering the video tags.
{% extends "main.html" %}
{% block script %}
<link rel='stylesheet' type='text/css' media='screen' href="{{ url_for('static', path='/index.css') }}">
<script src="{{ url_for('static', path='/index.js') }}"></script>
{% endblock %}
{% block content %}
<div id="videos">
  <video class="video-player" id="user-1" autoplay></video>
  <video class="video-player" id="user-2" autoplay></video>
</div>
{% endblock %}
Right now we are working with the assumption that only two users will join the call. Later on you can build on this to support calls with more participants.
Moving on to the Hard stuff
Create an index.js file inside the static folder. Our project directory becomes the following.
.
|-- main.py
|-- requirements.py
|-- static
|-- |-- home.css
|-- |-- home.js
|-- |-- index.js
|-- templates
|-- |-- main.html
|-- |-- home.html
|-- |-- index.html
|-- .env
First, initialize some variables which will be used later on.
let localStream;
let remoteStream;
let peerConnection;
let socket;
let makingOffer = false;
let polite = false;
Now let's create a connect function which connects the WebSocket to the chat room, so that we can use our FastAPI server as the signaling server.
let connect = async (callback) => {
  // The page URL is /room/{roomName}, so the room name is the third path segment.
  let roomName = window.location.pathname.split("/")[2];
  socket = new WebSocket(`ws://localhost:8000/ws/${roomName}`);
  socket.onopen = async (_) => {
    await callback();
  };
  socket.onmessage = handleMessage;
};
This function takes in a callback which is called once the socket is successfully connected. We also assign a helper function called handleMessage to the onmessage event. Don't worry, we will define that function later on.
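The connect function hardcodes ws://localhost:8000. One possible way around that, sketched here, is to derive the signaling URL from the page's own location. buildSignalingUrl is a hypothetical helper; it assumes the page is served at /room/{roomName} and the socket at /ws/{roomName} on the same host, as in this tutorial.

```javascript
// Hypothetical helper: derives the signaling URL from a Location-like object.
// Assumes the page lives at /room/{roomName} and the socket at /ws/{roomName}.
const buildSignalingUrl = ({ protocol, host, pathname }) => {
  // "/room/standup" -> ["room", "standup"]; empty segments are dropped.
  const segments = pathname.split("/").filter(Boolean);
  const roomName = segments[segments.length - 1];
  // Pages served over HTTPS must use the secure WebSocket scheme.
  const scheme = protocol === "https:" ? "wss" : "ws";
  return `${scheme}://${host}/ws/${roomName}`;
};

// In the browser:
// socket = new WebSocket(buildSignalingUrl(window.location));
```

Because the helper only reads three plain properties, it can be tried out with an ordinary object such as { protocol: "http:", host: "localhost:8000", pathname: "/room/standup" } without a browser.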
Now let's create an init function which runs first when the index.js file is loaded.
let init = async () => {
  localStream = await navigator.mediaDevices.getUserMedia({
    video: true,
    audio: false,
  });
  document.getElementById("user-1").srcObject = localStream;
  await connect(createStreams);
};
This function does two things.
- It gets the user's media stream and assigns it to the video tag defined in index.html.
- It calls the connect function with the createStreams function as a callback.
Creating Video Streams
Let's create a function which creates the peerConnection object and the remote media stream. The full function looks like this.
let createStreams = async () => {
  peerConnection = new RTCPeerConnection(config);
  remoteStream = new MediaStream();
  // Make our local tracks available to the remote peer.
  localStream.getTracks().forEach((track) => {
    peerConnection.addTrack(track, localStream);
  });
  // Called each time a track from the remote peer arrives.
  peerConnection.ontrack = (event) => {
    console.log("adding track");
    event.streams[0]
      .getTracks()
      .forEach((track) => remoteStream.addTrack(track));
  };
  // Trickle our ICE candidates to the other peer through the signaling socket.
  peerConnection.onicecandidate = async (event) => {
    if (event.candidate) {
      socket.send(
        JSON.stringify({ type: "candidate", candidate: event.candidate })
      );
    }
  };
  peerConnection.onnegotiationneeded = async () => {
    try {
      makingOffer = true;
      await peerConnection.setLocalDescription();
      socket.send(
        JSON.stringify({
          type: "OFFER",
          message: peerConnection.localDescription,
        })
      );
    } catch (err) {
      console.error(err);
    } finally {
      makingOffer = false;
    }
  };
  document.getElementById("user-2").srcObject = remoteStream;
};
This function first creates a peerConnection object from the provided config and initializes an empty remoteStream.
The config object specifies various options, but the one we care about is the list of STUN servers, which allow us to establish a peer-to-peer connection.
Our config object looks something like this.
const config = {
  iceServers: [
    {
      urls: [
        "stun:stun1.l.google.com:19302",
        "stun:stun2.l.google.com:19302",
      ],
    },
  ],
};
The next thing we do is get our local tracks and add them to the peerConnection so that they are available to the other peer. (The full function is available just above; this part is repeated to make it easier to follow.)
localStream.getTracks().forEach((track) => {
  peerConnection.addTrack(track, localStream);
});
In the next part of the function, we attach a callback to the ontrack event, which is triggered every time a track from the remote peer is added to the connection.
peerConnection.ontrack = (event) => {
  console.log("adding track");
  event.streams[0]
    .getTracks()
    .forEach((track) => remoteStream.addTrack(track));
};
Now we create an event handler to trickle ICE candidates to the connected peer, using the socket object to send them.
peerConnection.onicecandidate = async (event) => {
  if (event.candidate) {
    socket.send(
      JSON.stringify({ type: "candidate", candidate: event.candidate })
    );
  }
};
The next part, peerConnection.onnegotiationneeded, will be discussed in the next section, so let's skip it for now. Finally we attach the remote stream to the user-2 video object.
The Perfect Negotiation Pattern
For exchanging the SDP offers and answers we will be implementing the perfect negotiation pattern as described in the following article. The main gist of the article is that we have two types of peers. A polite peer, in case of an offer collision, drops its own offer and accepts the other peer's offer. An impolite peer keeps sending offers and, in case of a collision, rudely ignores the other peer's offer.
How you decide who is polite is up to you. In this tutorial, a user who is already present in the call when someone else joins is always polite; the newcomer is impolite.
Let's go back to the createStreams function and take a look at the onnegotiationneeded event handler. This event is triggered in the following cases:
- When you call addTrack or removeTrack
- When the first data channel is created with createDataChannel
- When you request an ICE restart with restartIce()
Calling setLocalDescription(), createOffer(), or createAnswer() does not by itself start a new negotiation; these are the calls you make in response to the event.
peerConnection.onnegotiationneeded = async () => {
  try {
    makingOffer = true;
    await peerConnection.setLocalDescription();
    socket.send(
      JSON.stringify({
        type: "OFFER",
        message: peerConnection.localDescription,
      })
    );
  } catch (err) {
    console.error(err);
  } finally {
    makingOffer = false;
  }
};
First we set our local description; called with no arguments, setLocalDescription() creates the appropriate offer for us. Then we send a message of type OFFER to the other peer.
Handling the messages
If we go back to the connect function, we see that we attach an event handler called handleMessage to the onmessage event. This function handles the ICE candidates and the SDP answers and offers we receive from our custom signaling server.
let handleMessage = async ({ data }) => {
  data = JSON.parse(data);
  if (data["type"] === "USER_JOIN") {
    // Someone joined after us, so we become the polite peer.
    polite = true;
    createStreams();
  }
  if (data["type"] === "OFFER") {
    console.log("received offer");
    handlePerfectNegotiation(data);
  }
  if (data["type"] === "ANSWER") {
    console.log("received answer");
    handlePerfectNegotiation(data);
  }
  if (data["type"] === "candidate") {
    handleIceCandidate(data);
  }
};
This is a relatively simple function. When a new user joins, we set polite to true (it is false by default) and create fresh streams.
In the case of an ANSWER or OFFER, we call the handlePerfectNegotiation function. This is the last piece of the puzzle, so let's go over it.
let handlePerfectNegotiation = async ({ message }) => {
  try {
    if (message) {
      // A collision happens when an offer arrives while we are making our own.
      const offerCollision =
        message.type === "offer" &&
        (makingOffer || peerConnection.signalingState !== "stable");
      // Only the impolite peer ignores a colliding offer.
      const ignoreOffer = !polite && offerCollision;
      if (ignoreOffer) {
        return;
      }
      await peerConnection.setRemoteDescription(message);
      if (message.type === "offer") {
        await peerConnection.setLocalDescription();
        socket.send(JSON.stringify({
          type: "ANSWER",
          message: peerConnection.localDescription,
        }));
      }
    }
  } catch (err) {
    console.error(err);
  }
};
When this function runs on the impolite peer, a colliding offer is ignored and the peer carries on with its own offer. When it runs on the polite peer, the collision is acknowledged: setRemoteDescription rolls back the peer's own pending offer, and the peer accepts the impolite peer's offer and replies with an answer.
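The collision rule is easier to see when it is pulled out of the WebRTC plumbing. The sketch below restates the same check as a pure function; shouldIgnoreOffer and its parameter names are hypothetical and exist only for illustration.

```javascript
// Hypothetical pure function mirroring the check in handlePerfectNegotiation.
const shouldIgnoreOffer = ({ polite, makingOffer, signalingState, descriptionType }) => {
  // A collision is an incoming offer that arrives while we are mid-offer
  // or while the connection is not in the "stable" state.
  const offerCollision =
    descriptionType === "offer" && (makingOffer || signalingState !== "stable");
  // Only the impolite peer ignores the colliding offer.
  return !polite && offerCollision;
};

// Both peers sent an offer at the same time:
const collision = {
  makingOffer: true,
  signalingState: "have-local-offer",
  descriptionType: "offer",
};
console.log(shouldIgnoreOffer({ ...collision, polite: false })); // true
console.log(shouldIgnoreOffer({ ...collision, polite: true })); // false
```

Answers are never ignored, and an offer that arrives while nothing else is in flight is accepted by both kinds of peer.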
Now all we need is to handle the ICE candidates and we are good to go.
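let handleIceCandidate = async ({ candidate }) => {
  // Candidates are only applied once a remote description has been set.
  if (peerConnection && peerConnection.remoteDescription) {
    try {
      await peerConnection.addIceCandidate(candidate);
    } catch (err) {
      console.error(err);
    }
  }
};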
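Note that handleIceCandidate silently drops any candidate that arrives before the remote description is set. One possible alternative, sketched here as an assumption rather than as part of the tutorial's code, is to queue early candidates and apply them later. createCandidateQueue is a hypothetical helper; the only real API it relies on is addIceCandidate and remoteDescription on the peer connection.

```javascript
// Hypothetical helper: queues ICE candidates until the peer connection has a
// remote description, then applies them in arrival order.
const createCandidateQueue = (pc) => {
  const pending = [];
  const apply = (candidate) =>
    pc.addIceCandidate(candidate).catch((err) => console.error(err));
  return {
    // Call from the "candidate" message handler.
    add(candidate) {
      if (pc.remoteDescription) {
        apply(candidate);
        return "applied";
      }
      pending.push(candidate);
      return "queued";
    },
    // Call right after setRemoteDescription() resolves.
    // Returns how many queued candidates were applied.
    flush() {
      const queued = pending.splice(0);
      queued.forEach((candidate) => apply(candidate));
      return queued.length;
    },
  };
};
```

With this in place, the queue would be created next to the peerConnection, add would replace the body of handleIceCandidate, and flush would be called after setRemoteDescription in handlePerfectNegotiation.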
Tying everything together with:
document.addEventListener(
  "DOMContentLoaded",
  async function () {
    await init();
  },
  false
);
And boom, we are done!
Thank you for following along and please share your feedback if you have any.
Follow Best Practices
In this article we have not followed some best practices. In a production environment, the WebSocket URL would not be hardcoded, and on disconnect we would remove the streams.
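As a minimal sketch of the cleanup step, assuming the same localStream, remoteStream, peerConnection, and socket variables used in index.js, something like the following could run when the user leaves the page. hangUp is a hypothetical name; the calls it makes (getTracks, stop, and close) are standard browser APIs.

```javascript
// Hypothetical cleanup helper: stops all media tracks, then closes the peer
// connection and the signaling socket. Missing pieces are skipped.
const hangUp = ({ localStream, remoteStream, peerConnection, socket }) => {
  [localStream, remoteStream].forEach((stream) => {
    if (stream) stream.getTracks().forEach((track) => track.stop());
  });
  if (peerConnection) peerConnection.close();
  if (socket) socket.close();
};

// In the browser, for example:
// window.addEventListener("beforeunload", () =>
//   hangUp({ localStream, remoteStream, peerConnection, socket })
// );
```

This only cleans up the local side. Telling the other peer that someone left would need an extra message from the server, which this tutorial does not send.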