Dive into WebRTC, or write an SFU on your own

WebRTC is a browser technology designed for streaming data between browsers or applications using peer-to-peer (point-to-point) transmission.

WebRTC has been supported in most browsers for a long time, so ignoring the technology would be rather pointless. At least that’s what I thought, which is why I decided to write an SFU server in Go as a pet project.

About WebRTC itself

Here I will briefly run through the basics of WebRTC; for those interested in going a little deeper, I leave the link here.

The SDP (Session Description Protocol) is used so that two parties can establish an RTCPeerConnection. The protocol has a “key-value” structure and is essentially a description of the session (the name speaks for itself).

SessionDescription example

v=0
o=- 0 0 IN IP4 127.0.0.1
s=-
c=IN IP4 127.0.0.1
t=0 0
m=audio 4000 RTP/AVP 111
a=rtpmap:111 OPUS/48000/2
m=video 4002 RTP/AVP 96
a=rtpmap:96 VP8/90000

After a peer has generated a SessionDescription (hereinafter simply SD), it sends it to the other peer in the form of an offer.

Here a question immediately arises: “Where, in fact, do we send it?” And the question is absolutely fair: a peer by itself cannot know the IP address of another peer. This is where TURN and signaling servers come to our aid.

A signaling server is a server that peers connect to in order to exchange SDs. We will dwell on it in more detail a little later.

A TURN server solves another problem. Our devices often access the Internet over Wi-Fi, so they sit behind the local NAT of a router and have no public IP address. But this is not a problem for a TURN server: with the help of its “Magic Cookie”, it will still reach your device.

So: we have formed an SD, found out our IP, and sent our SD as an offer to the signaling server. Next, the second peer records the SD we sent, generates its own SD, and sends it to the signaling server in the form of a response. The first peer then records the received SD as an answer.

In addition to exchanging media information (discussed above in offer/answer and SDP), peers must exchange network connection information. This is known as an ICE candidate and details the available methods by which a peer can be reached (either directly or through a TURN server). Peers also exchange ICE candidates through the signaling server.

And now, after all this dancing with a tambourine, we can finally establish our RTCPeerConnection.

And why do we need an SFU?

A Selective Forwarding Unit comes to the rescue when the number of peers in one session reaches 7+. Consider an example:

We have 7 peers, each sending a video and an audio track. In a p2p (full-mesh) format we get 6+5+4+3+2+1 = 21 peer connections, or n(n-1)/2 in general. And the number of tracks will be 4 times greater: each connection carries a video and an audio track on both sides, giving 84 tracks.
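
As a quick sanity check, here is a tiny illustrative helper (not part of the server code) that computes this load:

// meshLoad returns the number of PeerConnections and tracks for n peers
// in a full-mesh call, assuming each side of every connection carries
// one audio and one video track.
func meshLoad(n int) (conns, tracks int) {
	conns = n * (n - 1) / 2
	tracks = conns * 4
	return conns, tracks
}

meshLoad(7) indeed returns 21 connections and 84 tracks.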

This is an absolutely inefficient use of resources. Therefore, we move from a fully connected topology to a star topology:

Now we have 7 PeerConnections, where each peer gets 2 outgoing and 12 incoming tracks. Although WebRTC was originally intended for browsers to communicate directly, it is precisely in combination with an SFU that we get the opportunity to really feel the power of this technology.

It should be noted that there is another option for organizing the connection – an MCU (Multipoint Control Unit). But then the responsibilities of our server would also include mixing all the outgoing tracks for each peer into a single MediaStream. However, the user would then be unable to interact with these streams individually.

Let me give an example: Zoom and its ability to mute users and move their video tiles around. It is precisely because each tile is a separate MediaStream that you can interact with it. If we implemented an MCU, we would get not many video tiles but a single block of video streams that the MCU has carefully composed for us. Thus, by increasing the load on the server many times over, we cut off many possibilities for the user. Yes, the load on the client is then reduced to practically nothing, but do these advantages outweigh the disadvantages? I think not.

Here we go

Now we are starting to implement the project. First, let’s define the structure of our SFU.

Our server will consist of two parts: Signal & Coordinator. The first provides the exchange of SDs & ICE candidates, the second manages incoming and outgoing streams.

Peer

Peer is an elementary structure that represents a user:

type Peer struct {
	id         string
	connection *webrtc.PeerConnection
	streams    map[string]*webrtc.TrackRemote
	mutex      sync.RWMutex
	socket     *websocket.Conn
}

So far everything is simple: the struct contains a socket, the connection itself, and the tracks coming from the user.
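
The constructor is not shown in the article, but since the Coordinator will call newPeer() later, a minimal sketch consistent with the struct above might look like this:

// newPeer creates a Peer with an empty track map; the socket and
// PeerConnection are attached later via SetSocket and SetPeerConnection.
func newPeer(id string) *Peer {
	return &Peer{
		id:      id,
		streams: make(map[string]*webrtc.TrackRemote),
	}
}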

Now let’s describe the behavior of our peer:

type PeerInterface interface {
	SetSocket(ws_conn *websocket.Conn)
	AddRemoteTrack(track *webrtc.TrackRemote)
	RemoveRemoteTrack(track *webrtc.TrackRemote)
	SetPeerConnection(conn *webrtc.PeerConnection)
	ReactOnOffer(offer_str string) (webrtc.SessionDescription, error)
	ReactOnAnswer(answer_str string) error
}

Again, everything is standard so far: several methods that set Peer fields. The only ones worth dwelling on are ReactOnOffer & ReactOnAnswer:

func (peer *Peer) ReactOnOffer(offer_str string) (webrtc.SessionDescription, error) {
	peer.mutex.Lock()
	defer peer.mutex.Unlock()

	offer := webrtc.SessionDescription{
		Type: webrtc.SDPTypeOffer,
		SDP:  offer_str,
	}
	err := peer.connection.SetRemoteDescription(offer)
	if err != nil {
		fmt.Println(err)
		return offer, err
	}
	fmt.Println("Remote Description was set for peer ", peer.id)
	answer, err := peer.connection.CreateAnswer(nil)
	if err != nil {
		return offer, err
	}
	if err = peer.connection.SetLocalDescription(answer); err != nil {
		return offer, err
	}
	fmt.Println("Local Description was set for peer ", peer.id)
	fmt.Println("Answer was created in peer ", peer.id)
	return answer, nil

}

When we receive an Offer from another peer via the signaling server, we need to save the SD that came to us. For us it is a remote SD, so SetRemoteDescription() does the job. Next, as already described in the theoretical part, we need to send back an Answer so that the other peer can also set its RemoteDescription, but before that we also save our own LocalDescription.

func (peer *Peer) ReactOnAnswer(answer_str string) error {
	peer.mutex.Lock()
	defer peer.mutex.Unlock()
	answer := webrtc.SessionDescription{
		Type: webrtc.SDPTypeAnswer,
		SDP:  answer_str,
	}
	err := peer.connection.SetRemoteDescription(answer)
	if err != nil {
		fmt.Println(err)
		return err
	}
	return nil
}

The situation is identical, only now we are on the other side of the barricades: we sent an Offer, received an Answer, and set it as the RemoteDescription.

Room

We rise one level higher. The Room structure represents a single video and/or audio conference session.

type Room struct {
	id     string
	mutex  sync.RWMutex
	peers  map[string]*Peer
	tracks map[string]*webrtc.TrackLocalStaticRTP
}

It contains all the peers in the room, stores local copies of the incoming tracks in the tracks field, and forwards them to the other peers.

Let’s describe the behavior:

type RoomInterface interface {
	JoinRoom(id string)
	AddPeer(peer *Peer)
	RemovePeer(peer_id string)
	AddTrack(track *webrtc.TrackRemote) *webrtc.TrackLocalStaticRTP
	RemoveTrack(track *webrtc.TrackLocalStaticRTP)
	SendAnswer(message webrtc.SessionDescription, peer_id string)
	SendOffer(message webrtc.SessionDescription, peer_id string)
	SendICE(message *webrtc.ICECandidate, peer_id string)
	Signal()
}

A few standard methods again; let’s run through them:

  1. AddPeer(peer *Peer) – adds a user to the room.peers field

  2. RemovePeer(peer_id string) – removes the user from the room, if present

  3. AddTrack(track *webrtc.TrackRemote) – creates a local copy of an incoming remote track and stores it in room.tracks (a sketch follows this list)

  4. RemoveTrack(track *webrtc.TrackLocalStaticRTP) – removes that local track accordingly

  5. SendAnswer, SendOffer, SendICE – send an Answer, Offer, or ICE candidate respectively over the websocket to the peer with id == peer_id
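
AddTrack and RemoveTrack are not shown in the article; judging by how they are used in OnTrack below (and by analogy with the pion sfu-ws example), a minimal sketch could be:

// AddTrack wraps an incoming remote track into a local track that
// can be fanned out to every other peer in the room.
func (room *Room) AddTrack(track *webrtc.TrackRemote) *webrtc.TrackLocalStaticRTP {
	room.mutex.Lock()
	defer room.mutex.Unlock()

	trackLocal, err := webrtc.NewTrackLocalStaticRTP(
		track.Codec().RTPCodecCapability, track.ID(), track.StreamID())
	if err != nil {
		fmt.Println("Cannot create local track: ", err)
		return nil
	}
	room.tracks[track.ID()] = trackLocal
	return trackLocal
}

// RemoveTrack drops the local track from the room.
func (room *Room) RemoveTrack(track *webrtc.TrackLocalStaticRTP) {
	room.mutex.Lock()
	defer room.mutex.Unlock()
	delete(room.tracks, track.ID())
}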

Now let’s move on to the most interesting part: room.Signal()

func (room *Room) Signal() {
	room.mutex.Lock()
	defer room.mutex.Unlock()
	attemptSync := func() (again bool) {
		for _, peer := range room.peers {
          
			// 1) Check if peer is already closed
			if peer.connection.ConnectionState() == webrtc.PeerConnectionStateClosed {
				fmt.Println("Peer with peer_id", peer.id, "was disconnected")
				room.RemovePeer(peer.id)
				return true
			}
            // 2) 
			existingSenders := map[string]bool{}
			for _, sender := range peer.connection.GetSenders() {
				if sender.Track() == nil {
					continue
				}
                // 3)
				existingSenders[sender.Track().ID()] = true
				// If we have an RTPSender that doesn't map to an existing track, remove it and signal
				if _, ok := room.tracks[sender.Track().ID()]; !ok {
					if err := peer.connection.RemoveTrack(sender); err != nil {
						fmt.Println("Cannot remove track", sender.Track().ID(), ":", err)
						return true
					}
				}
			}

			// 4) Don't receive videos we are sending, make sure we don't have loopback
			for _, receiver := range peer.connection.GetReceivers() {
				if receiver.Track() == nil {
					continue
				}

				existingSenders[receiver.Track().ID()] = true
			}
			// 5) Add all tracks we aren't sending yet to the PeerConnection
			for trackID := range room.tracks {
				if _, ok := existingSenders[trackID]; !ok {
					if _, err := peer.connection.AddTrack(room.tracks[trackID]); err == nil {
						fmt.Println("New track are sending for peer", peer.id)
						return true
					} else {
						fmt.Println(err)
					}
				}
			}
            // 6)
			if peer.connection.PendingLocalDescription() != nil {
				fmt.Println(peer.connection.PendingLocalDescription())
				offer, err := peer.connection.CreateOffer(&webrtc.OfferOptions{
					OfferAnswerOptions: webrtc.OfferAnswerOptions{},
					ICERestart:         true,
				})
				if err != nil {
					fmt.Println("Error in CreateOffer: ", err)
					return true
				}
				if err = peer.connection.SetLocalDescription(offer); err != nil {
					fmt.Println("Offer: ", offer)
					fmt.Println("Cannot set LocalDescription: ", err)
					return false
				}

				offerString, err := json.Marshal(offer)
				if err != nil {
					fmt.Println("Marshalling failed: ", err)
					return true
				}

				if err = peer.socket.WriteJSON(&WsMessage{
					Event: "offer",
					Data:  string(offerString),
				}); err != nil {
					fmt.Println("Cannot write message in WsMessage: ", err)
					return true
				}
			}

		}
		return
	}
    // 7)
	for syncAttempt := 0; ; syncAttempt++ {
		if syncAttempt == 25 {
			// Release the lock and attempt a sync in 3 seconds. We might be blocking a RemoveTrack or AddTrack
			go func() {
				time.Sleep(time.Second * 3)
				room.Signal()
			}()
			return
		}

		if !attemptSync() {
			fmt.Println("Signalling finished")
			break
		}
	}
}

This function contains the main part of the Room logic. Let’s look at it point by point:

  1. We check our current peers. If a peer’s connection is already closed, we remove that peer from the room.

  2. We collect all the Senders() of this peer. A sender is a stream that we send to this peer.

  3. We check whether we have “hanging” tracks, that is, tracks that came from a user who has already left the room, and remove them.

  4. We also add the tracks coming from this user to the existingSenders map. This is done so that at the next step we do not send the user’s own tracks back to them (no loopback).

  5. Now existingSenders contains all the tracks that this peer already sends or receives. We only have to add the tracks from room.tracks that this peer is not yet getting.

  6. Here we check PendingLocalDescription. This deserves an explanation. Each PeerConnection has two LocalDescription states: Current & Pending. The first is updated when an Offer is sent and an Answer is received. The second is updated when the connection parameters change, but it takes effect (that is, Current becomes equal to Pending) only after another Offer/Answer exchange. So we check for nil: if PendingLocalDescription == nil, nothing has changed and the PeerConnection does not need to be updated; otherwise we send a new Offer to this peer. In WebRTC this is called renegotiation, and you can read about it here.

  7. While updating peer states we may run into various errors. For example, adding new tracks does not happen instantly and can cause conflicts, so a restart mechanism for room.Signal() is implemented with a delay of 3 seconds.

Coordinator

Now we move on to the main structure, which manages the behavior of all the rooms.

type Coordinator struct {
	sessions map[string]*Room
}

As always, let’s first describe its behavior with an interface:

type CoordinatorInterface interface {
	CreateRoom(id string)
	RemoveRoom(id string)
	AddUserToRoom(self_id string, room_id string, socket *websocket.Conn)
	RemoveUserFromRoom(self_id string, room_id string)
	ShowSessions()
	ObtainEvent(message WsMessage, socket *websocket.Conn)
}

Let’s go through the methods:

  1. CreateRoom() & RemoveRoom() – creates and removes a room respectively

  2. ShowSessions() – displays all currently active rooms

  3. RemoveUserFromRoom() – removes the user from the room

  4. AddUserToRoom() – Adds a user and configures a PeerConnection

  5. ObtainEvent() is a method associated with our Signal server. When initializing the server, we will create a Coordinator structure and process all necessary events using this method
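
CreateRoom is not shown in the article; based on the struct definitions above, a minimal version might be:

// CreateRoom registers a new Room with empty peer and track maps.
func (coordinator *Coordinator) CreateRoom(id string) {
	coordinator.sessions[id] = &Room{
		id:     id,
		peers:  make(map[string]*Peer),
		tracks: make(map[string]*webrtc.TrackLocalStaticRTP),
	}
}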

Let’s look at the AddUserToRoom() code:

func (coordinator *Coordinator) AddUserToRoom(self_id string, room_id string, socket *websocket.Conn) {
    // 1)
	if _, ok := coordinator.sessions[room_id]; !ok {
		fmt.Println("New Room was created: ", room_id)
		coordinator.CreateRoom(room_id)
	}
	if room, ok := coordinator.sessions[room_id]; ok {
		// 2) Add Peer to Room
		room.AddPeer(newPeer(self_id))
		fmt.Println("Peer ", self_id, "was added to room ", room_id)
		if peer, ok := room.peers[self_id]; ok {
			// 3) Set socket connection to Peer
			peer.SetSocket(socket)

			// 4) Create Peer Connection
			conn, err := webrtc.NewPeerConnection(webrtc.Configuration{})
			if err != nil {
				fmt.Println("Failed to establish peer connection: ", err)
				return
			}

			peer.SetPeerConnection(conn)
			fmt.Println("Peer connection was established")
			// 5) Accept one audio and one video track incoming
			for _, typ := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
				if _, err := peer.connection.AddTransceiverFromKind(typ, webrtc.RTPTransceiverInit{
					Direction: webrtc.RTPTransceiverDirectionRecvonly,
				}); err != nil {
					log.Print(err)
					return
				}
			}

			// 6) If PeerConnection is closed remove it from global list
			peer.connection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
				switch p {
				case webrtc.PeerConnectionStateFailed:
					if err := peer.connection.Close(); err != nil {
						log.Print(err)
					}
				case webrtc.PeerConnectionStateClosed:
					room.Signal()
				default:
				}
			})

			// 7) When peer connection is getting the ICE -> send ICE to client
			peer.connection.OnICECandidate(func(i *webrtc.ICECandidate) {
				if i == nil {
					fmt.Println("ICEGatheringState: connected")
					return
				}
				fmt.Println("Ice: ", i)
				room.SendICE(i, self_id)
			})
            // 8) 
			peer.connection.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
				fmt.Println("Track added from peer: ", self_id)
				defer room.Signal()
				// Create a track to fan out our incoming video to all peers
				trackLocal := room.AddTrack(t)
				if trackLocal == nil {
					return
				}
				defer room.RemoveTrack(trackLocal)
				defer fmt.Println("Track", trackLocal.ID(), "was removed")
				buf := make([]byte, 1500)
				for {
					i, _, err := t.Read(buf)
					if err != nil {
						return
					}

					if _, err = trackLocal.Write(buf[:i]); err != nil {
						return
					}
				}
			})
		}

	}
}

Let’s go through the points:

  1. Check whether the room the user wants to connect to exists. If it doesn’t, create it.

  2. Add the Peer to the room.

  3. Set the socket in the Peer.

  4. Create the RTCPeerConnection and set it in the Peer.

  5. Add recvonly transceivers so that we accept one incoming audio and one incoming video track from the new user.

  6. Watch the connection state: if the connection fails, we close it; if it is closed, we signal the room so the peer gets cleaned up.

  7. Here is the logic of exchanging ICE candidates. If we got nil as the ICE candidate, gathering is complete; otherwise we send the new ICE candidate to the client.

  8. On a new incoming track, create a local fan-out track, signal the room, and copy RTP packets from the remote track into the local one.

Trickle ICE

In Pion you can find an implementation of the so-called Trickle ICE. The essence of this approach is that the client does not wait for the offer/answer exchange to finish, but sends and receives ICE candidates in parallel, as soon as they appear. We implemented this in our OnICECandidate handler above.
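
One more thing before we look at ObtainEvent: the WsMessage envelope is used throughout the code but never shown. Judging by its usage (an Event string plus an arbitrary Data payload), it is presumably something like:

// WsMessage is the envelope exchanged over the websocket: Event selects
// the handler ("offer", "answer", "ice-candidate", ...), Data carries
// the event-specific payload.
type WsMessage struct {
	Event string `json:"event"`
	Data  any    `json:"data"`
}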

Now it’s ObtainEvent’s turn:

func (coordinator *Coordinator) ObtainEvent(message WsMessage, socket *websocket.Conn) {
	wsMessage := message
	switch wsMessage.Event {
	case "joinRoom":
		go func() {
			m, ok := message.Data.(map[string]any)
			if ok {
				self_id := m["self_id"].(string)
				room_id := m["room_id"].(string)
				coordinator.AddUserToRoom(self_id, room_id, socket)
			}
		}()
	case "leaveRoom":
		go func() {
			m, ok := message.Data.(map[string]any)
			if ok {
				self_id := m["self_id"].(string)
				room_id := m["room_id"].(string)
				coordinator.RemoveUserFromRoom(self_id, room_id)
			}
		}()
	case "offer":
		go func() {
			m, ok := message.Data.(map[string]any)
			if ok {
				self_id, _ := m["self_id"].(string)
				room_id, _ := m["room_id"].(string)
				offer2 := m["offer"].(map[string]any)
				if room, ok := coordinator.sessions[room_id]; ok {
					if peer, ok := room.peers[self_id]; ok {
						answer, err2 := peer.ReactOnOffer(offer2["sdp"].(string))
						if err2 != nil {
							fmt.Println(err2)
							return
						}
						room.SendAnswer(answer, self_id)
					}
				}
			}
		}()
	case "answer":
		go func() {
			m, ok := message.Data.(map[string]any)
			if ok {
				self_id, _ := m["self_id"].(string)
				room_id, _ := m["room_id"].(string)
				offer2 := m["answer"].(map[string]any)
				if room, ok := coordinator.sessioins[room_id]; ok {
					if peer, ok := room.peers[self_id]; ok {
						err := peer.ReactOnAnswer(offer2["sdp"].(string))
						if err != nil {
							fmt.Println(err)
							return
						}
					}

				}
			}
		}()
	case "ice-candidate":
		go func() {
			m, ok := message.Data.(map[string]any)
			if ok {
				self_id, _ := m["self_id"].(string)
				room_id, _ := m["room_id"].(string)
				candidate := m["candidate"].(map[string]any)
				i_candidate := candidate["candidate"].(string)
				sdp_mid := candidate["sdpMid"].(string)
				sdp_m_line_index := uint16(candidate["sdpMLineIndex"].(float64))
				var username_fragment string
				if candidate["usernameFragment"] != nil {
					username_fragment = candidate["usernameFragment"].(string)
				} else {
					username_fragment = ""
				}
				init := webrtc.ICECandidateInit{
					Candidate:        i_candidate,
					SDPMid:           &sdp_mid,
					SDPMLineIndex:    &sdp_m_line_index,
					UsernameFragment: &username_fragment,
				}
				if room, ok := coordinator.sessions[room_id]; ok {
					if peer, ok := room.peers[self_id]; ok {
						if err := peer.connection.AddICECandidate(init); err != nil {
							log.Println(err)
							return
						}
						fmt.Println("ICE-CANDIDATE added for peer", peer.id)
						fmt.Println(peer.connection.ICEConnectionState())
						fmt.Println(peer.connection.ICEGatheringState())
					}
				}
			} else {
				fmt.Println("Malformed ice-candidate message: ", message.Data)
			}
		}()
	default:
		fmt.Println("DEFAULT")
		fmt.Println(wsMessage)

	}

	return
}

Here we inspect the value of the Event field and call the appropriate method. On input we get a WsMessage, which serves as a common envelope for the different message types.
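
For illustration, a joinRoom message from a client might look like this (the field names follow the code above; the exact shape depends on your client):

{
    "event": "joinRoom",
    "data": { "self_id": "alice", "room_id": "room1" }
}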

Signal

Now, the final element of our server: a websocket handler that accepts messages and passes them to Coordinator.ObtainEvent()

// websockets listener
func (ws *WsServer) wsInit(w http.ResponseWriter, r *http.Request) {

	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		fmt.Printf("Client connection failed with error %s\n", err)
		return
	}
	defer conn.Close()

	fmt.Println("Client connected successfully")

	message := types.WsMessage{}

	for {
		messageType, bmessage, err := conn.ReadMessage()
		fmt.Println(bmessage)
		if err != nil {
			fmt.Println(err)
			return
		}
		if messageType == websocket.CloseMessage {
			break
		}

		err = json.Unmarshal(bmessage, &message)
		if err != nil {
			fmt.Println("DROP")
			fmt.Println(message.Data)
			fmt.Println(err)
			return
		}
		ws.coordinator.ObtainEvent(message, conn)
	}
}

Here we unmarshal each incoming message into a WsMessage and hand it to our coordinator.
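
For completeness, a minimal way to wire everything together might look like this (the WsServer struct, the /ws route, and the port are my assumptions; check the repository for the actual wiring):

var upgrader = websocket.Upgrader{
	// Allow any origin for local experiments; tighten this in production.
	CheckOrigin: func(r *http.Request) bool { return true },
}

type WsServer struct {
	coordinator *Coordinator
}

func main() {
	ws := &WsServer{coordinator: &Coordinator{sessions: make(map[string]*Room)}}
	http.HandleFunc("/ws", ws.wsInit)
	log.Fatal(http.ListenAndServe(":8080", nil))
}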

Conclusion

As a result, we have a starting version of an SFU. As WebRTC for the Curious put it:

Building a simple SFU can be done in a weekend. Building a good SFU that can handle all types of clients is never ending. Tuning the Congestion Control, Error Correction and Performance is a never ending task.

Therefore, this is only the first version of a simple SFU server. There are still many improvements ahead that can raise the quality of video/audio conferences on this server. If you like this article, I will not delay the release of the next ones.

The source code can be found in this repository.

I would also be glad to hear your wishes and tips for developing the SFU, since it was implemented from scratch with no prior knowledge of either WebRTC or Go.

Sources

https://developer.mozilla.org/ru/docs/Web/API/WebRTC_API/Protocols

https://webrtcforthecurious.com
