Analysis of the SFU package
This is the configuration structure used to load the config file config.toml. Some of its fields have corresponding structures of their own; more on those later.
```go
type Config struct {
    SFU struct {
        Ballast   int64 `mapstructure:"ballast"`
        WithStats bool  `mapstructure:"withstats"`
    } `mapstructure:"sfu"`
    WebRTC WebRTCConfig `mapstructure:"webrtc"`
    Log    log.Config   `mapstructure:"log"`
    Router RouterConfig `mapstructure:"router"`
    Turn   TurnConfig   `mapstructure:"turn"`
}
```
In the sfu entry point, a few lines load this configuration:
```go
var conf = sfu.Config{}
err = viper.GetViper().Unmarshal(&conf)
s := sfu.NewSFU(conf)
```
The entire Config is used to initialize the SFU instance.
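To make the mapping concrete, here is a hypothetical config.toml excerpt whose keys follow the mapstructure tags above (section names come from the tags; the values are illustrative, not the project's shipped defaults):

```toml
[sfu]
ballast = 0        # ballast size in MB
withstats = false

[webrtc]
# iceportrange = [50000, 60000]

[router]
# router options

[turn]
# enabled = false
```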
Let's analyze it in the order of the source files.
sfu.go
The first exposed type is the transport configuration, which is needed to create a PeerConnection.
```go
type WebRTCTransportConfig struct {
    configuration webrtc.Configuration
    setting       webrtc.SettingEngine
    router        RouterConfig
}

func NewWebRTCTransportConfig(c Config) WebRTCTransportConfig {
    se := webrtc.SettingEngine{}

    var icePortStart, icePortEnd uint16
    // If the TURN relay is enabled, use the port range 46884-60999;
    // otherwise, prefer the range specified in config.toml.
    if c.Turn.Enabled {
        icePortStart = sfuMinPort
        icePortEnd = sfuMaxPort
    } else if len(c.WebRTC.ICEPortRange) == 2 {
        icePortStart = c.WebRTC.ICEPortRange[0]
        icePortEnd = c.WebRTC.ICEPortRange[1]
    }
    if icePortStart != 0 || icePortEnd != 0 {
        if err := se.SetEphemeralUDPPortRange(icePortStart, icePortEnd); err != nil {
            panic(err)
        }
    }

    var iceServers []webrtc.ICEServer
    if c.WebRTC.Candidates.IceLite {
        se.SetLite(c.WebRTC.Candidates.IceLite)
    } else {
        for _, iceServer := range c.WebRTC.ICEServers {
            s := webrtc.ICEServer{
                URLs:       iceServer.URLs,
                Username:   iceServer.Username,
                Credential: iceServer.Credential,
            }
            iceServers = append(iceServers, s)
        }
    }

    se.BufferFactory = bufferFactory.GetOrNew

    sdpSemantics := webrtc.SDPSemanticsUnifiedPlan
    switch c.WebRTC.SDPSemantics {
    case "unified-plan-with-fallback":
        sdpSemantics = webrtc.SDPSemanticsUnifiedPlanWithFallback
    case "plan-b":
        sdpSemantics = webrtc.SDPSemanticsPlanB
    }

    w := WebRTCTransportConfig{
        configuration: webrtc.Configuration{
            ICEServers:   iceServers,
            SDPSemantics: sdpSemantics,
        },
        setting: se,
        router:  c.Router,
    }

    if len(c.WebRTC.Candidates.NAT1To1IPs) > 0 {
        w.setting.SetNAT1To1IPs(c.WebRTC.Candidates.NAT1To1IPs, webrtc.ICECandidateTypeHost)
    }

    if c.SFU.WithStats {
        w.router.WithStats = true
        stats.InitStats()
    }

    return w
}
```
The transport configuration is built from the config; it applies to all sessions and does not change for the lifetime of the process. There are two SDP organization modes: Plan B and Unified Plan. In Plan B, one media-level section (m=) in the SDP contains multiple streams, which are distinguished by MSID. Plan B is gradually being replaced by the JSEP Unified Plan, in which each media-level section (m=) represents one stream, and an SDP can contain multiple media sections.
The second exposed type is SFU, which represents an SFU instance.
```go
type SFU struct {
    sync.RWMutex
    webrtc    WebRTCTransportConfig
    router    RouterConfig
    turn      *turn.Server
    sessions  map[string]*Session
    withStats bool
}
```
The SFU contains the transport configuration WebRTCTransportConfig and a TURN server, and maintains a list of sessions; sessions are the boundaries of WebRTC traffic.
```go
func NewSFU(c Config) *SFU {
    // Init random seed
    rand.Seed(time.Now().UnixNano())
    // Init ballast
    ballast := make([]byte, c.SFU.Ballast*1024*1024)
    // Init buffer factory
    bufferFactory = buffer.NewBufferFactory()
    // Init packet factory
    packetFactory = &sync.Pool{
        New: func() interface{} {
            return make([]byte, 1460)
        },
    }
    w := NewWebRTCTransportConfig(c)
    s := &SFU{
        webrtc:    w,
        sessions:  make(map[string]*Session),
        withStats: c.Router.WithStats,
    }
    if c.Turn.Enabled {
        ts, err := initTurnServer(c.Turn, nil)
        if err != nil {
            log.Panicf("Could not init turn server err: %v", err)
        }
        s.turn = ts
    }
    runtime.KeepAlive(ballast)
    return s
}
```
There are a couple of interesting details in this constructor:
- sync.Pool stores and reuses temporary objects to reduce GC pressure
- runtime.KeepAlive keeps the ballast allocation alive
The ballast allocates a block of memory of the configured size in order to reduce the number of GC cycles.
Setting the standard library aside, the other notable thing the constructor does is initTurnServer: pion implements its own TURN server, which is integrated into the ION project, so the TURN service is started directly.
For SFU, there are several other methods:
```go
func (s *SFU) newSession(id string) *Session {
    session := NewSession(id)
    session.OnClose(func() {
        s.Lock()
        delete(s.sessions, id)
        s.Unlock()
        if s.withStats {
            stats.Sessions.Dec()
        }
    })

    s.Lock()
    s.sessions[id] = session
    s.Unlock()

    if s.withStats {
        stats.Sessions.Inc()
    }
    return session
}

func (s *SFU) getSession(id string) *Session {
    s.RLock()
    defer s.RUnlock()
    return s.sessions[id]
}

func (s *SFU) GetSession(sid string) (*Session, WebRTCTransportConfig) {
    session := s.getSession(sid)
    if session == nil {
        session = s.newSession(sid)
    }
    return session, s.webrtc
}
```
The SFU embeds a sync.RWMutex read-write lock. SFU.GetSession() always returns a session and the transport configuration. The transport configuration is fixed; if the session ID does not exist, a new session is created.
session.go
The Session type is exposed:
```go
type Session struct {
    id             string
    mu             sync.RWMutex
    peers          map[string]*Peer
    onCloseHandler func()
    closed         bool
}
```
peers maintains all participants in the current Session. Participants in one session automatically subscribe to the other participants' streams.
The construction is very simple and you only need to provide the session ID.
```go
func NewSession(id string) *Session {
    return &Session{
        id:     id,
        peers:  make(map[string]*Peer),
        closed: false,
    }
}
```
The peers map in sfu.Session is unexported, so the type provides methods to maintain the participant list:
```go
func (s *Session) AddPeer(peer *Peer) {
    s.mu.Lock()
    s.peers[peer.id] = peer
    s.mu.Unlock()
}

func (s *Session) RemovePeer(pid string) {
    s.mu.Lock()
    log.Infof("RemovePeer %s from session %s", pid, s.id)
    delete(s.peers, pid)
    s.mu.Unlock()

    if len(s.peers) == 0 && s.onCloseHandler != nil && !s.closed {
        s.onCloseHandler()
        s.closed = true
    }
}
```
When a participant leaves and the session's participant count drops to 0, Session.onCloseHandler() is triggered; it is the hook called when the session closes.
Each participant has a unique identifier. Here is how the session exposes its participants and registers the close handler:
```go
func (s *Session) Peers() map[string]*Peer {
    s.mu.RLock()
    defer s.mu.RUnlock()
    return s.peers
}

func (s *Session) OnClose(f func()) {
    s.onCloseHandler = f
}
```
Publish and subscribe: Publish operates at the granularity of a Router, and causes everyone else in the session to subscribe to that router. Subscribe operates at the granularity of a peer, and subscribes that peer to everyone else in the session. Subscribe also wires up the data channel connections.
```go
func (s *Session) Publish(router Router, r Receiver) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    for pid, p := range s.peers {
        // Don't sub to self
        if router.ID() == pid {
            continue
        }
        log.Infof("Publishing track to peer %s", pid)
        if err := router.AddDownTracks(p.subscriber, r); err != nil {
            log.Errorf("Error subscribing transport to router: %s", err)
            continue
        }
    }
}

// Subscribe will create a Sender for every other Receiver in the session
func (s *Session) Subscribe(peer *Peer) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    subdChans := false
    for pid, p := range s.peers {
        if pid == peer.id {
            continue
        }
        err := p.publisher.GetRouter().AddDownTracks(peer.subscriber, nil)
        if err != nil {
            log.Errorf("Subscribing to router err: %v", err)
            continue
        }

        if !subdChans {
            for _, dc := range p.subscriber.channels {
                label := dc.Label()
                n, err := peer.subscriber.AddDataChannel(label)
                if err != nil {
                    log.Errorf("error adding datachannel: %s", err)
                    continue
                }
                n.OnMessage(func(msg webrtc.DataChannelMessage) {
                    s.onMessage(peer.id, label, msg)
                })
            }
            subdChans = true
            peer.subscriber.negotiate()
        }
    }
}
```
In Subscribe, once the data channel is connected, a message handler is registered via DataChannel.OnMessage.
```go
func (s *Session) onMessage(origin, label string, msg webrtc.DataChannelMessage) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    for pid, p := range s.peers {
        if origin == pid {
            continue
        }
        dc := p.subscriber.channels[label]
        if dc != nil && dc.ReadyState() == webrtc.DataChannelStateOpen {
            if msg.IsString {
                if err := dc.SendText(string(msg.Data)); err != nil {
                    log.Errorf("Sending dc message err: %v", err)
                }
            } else {
                if err := dc.Send(msg.Data); err != nil {
                    log.Errorf("Sending dc message err: %v", err)
                }
            }
        }
    }
}
```
As the code shows, Session.onMessage broadcasts the message to all other participants over their data channels.
```go
func (s *Session) AddDatachannel(owner string, dc *webrtc.DataChannel) {
    label := dc.Label()

    s.mu.RLock()
    defer s.mu.RUnlock()

    s.peers[owner].subscriber.channels[label] = dc
    dc.OnMessage(func(msg webrtc.DataChannelMessage) {
        s.onMessage(owner, label, msg)
    })

    for pid, p := range s.peers {
        if owner == pid {
            continue
        }
        n, err := p.subscriber.AddDataChannel(label)
        if err != nil {
            log.Errorf("error adding datachannel: %s", err)
            continue
        }
        pid := pid
        n.OnMessage(func(msg webrtc.DataChannelMessage) {
            s.onMessage(pid, label, msg)
        })
        p.subscriber.negotiate()
    }
}
```
Session.AddDatachannel registers a data channel for the specified participant and connects the other participants' data channels to it.
AddDatachannel + Publish = Subscribe
peer.go
Externally exposed types:
```go
type SessionProvider interface {
    GetSession(sid string) (*Session, WebRTCTransportConfig)
}
```
The SFU type implements this interface. A Peer needs to look up session information, and it does so through this interface.
```go
type Peer struct {
    sync.Mutex
    id       string
    session  *Session
    provider SessionProvider

    publisher  *Publisher
    subscriber *Subscriber

    OnOffer                    func(*webrtc.SessionDescription)
    OnIceCandidate             func(*webrtc.ICECandidateInit, int)
    OnICEConnectionStateChange func(webrtc.ICEConnectionState)

    remoteAnswerPending bool
    negotiationPending  bool
}
```
A Peer represents a participant and holds a pair of peer connections (publisher and subscriber).
```go
func NewPeer(provider SessionProvider) *Peer {
    return &Peer{
        provider: provider,
    }
}
```
Once constructed, the peer can take part in signaling. In practice, an SFU instance can be passed as the provider argument.
Peer provides the following methods, corresponding to the signaling flow:
- Join initializes a Peer with a session ID and an SDP offer
- Answer processes an SDP offer and returns an SDP answer
- SetRemoteDescription is called after receiving an SDP answer
- Trickle handles ICE candidates
- Close closes the Peer
Let's look at each in turn.
```go
func (p *Peer) Join(sid string, sdp webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
    if p.publisher != nil {
        log.Debugf("peer already exists")
        return nil, ErrTransportExists
    }

    pid := cuid.New()
    p.id = pid
    var (
        cfg WebRTCTransportConfig
        err error
    )

    // Obtain the session and the transport configuration
    p.session, cfg = p.provider.GetSession(sid)

    // Create a new Subscriber
    p.subscriber, err = NewSubscriber(pid, cfg)
    if err != nil {
        return nil, fmt.Errorf("error creating transport: %v", err)
    }

    // Create a new Publisher
    p.publisher, err = NewPublisher(p.session, pid, cfg)
    if err != nil {
        return nil, fmt.Errorf("error creating transport: %v", err)
    }

    // Set up the negotiation-needed handler
    p.subscriber.OnNegotiationNeeded(func() {
        p.Lock()
        defer p.Unlock()
        if p.remoteAnswerPending {
            p.negotiationPending = true
            return
        }
        log.Debugf("peer %s negotiation needed", p.id)
        offer, err := p.subscriber.CreateOffer()
        if err != nil {
            log.Errorf("CreateOffer error: %v", err)
            return
        }
        p.remoteAnswerPending = true
        if p.OnOffer != nil {
            log.Infof("peer %s send offer", p.id)
            p.OnOffer(&offer)
        }
    })

    // Set up subscriber/publisher ICE handling
    p.subscriber.OnICECandidate(func(c *webrtc.ICECandidate) {
        log.Debugf("on subscriber ice candidate called for peer " + p.id)
        if c == nil {
            return
        }
        if p.OnIceCandidate != nil {
            json := c.ToJSON()
            p.OnIceCandidate(&json, subscriber)
        }
    })

    p.publisher.OnICECandidate(func(c *webrtc.ICECandidate) {
        log.Debugf("on publisher ice candidate called for peer " + p.id)
        if c == nil {
            return
        }
        if p.OnIceCandidate != nil {
            json := c.ToJSON()
            p.OnIceCandidate(&json, publisher)
        }
    })

    p.publisher.OnICEConnectionStateChange(func(s webrtc.ICEConnectionState) {
        if p.OnICEConnectionStateChange != nil {
            p.OnICEConnectionStateChange(s)
        }
    })

    // Peer initialization complete; add it to the session's peer list
    p.session.AddPeer(p)
    log.Infof("peer %s join session %s", p.id, sid)

    // Handle the offer and return an answer
    answer, err := p.publisher.Answer(sdp)
    if err != nil {
        return nil, fmt.Errorf("error setting remote description: %v", err)
    }
    log.Infof("peer %s send answer", p.id)
    return &answer, nil
}
```
Pushing and pulling streams is only possible after Join has completed.
```go
func (p *Peer) Answer(sdp webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
    if p.subscriber == nil {
        return nil, ErrNoTransportEstablished
    }
    log.Infof("peer %s got offer", p.id)

    // If the signaling state is not stable, ignore the offer
    if p.publisher.SignalingState() != webrtc.SignalingStateStable {
        return nil, ErrOfferIgnored
    }

    answer, err := p.publisher.Answer(sdp)
    if err != nil {
        return nil, fmt.Errorf("error creating answer: %v", err)
    }
    log.Infof("peer %s send answer", p.id)
    return &answer, nil
}

func (p *Peer) SetRemoteDescription(sdp webrtc.SessionDescription) error {
    if p.subscriber == nil {
        return ErrNoTransportEstablished
    }
    p.Lock()
    defer p.Unlock()

    log.Infof("peer %s got answer", p.id)
    if err := p.subscriber.SetRemoteDescription(sdp); err != nil {
        return fmt.Errorf("error setting remote description: %v", err)
    }
    p.remoteAnswerPending = false

    if p.negotiationPending {
        p.negotiationPending = false
        go p.subscriber.negotiate()
    }
    return nil
}
```
The remote SDP answer is applied here through the subscriber; the subscriber itself is examined in detail later.
```go
func (p *Peer) Trickle(candidate webrtc.ICECandidateInit, target int) error {
    if p.subscriber == nil || p.publisher == nil {
        return ErrNoTransportEstablished
    }
    log.Infof("peer %s trickle", p.id)
    switch target {
    case publisher:
        if err := p.publisher.AddICECandidate(candidate); err != nil {
            return fmt.Errorf("error setting ice candidate: %s", err)
        }
    case subscriber:
        if err := p.subscriber.AddICECandidate(candidate); err != nil {
            return fmt.Errorf("error setting ice candidate: %s", err)
        }
    }
    return nil
}
```
Trickle processes an ICE candidate, routing it to the publisher or the subscriber according to target.
```go
func (p *Peer) Close() error {
    if p.session != nil {
        p.session.RemovePeer(p.id)
    }
    if p.publisher != nil {
        p.publisher.Close()
    }
    if p.subscriber != nil {
        if err := p.subscriber.Close(); err != nil {
            return err
        }
    }
    return nil
}
```
Close removes the peer from the session's participant list and closes its publisher and subscriber.
publisher.go
Externally exposed is the Publisher type:
```go
type Publisher struct {
    id      string
    pc      *webrtc.PeerConnection
    router  Router
    session *Session

    candidates []webrtc.ICECandidateInit

    onTrackHandler                    func(*webrtc.TrackRemote, *webrtc.RTPReceiver)
    onICEConnectionStateChangeHandler atomic.Value // func(webrtc.ICEConnectionState)

    closeOnce sync.Once
}
```
The Publisher wraps a webrtc.PeerConnection and caches ICE candidates; it also exposes handlers for incoming tracks and for ICE connection state changes.
The Router encapsulates the RTP/RTCP handling of a track.
```go
func NewPublisher(session *Session, id string, cfg WebRTCTransportConfig) (*Publisher, error) {
    me, err := getPublisherMediaEngine()
    if err != nil {
        log.Errorf("NewPeer error: %v", err)
        return nil, errPeerConnectionInitFailed
    }

    // Construct a WebRTC API object; note the functional options pattern here
    api := webrtc.NewAPI(webrtc.WithMediaEngine(me), webrtc.WithSettingEngine(cfg.setting))
    pc, err := api.NewPeerConnection(cfg.configuration)
    if err != nil {
        log.Errorf("NewPeer error: %v", err)
        return nil, errPeerConnectionInitFailed
    }

    p := &Publisher{
        id:      id,
        pc:      pc,
        session: session,
        router:  newRouter(pc, id, cfg.router),
    }

    pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
        log.Debugf("Peer %s got remote track id: %s mediaSSRC: %d rid: %s streamID: %s",
            p.id, track.ID(), track.SSRC(), track.RID(), track.StreamID())
        if r, pub := p.router.AddReceiver(receiver, track); pub {
            p.session.Publish(p.router, r)
        }
    })

    pc.OnDataChannel(func(dc *webrtc.DataChannel) {
        if dc.Label() == apiChannelLabel {
            // terminate api data channel
            return
        }
        p.session.AddDatachannel(id, dc)
    })

    pc.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
        log.Debugf("ice connection state: %s", connectionState)
        switch connectionState {
        case webrtc.ICEConnectionStateFailed:
            fallthrough
        case webrtc.ICEConnectionStateClosed:
            log.Debugf("webrtc ice closed for peer: %s", p.id)
            p.Close()
        }
        if handler, ok := p.onICEConnectionStateChangeHandler.Load().(func(webrtc.ICEConnectionState)); ok && handler != nil {
            handler(connectionState)
        }
    })

    return p, nil
}
```
In the construction of Publisher, the first step is to build a webrtc.MediaEngine. The construction process (in mediaengine.go) is, briefly:
- Construct an empty webrtc.MediaEngine
- Register audio: Opus (payload type 111)
- Register video codecs:
  - VP8 (96)
  - VP9 profile-id=0 (98)
  - VP9 profile-id=1 (100)
  - H264:
    - level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f (102)
    - level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f (127)
    - level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f (125)
    - level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42e01f (108)
    - level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=640032 (123)
- Set the audio and video RTP header extensions