Masui Masanori
Posted on May 27, 2022
Intro
I will try updating sending data through channels of goroutine and adding Data channels of WebRTC.
Examples
[Go] Closing chan
Last time, I wrote handling "webrtc.PeerConnection" events by channels like below.
sseClient.go
...
type SSEClient struct {
candidateFound chan *webrtc.ICECandidate
changeConnectionState chan webrtc.PeerConnectionState
addTrack chan *webrtc.TrackRemote
userName string
w http.ResponseWriter
}
func newSSEClient(userName string, w http.ResponseWriter) *SSEClient {
return &SSEClient{
candidateFound: make(chan *webrtc.ICECandidate),
changeConnectionState: make(chan webrtc.PeerConnectionState),
addTrack: make(chan *webrtc.TrackRemote),
userName: userName,
w: w,
}
}
func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
...
flusher, _ := w.(http.Flusher)
defer func() {
hub.unregister <- ps
if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
ps.peerConnection.Close()
}
close(newClient.candidateFound)
close(newClient.changeConnectionState)
close(newClient.addTrack)
}()
for {
select {
case candidate := <-newClient.candidateFound:
jsonValue, err := NewCandidateMessageJSON(newClient.userName, candidate)
if err != nil {
return
}
fmt.Fprintf(w, "data: %s\n\n", jsonValue)
flusher.Flush()
case track := <-newClient.addTrack:
hub.addTrack <- track
case connectionState := <-newClient.changeConnectionState:
switch connectionState {
case webrtc.PeerConnectionStateFailed:
return
case webrtc.PeerConnectionStateClosed:
return
}
case <-r.Context().Done():
return
}
}
}
...
peerConnectionState.go
...
type PeerConnectionState struct {
peerConnection *webrtc.PeerConnection
client *SSEClient
}
func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
...
peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
if i == nil {
return
}
client.candidateFound <- i
})
peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
// panic after closing all SSEClient channels.
client.changeConnectionState <- p
})
peerConnection.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
client.addTrack <- t
})
return &PeerConnectionState{
peerConnection: peerConnection,
client: client,
}, nil
}
But after closing all SSEClient channels, panic would be occurred in "peerConnection.OnConnectionStateChange".
Because it tried sending "closed" state by the closed channel.
So I add another channel and check if the channels are closed before sending values.
sseClient.go
...
type SSEClient struct {
candidateFound chan *webrtc.ICECandidate
changeConnectionState chan webrtc.PeerConnectionState
addTrack chan *webrtc.TrackRemote
heartbeat chan int
userName string
w http.ResponseWriter
}
...
func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
...
defer func() {
hub.unregister <- ps
if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
ps.peerConnection.Close()
}
close(newClient.candidateFound)
close(newClient.changeConnectionState)
close(newClient.addTrack)
close(newClient.heartbeat)
}()
...
}
peerConnectionState.go
...
func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
...
// must be a "buffered channel" to avoid blocking on setting the first value.
heartbeat := make(chan int, 1)
candidateFound := make(chan *webrtc.ICECandidate)
changeConnectionState := make(chan webrtc.PeerConnectionState)
addTrack := make(chan *webrtc.TrackRemote)
// set the first value to avoid blocking on reading the channel value on first time.
heartbeat <- 1
...
peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
// until the channel being closing, "ok" value keeps being "true".
_, ok := <-heartbeat
if ok {
changeConnectionState <- p
// write value for next reading.
heartbeat <- 1
}
})
...
But I still couldn't solve the problem after the change.
Although I close the all channels, the "ok" value didn't changed.
peerConnectionState.go
...
func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
...
peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
// "ok" value is "true".
_, ok := <-heartbeat
if ok {
// but the channels are closed.
changeConnectionState <- p
heartbeat <- 1
}
})
...
Thus, I read all values of "heartbeat" before closing it.
sseClient.go
...
func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
...
defer func() {
hub.unregister <- ps
// read all values before closing.
for i := 0; i < len(newClient.heartbeat); i++ {
<-newClient.heartbeat
}
close(newClient.heartbeat)
close(newClient.candidateFound)
close(newClient.changeConnectionState)
close(newClient.addTrack)
if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
ps.peerConnection.Close()
}
}()
...
}
Finally, I moved the channels that handles WebRTC events to PeerConnectionState.
Because the events belongs to webrtc.PeerConnection.
[WebRTC] Adding WebRTC DataChannel
From now, I will try sending data by WebRTC DataChannel.
I can send texts(string) and binaries(ex.UInt8Array) by WebRTC DataChannel.
I can negotiate the RTCDataChannel's connection by the application.
To do this, I have to set "negotiated" of RTCDataChannelInit true when I create RTCDataChannel instances from RTCPeerConnection.
const dc = peerConnection.createDataChannel(label, {
id,
negotiated: true,
ordered: false
});
// handling received data as texts.
const decoder = new TextDecoder("utf-8");
dc.onmessage = (ev) => {
const message = decoder.decode(new Uint8Array(ev.data));
// TODO: handle string value.
};
I also must create them before the first offer/answer exchange is complete.
Now I can separate the RTCDataChannels by their IDs.
I can create webrtc.DataChannel in the same way in Go.
func newWebRTCDataChannel(label string, id uint16, peerConnection *webrtc.PeerConnection) (*webrtc.DataChannel, error) {
negotiated := true
ordered := false
dc, err := peerConnection.CreateDataChannel(label, &webrtc.DataChannelInit{
ID: &id,
Negotiated: &negotiated,
Ordered: &ordered,
})
if err != nil {
return nil, err
}
dc.OnMessage(func(msg webrtc.DataChannelMessage) {
message := string(msg.Data)
// TODO: handle string value.
})
return dc, nil
}
In this time, I just send data from one client to other clients, and I don't change on the server side.
dc.DataChannels[id].Send(msg.Data)
Another important thing is the DataChannels must be closed before closing the PeerConnection.
- WebRTC 1.0: Real-Time Communication Between Browsers
- RFC8831 - WebRTC Data Channels
- RFC8832 - WebRTC Data Channel Establishment Protocol
- RTCDataChannel - Web APIs|MDN
- Using WebRTC data channels - Web APIs|MDN
- WebRTC DataChannel コトハジメ - voluntas - GitHub
- ハイパフォーマンス ブラウザネットワーキング(High Performance Browser Networking)
Posted on May 27, 2022
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.