WebRTC Extension
Real-time peer-to-peer communication with WebRTC support, multiple topologies, and quality monitoring
The WebRTC extension provides production-ready real-time peer-to-peer communication capabilities for your Forge application. It supports multiple connection topologies (Mesh, SFU, MCU), leverages the Streaming extension for signaling, and includes advanced features like quality monitoring, recording, and adaptive bitrate control.
Features
Multiple Topologies
- Mesh (P2P): Direct peer-to-peer connections, best for 2-4 participants (see the topology-selection sketch after this list)
- SFU (Selective Forwarding Unit): Server-mediated routing, best for 5-50 participants
- MCU (Multipoint Control Unit): Server-mixed media, best for 50+ participants (planned)
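In practice, the choice usually follows the expected participant count. A minimal sketch of that rule of thumb (the helper, its thresholds, and the webrtc.Topology type name are illustrative assumptions; TopologyMesh and TopologySFU are the constants used in the configuration examples below):
package calls

import "github.com/xraph/forge/extensions/webrtc"

// chooseTopology picks a topology constant from the expected participant
// count, following the rule of thumb above. The thresholds are illustrative,
// and the webrtc.Topology type name is an assumption.
func chooseTopology(expectedParticipants int) webrtc.Topology {
    if expectedParticipants <= 4 {
        return webrtc.TopologyMesh // full-mesh P2P: every pair connects directly
    }
    return webrtc.TopologySFU // server forwards streams; scales to larger rooms
}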
Signaling & Communication
- Streaming Integration: Uses Streaming extension for WebSocket/SSE signaling
- Automatic Room Management: Seamless integration with streaming rooms
- Distributed Signaling: Multi-node signaling coordination
- ICE Candidate Exchange: Automatic NAT traversal with STUN/TURN support
Media Management
- Audio/Video Tracks: Full media track management with codec selection
- Screen Sharing: Desktop and application sharing capabilities
- Data Channels: Bidirectional data communication
- Simulcast Support: Multiple quality layers for adaptive streaming (SFU mode)
Quality & Performance
- Real-time Monitoring: Connection quality metrics and statistics
- Adaptive Quality: Dynamic quality adjustment based on network conditions
- Packet Loss Detection: Automatic detection and reporting of network issues
- Bitrate Control: Dynamic bitrate adjustment for optimal performance
Recording & Storage
- Stream Recording: Record audio/video streams in multiple formats
- Per-room Recording: Room-level recording management
- Multiple Formats: Support for WebM, MP4, and other formats
- Metadata Inclusion: Optional chat and metadata recording
Security & Authentication
- Integrated Authentication: Uses Streaming extension authentication
- Room-level Permissions: Fine-grained access control
- STUN/TURN Security: Secure NAT traversal with credential management
- TLS Support: Encrypted signaling (TLS) and encrypted media transport (DTLS-SRTP)
Installation
Go Module
go get github.com/xraph/forge/extensions/webrtc
Docker
FROM xraph/forge:latest
# WebRTC extension is included
Package Manager
# Using Forge CLI
forge extension add webrtc
# Using package manager
npm install @xraph/forge-webrtc
Configuration
Basic Configuration
extensions:
# Streaming extension (required dependency)
streaming:
backend: "local"
enable_rooms: true
require_auth: true
# WebRTC extension
webrtc:
# Signaling
signaling_enabled: true
signaling_timeout: 30 # seconds
# Topology
topology: "mesh" # "mesh", "sfu", "mcu"
# STUN servers for NAT traversal
stun_servers:
- "stun:stun.l.google.com:19302"
- "stun:stun1.l.google.com:19302"
# TURN servers for relaying
turn_servers:
- urls:
- "turn:turn.example.com:3478"
username: "turnuser"
credential: "turnpass"
tls_enabled: false
# Media configuration
media:
audio_enabled: true
audio_codecs: ["opus"]
video_enabled: true
video_codecs: ["VP8", "H264"]
screen_share_enabled: true
data_channels_enabled: true
max_audio_bitrate: 128 # kbps
max_video_bitrate: 2500 # kbps
min_video_bitrate: 150 # kbps
max_width: 1920
max_height: 1080
max_fps: 30
# Quality monitoring
quality:
monitor_enabled: true
monitor_interval: 5 # seconds
max_packet_loss: 5.0 # percentage
max_jitter: 30 # milliseconds
min_bitrate: 100 # kbps
adaptive_quality: true
quality_check_interval: 10 # seconds
# Recording
recording_enabled: true
recording_path: "./recordings"
# Security
require_auth: true
allow_guests: false
# Metrics
metrics_enabled: true
SFU Configuration
extensions:
webrtc:
topology: "sfu"
# SFU-specific settings
sfu:
worker_count: 4
max_bandwidth_mbps: 100
adaptive_bitrate: true
simulcast_enabled: true
# Quality layers for simulcast
quality_layers:
- rid: "f" # Full quality
max_width: 1920
max_height: 1080
max_fps: 30
bitrate: 2500
- rid: "h" # Half quality
max_width: 1280
max_height: 720
max_fps: 30
bitrate: 1200
- rid: "q" # Quarter quality
max_width: 640
max_height: 360
max_fps: 30
bitrate: 500
# Recording
recording_enabled: true
recording_format: "webm"
Environment Variables
# Topology
FORGE_WEBRTC_TOPOLOGY=mesh
FORGE_WEBRTC_SIGNALING_ENABLED=true
FORGE_WEBRTC_SIGNALING_TIMEOUT=30
# STUN/TURN
FORGE_WEBRTC_STUN_SERVERS=stun:stun.l.google.com:19302
FORGE_WEBRTC_TURN_URLS=turn:turn.example.com:3478
FORGE_WEBRTC_TURN_USERNAME=turnuser
FORGE_WEBRTC_TURN_CREDENTIAL=turnpass
# Media
FORGE_WEBRTC_AUDIO_ENABLED=true
FORGE_WEBRTC_VIDEO_ENABLED=true
FORGE_WEBRTC_MAX_VIDEO_BITRATE=2500
FORGE_WEBRTC_MAX_AUDIO_BITRATE=128
# Quality
FORGE_WEBRTC_MONITOR_ENABLED=true
FORGE_WEBRTC_ADAPTIVE_QUALITY=true
FORGE_WEBRTC_MAX_PACKET_LOSS=5.0
# Recording
FORGE_WEBRTC_RECORDING_ENABLED=true
FORGE_WEBRTC_RECORDING_PATH=./recordings
# Security
FORGE_WEBRTC_REQUIRE_AUTH=true
FORGE_WEBRTC_ALLOW_GUESTS=false
Programmatic Configuration
package main
import (
"log"
"time"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/streaming"
"github.com/xraph/forge/extensions/webrtc"
)
func main() {
app := forge.NewApp(forge.AppConfig{
Name: "video-call-app",
Version: "1.0.0",
})
// Register streaming extension (required dependency)
streamingExt := streaming.NewExtension(
streaming.WithLocalBackend(),
streaming.WithFeatures(true, true, true, true, true), // rooms, channels, presence, typing, history
streaming.WithAuthentication("jwt"),
)
app.RegisterExtension(streamingExt)
// Configure WebRTC extension
webrtcExt, err := webrtc.New(streamingExt, webrtc.Config{
// Topology
Topology: webrtc.TopologyMesh,
// STUN/TURN servers
STUNServers: []string{
"stun:stun.l.google.com:19302",
"stun:stun1.l.google.com:19302",
},
TURNServers: []webrtc.TURNConfig{{
URLs: []string{"turn:turn.example.com:3478"},
Username: "turnuser",
Credential: "turnpass",
TLSEnabled: true,
}},
// Media configuration
MediaConfig: webrtc.MediaConfig{
AudioEnabled: true,
AudioCodecs: []string{"opus"},
VideoEnabled: true,
VideoCodecs: []string{"VP8", "H264"},
ScreenShareEnabled: true,
DataChannelsEnabled: true,
MaxAudioBitrate: 128,
MaxVideoBitrate: 2500,
MinVideoBitrate: 150,
MaxWidth: 1920,
MaxHeight: 1080,
MaxFPS: 30,
},
// Quality monitoring
QualityConfig: webrtc.QualityConfig{
MonitorEnabled: true,
MonitorInterval: 5 * time.Second,
MaxPacketLoss: 5.0,
MaxJitter: 30 * time.Millisecond,
MinBitrate: 100,
AdaptiveQuality: true,
QualityCheckInterval: 10 * time.Second,
},
// Recording
RecordingEnabled: true,
RecordingPath: "./recordings",
// Security
RequireAuth: true,
AllowGuests: false,
// Metrics
MetricsEnabled: true,
})
if err != nil {
log.Fatal(err)
}
app.RegisterExtension(webrtcExt)
app.Start()
}
Usage Examples
Basic Video Call
package main
import (
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/streaming"
"github.com/xraph/forge/extensions/webrtc"
)
func main() {
app := forge.NewApp(forge.AppConfig{
Name: "video-call",
Version: "1.0.0",
})
// Register extensions
streamingExt := streaming.NewExtension(streaming.WithLocalBackend())
app.RegisterExtension(streamingExt)
webrtcExt, _ := webrtc.New(streamingExt, webrtc.DefaultConfig())
app.RegisterExtension(webrtcExt)
// Register WebRTC signaling routes
webrtcExt.RegisterRoutes(app.Router())
// Create call room
app.Router().POST("/call/create", func(ctx forge.Context) error {
var req struct {
RoomName string `json:"room_name" validate:"required"`
MaxMembers int `json:"max_members"`
}
if err := ctx.Bind(&req); err != nil {
return ctx.JSON(400, map[string]string{"error": "Invalid request"})
}
userID := ctx.Get("user_id").(string)
// Create call room
room, err := webrtcExt.CreateCallRoom(ctx.Request().Context(), "", streaming.RoomOptions{
Name: req.RoomName,
Owner: userID,
MaxMembers: req.MaxMembers,
})
if err != nil {
return ctx.JSON(500, map[string]string{"error": "Failed to create room"})
}
return ctx.JSON(201, map[string]any{
"room_id": room.ID(),
"room_name": room.Name(),
"status": "created",
})
})
// Join call
app.Router().POST("/call/:roomID/join", func(ctx forge.Context) error {
roomID := ctx.Param("roomID")
userID := ctx.Get("user_id").(string)
var req struct {
DisplayName string `json:"display_name"`
AudioEnabled bool `json:"audio_enabled"`
VideoEnabled bool `json:"video_enabled"`
}
if err := ctx.Bind(&req); err != nil {
return ctx.JSON(400, map[string]string{"error": "Invalid request"})
}
// Join call
peer, err := webrtcExt.JoinCall(ctx.Request().Context(), roomID, userID, &webrtc.JoinOptions{
DisplayName: req.DisplayName,
AudioEnabled: req.AudioEnabled,
VideoEnabled: req.VideoEnabled,
})
if err != nil {
return ctx.JSON(500, map[string]string{"error": "Failed to join call"})
}
return ctx.JSON(200, map[string]any{
"peer_id": peer.ID(),
"room_id": roomID,
"user_id": userID,
"joined": true,
})
})
// Get call info
app.Router().GET("/call/:roomID", func(ctx forge.Context) error {
roomID := ctx.Param("roomID")
room, err := webrtcExt.GetCallRoom(roomID)
if err != nil {
return ctx.JSON(404, map[string]string{"error": "Room not found"})
}
participants := room.GetParticipants()
quality, _ := room.GetQuality(ctx.Request().Context())
return ctx.JSON(200, map[string]any{
"room_id": room.ID(),
"room_name": room.Name(),
"participants": participants,
"quality": quality,
})
})
app.Start()
}
Advanced Call Management
package main
import (
"context"
"fmt"
"time"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/streaming"
"github.com/xraph/forge/extensions/webrtc"
)
type CallService struct {
webrtc *webrtc.Extension
streaming *streaming.Extension
logger forge.Logger
}
func NewCallService(webrtcExt *webrtc.Extension, streamingExt *streaming.Extension, logger forge.Logger) *CallService {
return &CallService{
webrtc: webrtcExt,
streaming: streamingExt,
logger: logger,
}
}
func (s *CallService) CreateVideoConference(ctx context.Context, ownerID string, participants []string, options *ConferenceOptions) (*webrtc.CallRoom, error) {
// Create room with SFU topology for better scalability
room, err := s.webrtc.CreateCallRoom(ctx, "", streaming.RoomOptions{
Name: options.Name,
Owner: ownerID,
MaxMembers: len(participants) + 1,
Private: options.Private,
})
if err != nil {
return nil, err
}
// Send invitations to participants
for _, participantID := range participants {
s.sendCallInvitation(ctx, room, participantID, ownerID)
}
// Start recording if enabled
if options.RecordingEnabled {
recorder := s.webrtc.GetRecorder()
err := recorder.Start(ctx, room.ID(), &webrtc.RecordingOptions{
Format: "webm",
VideoCodec: "VP8",
AudioCodec: "opus",
OutputPath: fmt.Sprintf("./recordings/%s", room.ID()),
IncludeChat: true,
})
if err != nil {
s.logger.Error("failed to start recording",
forge.F("room_id", room.ID()),
forge.F("error", err),
)
}
}
return room, nil
}
func (s *CallService) sendCallInvitation(ctx context.Context, room *webrtc.CallRoom, participantID, inviterID string) {
invitation := &streaming.Message{
Type: "call_invitation",
UserID: "system",
Data: map[string]any{
"room_id": room.ID(),
"room_name": room.Name(),
"invited_by": inviterID,
"join_url": fmt.Sprintf("/call/%s", room.ID()),
"timestamp": time.Now(),
},
}
// Send via streaming extension
manager := s.streaming.GetManager()
manager.SendToUser(ctx, participantID, invitation)
}
func (s *CallService) HandleScreenShare(ctx context.Context, roomID, userID string, track webrtc.MediaTrack) error {
room, err := s.webrtc.GetCallRoom(roomID)
if err != nil {
return err
}
// Start screen sharing
if err := room.StartScreenShare(ctx, userID, track); err != nil {
return err
}
// Notify other participants
manager := s.streaming.GetManager()
manager.BroadcastToRoom(ctx, roomID, &streaming.Message{
Type: "screen_share_started",
Data: map[string]any{
"user_id": userID,
"room_id": roomID,
"track_id": track.ID(),
"timestamp": time.Now(),
},
})
return nil
}
func (s *CallService) MonitorCallQuality(ctx context.Context, roomID string) {
room, err := s.webrtc.GetCallRoom(roomID)
if err != nil {
return
}
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
quality, err := room.GetQuality(ctx)
if err != nil {
continue
}
// Check for quality issues
if quality.AverageQuality < 50 {
s.handlePoorQuality(ctx, room, quality)
}
// Log quality metrics
s.logger.Info("call quality update",
forge.F("room_id", roomID),
forge.F("participants", quality.ParticipantCount),
forge.F("avg_quality", quality.AverageQuality),
forge.F("packet_loss", quality.PacketLoss),
forge.F("latency", quality.Latency),
)
}
}
}
func (s *CallService) handlePoorQuality(ctx context.Context, room *webrtc.CallRoom, quality *webrtc.CallQuality) {
// Notify participants about quality issues
manager := s.streaming.GetManager()
manager.BroadcastToRoom(ctx, room.ID(), &streaming.Message{
Type: "quality_warning",
Data: map[string]any{
"room_id": room.ID(),
"quality_score": quality.AverageQuality,
"packet_loss": quality.PacketLoss,
"suggestions": []string{
"Check your internet connection",
"Close other applications using bandwidth",
"Move closer to your WiFi router",
},
},
})
// Implement adaptive quality measures
for participantID, participantQuality := range quality.Participants {
if participantQuality.Score < 30 {
// Suggest reducing video quality
peer, err := room.GetPeer(participantID)
if err != nil {
continue
}
// This would trigger client-side quality reduction
s.suggestQualityReduction(ctx, peer, participantQuality)
}
}
}
type ConferenceOptions struct {
Name string
Private bool
RecordingEnabled bool
MaxDuration time.Duration
}
SFU Implementation
package main
import (
"fmt"
"time"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/streaming"
"github.com/xraph/forge/extensions/webrtc"
)
func main() {
app := forge.NewApp(forge.AppConfig{
Name: "sfu-server",
Version: "1.0.0",
})
// Register streaming with Redis backend for distributed signaling
streamingExt := streaming.NewExtension(
streaming.WithRedisBackend("redis://localhost:6379"),
streaming.WithDistributed(true),
streaming.WithNodeID("sfu-node-1"),
)
app.RegisterExtension(streamingExt)
// Configure WebRTC with SFU topology
webrtcExt, _ := webrtc.New(streamingExt, webrtc.Config{
Topology: webrtc.TopologySFU,
STUNServers: []string{
"stun:stun.l.google.com:19302",
},
TURNServers: []webrtc.TURNConfig{{
URLs: []string{"turn:turn.example.com:3478"},
Username: "turnuser",
Credential: "turnpass",
}},
SFUConfig: &webrtc.SFUConfig{
WorkerCount: 8,
MaxBandwidthMbps: 1000,
AdaptiveBitrate: true,
SimulcastEnabled: true,
QualityLayers: []webrtc.QualityLayer{
{RID: "f", MaxWidth: 1920, MaxHeight: 1080, MaxFPS: 30, Bitrate: 2500},
{RID: "h", MaxWidth: 1280, MaxHeight: 720, MaxFPS: 30, Bitrate: 1200},
{RID: "q", MaxWidth: 640, MaxHeight: 360, MaxFPS: 30, Bitrate: 500},
},
RecordingEnabled: true,
RecordingFormat: "webm",
},
MediaConfig: webrtc.MediaConfig{
AudioEnabled: true,
AudioCodecs: []string{"opus"},
VideoEnabled: true,
VideoCodecs: []string{"VP8", "H264"},
DataChannelsEnabled: true,
MaxVideoBitrate: 2500,
MinVideoBitrate: 150,
},
QualityConfig: webrtc.QualityConfig{
MonitorEnabled: true,
AdaptiveQuality: true,
QualityCheckInterval: 5 * time.Second,
},
})
app.RegisterExtension(webrtcExt)
// Register routes
webrtcExt.RegisterRoutes(app.Router())
// Large conference endpoint
app.Router().POST("/conference/create", func(ctx forge.Context) error {
var req struct {
Name string `json:"name" validate:"required"`
MaxMembers int `json:"max_members"`
Participants []string `json:"participants"`
}
if err := ctx.Bind(&req); err != nil {
return ctx.JSON(400, map[string]string{"error": "Invalid request"})
}
ownerID := ctx.Get("user_id").(string)
// Create large conference room
room, err := webrtcExt.CreateCallRoom(ctx.Request().Context(), "", streaming.RoomOptions{
Name: req.Name,
Owner: ownerID,
MaxMembers: req.MaxMembers,
})
if err != nil {
return ctx.JSON(500, map[string]string{"error": "Failed to create conference"})
}
// Start recording for large conferences
recorder := webrtcExt.GetRecorder()
recorder.Start(ctx.Request().Context(), room.ID(), &webrtc.RecordingOptions{
Format: "webm",
VideoCodec: "VP8",
AudioCodec: "opus",
OutputPath: fmt.Sprintf("./recordings/conference_%s", room.ID()),
IncludeChat: true,
})
return ctx.JSON(201, map[string]any{
"room_id": room.ID(),
"room_name": room.Name(),
"topology": "sfu",
"max_members": req.MaxMembers,
})
})
app.Start()
}
Client-Side Integration
JavaScript WebRTC Client
class ForgeWebRTCClient {
constructor(serverUrl, roomId, userId) {
this.serverUrl = serverUrl;
this.roomId = roomId;
this.userId = userId;
this.ws = null;
this.pc = null;
this.localStream = null;
this.remoteStreams = new Map();
this.setupPeerConnection();
}
setupPeerConnection() {
this.pc = new RTCPeerConnection({
iceServers: [
{ urls: 'stun:stun.l.google.com:19302' },
{
urls: 'turn:turn.example.com:3478',
username: 'turnuser',
credential: 'turnpass'
}
]
});
// Handle ICE candidates
this.pc.onicecandidate = (event) => {
if (event.candidate) {
this.sendSignalingMessage({
type: 'webrtc.ice_candidate',
candidate: event.candidate
});
}
};
// Handle remote tracks
this.pc.ontrack = (event) => {
const [remoteStream] = event.streams;
const trackId = event.track.id;
this.remoteStreams.set(trackId, remoteStream);
this.onRemoteTrack(trackId, remoteStream, event.track);
};
// Handle connection state changes
this.pc.onconnectionstatechange = () => {
console.log('Connection state:', this.pc.connectionState);
this.onConnectionStateChange(this.pc.connectionState);
};
}
async connect() {
// Connect to signaling server
this.ws = new WebSocket(`${this.serverUrl}/webrtc/signal/${this.roomId}`);
this.ws.onopen = () => {
console.log('Connected to signaling server');
this.onSignalingConnected();
};
this.ws.onmessage = async (event) => {
const message = JSON.parse(event.data);
await this.handleSignalingMessage(message);
};
this.ws.onclose = () => {
console.log('Disconnected from signaling server');
this.onSignalingDisconnected();
};
this.ws.onerror = (error) => {
console.error('Signaling error:', error);
this.onSignalingError(error);
};
}
async handleSignalingMessage(message) {
switch (message.type) {
case 'webrtc.offer':
await this.handleOffer(message.sdp);
break;
case 'webrtc.answer':
await this.handleAnswer(message.sdp);
break;
case 'webrtc.ice_candidate':
await this.handleICECandidate(message.candidate);
break;
case 'peer_joined':
await this.handlePeerJoined(message.peer_id);
break;
case 'peer_left':
this.handlePeerLeft(message.peer_id);
break;
}
}
async handleOffer(offer) {
await this.pc.setRemoteDescription(offer);
const answer = await this.pc.createAnswer();
await this.pc.setLocalDescription(answer);
this.sendSignalingMessage({
type: 'webrtc.answer',
sdp: answer
});
}
async handleAnswer(answer) {
await this.pc.setRemoteDescription(answer);
}
async handleICECandidate(candidate) {
await this.pc.addIceCandidate(candidate);
}
async handlePeerJoined(peerId) {
// Create offer for new peer
const offer = await this.pc.createOffer();
await this.pc.setLocalDescription(offer);
this.sendSignalingMessage({
type: 'webrtc.offer',
target_peer: peerId,
sdp: offer
});
}
handlePeerLeft(peerId) {
// Clean up peer resources
this.remoteStreams.delete(peerId);
this.onPeerLeft(peerId);
}
async startLocalMedia(constraints = { audio: true, video: true }) {
try {
this.localStream = await navigator.mediaDevices.getUserMedia(constraints);
// Add tracks to peer connection
this.localStream.getTracks().forEach(track => {
this.pc.addTrack(track, this.localStream);
});
this.onLocalMediaStarted(this.localStream);
return this.localStream;
} catch (error) {
console.error('Failed to start local media:', error);
this.onLocalMediaError(error);
throw error;
}
}
async startScreenShare() {
try {
const screenStream = await navigator.mediaDevices.getDisplayMedia({
video: true,
audio: true
});
// Replace video track
const videoTrack = screenStream.getVideoTracks()[0];
const sender = this.pc.getSenders().find(s =>
s.track && s.track.kind === 'video'
);
if (sender) {
await sender.replaceTrack(videoTrack);
}
// Handle screen share end
videoTrack.onended = () => {
this.stopScreenShare();
};
this.onScreenShareStarted(screenStream);
return screenStream;
} catch (error) {
console.error('Failed to start screen share:', error);
this.onScreenShareError(error);
throw error;
}
}
async stopScreenShare() {
// Replace with camera video
if (this.localStream) {
const videoTrack = this.localStream.getVideoTracks()[0];
const sender = this.pc.getSenders().find(s =>
s.track && s.track.kind === 'video'
);
if (sender && videoTrack) {
await sender.replaceTrack(videoTrack);
}
}
this.onScreenShareStopped();
}
sendSignalingMessage(message) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));
}
}
muteAudio() {
if (this.localStream) {
this.localStream.getAudioTracks().forEach(track => {
track.enabled = false;
});
}
}
unmuteAudio() {
if (this.localStream) {
this.localStream.getAudioTracks().forEach(track => {
track.enabled = true;
});
}
}
muteVideo() {
if (this.localStream) {
this.localStream.getVideoTracks().forEach(track => {
track.enabled = false;
});
}
}
unmuteVideo() {
if (this.localStream) {
this.localStream.getVideoTracks().forEach(track => {
track.enabled = true;
});
}
}
disconnect() {
if (this.localStream) {
this.localStream.getTracks().forEach(track => track.stop());
}
if (this.pc) {
this.pc.close();
}
if (this.ws) {
this.ws.close();
}
}
// Event handlers (override these)
onSignalingConnected() {}
onSignalingDisconnected() {}
onSignalingError(error) {}
onLocalMediaStarted(stream) {}
onLocalMediaError(error) {}
onRemoteTrack(trackId, stream, track) {}
onPeerLeft(peerId) {}
onConnectionStateChange(state) {}
onScreenShareStarted(stream) {}
onScreenShareStopped() {}
onScreenShareError(error) {}
}
// Usage example
const client = new ForgeWebRTCClient('wss://example.com', 'room-123', 'user-456');
client.onLocalMediaStarted = (stream) => {
document.getElementById('localVideo').srcObject = stream;
};
client.onRemoteTrack = (trackId, stream, track) => {
const remoteVideo = document.createElement('video');
remoteVideo.srcObject = stream;
remoteVideo.autoplay = true;
document.getElementById('remoteVideos').appendChild(remoteVideo);
};
// Connect and start media
await client.connect();
await client.startLocalMedia();
Advanced Features
Quality Monitoring & Adaptive Streaming
package main
import (
"context"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/webrtc"
)
type QualityManager struct {
webrtc *webrtc.Extension
monitor webrtc.QualityMonitor
logger forge.Logger
}
func NewQualityManager(webrtcExt *webrtc.Extension, logger forge.Logger) *QualityManager {
return &QualityManager{
webrtc: webrtcExt,
monitor: webrtcExt.GetQualityMonitor(),
logger: logger,
}
}
func (qm *QualityManager) StartMonitoring(ctx context.Context, roomID string) {
room, err := qm.webrtc.GetCallRoom(roomID)
if err != nil {
return
}
// Monitor all peers in the room
peers := room.GetPeers()
for _, peer := range peers {
go qm.monitorPeer(ctx, peer)
}
// Set up quality change handler
qm.monitor.OnQualityChange(func(peerID string, quality *webrtc.ConnectionQuality) {
qm.handleQualityChange(ctx, roomID, peerID, quality)
})
}
func (qm *QualityManager) monitorPeer(ctx context.Context, peer webrtc.PeerConnection) {
if err := qm.monitor.Monitor(ctx, peer); err != nil {
qm.logger.Error("failed to start monitoring peer",
forge.F("peer_id", peer.ID()),
forge.F("error", err),
)
}
}
func (qm *QualityManager) handleQualityChange(ctx context.Context, roomID, peerID string, quality *webrtc.ConnectionQuality) {
qm.logger.Info("quality change detected",
forge.F("room_id", roomID),
forge.F("peer_id", peerID),
forge.F("score", quality.Score),
forge.F("packet_loss", quality.PacketLoss),
forge.F("jitter", quality.Jitter),
forge.F("latency", quality.Latency),
)
// Implement adaptive quality measures
if quality.Score < 30 {
qm.handlePoorQuality(ctx, roomID, peerID, quality)
} else if quality.Score > 80 {
qm.handleGoodQuality(ctx, roomID, peerID, quality)
}
}
func (qm *QualityManager) handlePoorQuality(ctx context.Context, roomID, peerID string, quality *webrtc.ConnectionQuality) {
// Get SFU router for quality adjustment
if qm.webrtc.GetConfig().Topology == webrtc.TopologySFU {
router := qm.webrtc.GetSFURouter()
// Reduce quality for this peer
router.SetQuality(ctx, peerID, peerID, "q") // Switch to quarter quality
qm.logger.Info("reduced quality for poor connection",
forge.F("peer_id", peerID),
forge.F("new_quality", "quarter"),
)
}
}
func (qm *QualityManager) handleGoodQuality(ctx context.Context, roomID, peerID string, quality *webrtc.ConnectionQuality) {
// Increase quality if connection is good
if qm.webrtc.GetConfig().Topology == webrtc.TopologySFU {
router := qm.webrtc.GetSFURouter()
// Increase to full quality
router.SetQuality(ctx, peerID, peerID, "f") // Switch to full quality
qm.logger.Info("increased quality for good connection",
forge.F("peer_id", peerID),
forge.F("new_quality", "full"),
)
}
}
Recording Management
package main
import (
"context"
"fmt"
"path/filepath"
"sync"
"time"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/webrtc"
)
type RecordingManager struct {
webrtc *webrtc.Extension
recorder webrtc.Recorder
logger forge.Logger
recordings map[string]*RecordingSession
mu sync.RWMutex
}
type RecordingSession struct {
RoomID string
StartedAt time.Time
Options *webrtc.RecordingOptions
Status *webrtc.RecordingStatus
}
func NewRecordingManager(webrtcExt *webrtc.Extension, logger forge.Logger) *RecordingManager {
return &RecordingManager{
webrtc: webrtcExt,
recorder: webrtcExt.GetRecorder(),
logger: logger,
recordings: make(map[string]*RecordingSession),
}
}
func (rm *RecordingManager) StartRecording(ctx context.Context, roomID string, options *webrtc.RecordingOptions) error {
rm.mu.Lock()
defer rm.mu.Unlock()
// Check if already recording
if _, exists := rm.recordings[roomID]; exists {
return fmt.Errorf("recording already in progress for room %s", roomID)
}
// Set default options if not provided
if options == nil {
options = &webrtc.RecordingOptions{
Format: "webm",
VideoCodec: "VP8",
AudioCodec: "opus",
OutputPath: filepath.Join("./recordings", fmt.Sprintf("%s_%d", roomID, time.Now().Unix())),
IncludeChat: true,
}
}
// Start recording
if err := rm.recorder.Start(ctx, roomID, options); err != nil {
return fmt.Errorf("failed to start recording: %w", err)
}
// Track recording session
session := &RecordingSession{
RoomID: roomID,
StartedAt: time.Now(),
Options: options,
}
rm.recordings[roomID] = session
rm.logger.Info("recording started",
forge.F("room_id", roomID),
forge.F("output_path", options.OutputPath),
forge.F("format", options.Format),
)
return nil
}
func (rm *RecordingManager) StopRecording(ctx context.Context, roomID string) error {
rm.mu.Lock()
defer rm.mu.Unlock()
session, exists := rm.recordings[roomID]
if !exists {
return fmt.Errorf("no recording in progress for room %s", roomID)
}
// Stop recording
if err := rm.recorder.Stop(ctx, roomID); err != nil {
return fmt.Errorf("failed to stop recording: %w", err)
}
// Get final status
status, err := rm.recorder.GetStatus(roomID)
if err != nil {
rm.logger.Error("failed to get recording status",
forge.F("room_id", roomID),
forge.F("error", err),
)
} else {
session.Status = status
}
duration := time.Since(session.StartedAt)
if status != nil {
rm.logger.Info("recording stopped",
forge.F("room_id", roomID),
forge.F("duration", duration),
forge.F("file_size", status.FileSize),
forge.F("output_path", status.OutputPath),
)
} else {
rm.logger.Info("recording stopped",
forge.F("room_id", roomID),
forge.F("duration", duration),
)
}
// Clean up
delete(rm.recordings, roomID)
return nil
}
func (rm *RecordingManager) GetRecordingStatus(roomID string) (*webrtc.RecordingStatus, error) {
rm.mu.RLock()
defer rm.mu.RUnlock()
session, exists := rm.recordings[roomID]
if !exists {
return nil, fmt.Errorf("no recording in progress for room %s", roomID)
}
// Get current status from recorder
status, err := rm.recorder.GetStatus(roomID)
if err != nil {
return nil, err
}
session.Status = status
return status, nil
}
func (rm *RecordingManager) PauseRecording(ctx context.Context, roomID string) error {
if err := rm.recorder.Pause(ctx, roomID); err != nil {
return fmt.Errorf("failed to pause recording: %w", err)
}
rm.logger.Info("recording paused", forge.F("room_id", roomID))
return nil
}
func (rm *RecordingManager) ResumeRecording(ctx context.Context, roomID string) error {
if err := rm.recorder.Resume(ctx, roomID); err != nil {
return fmt.Errorf("failed to resume recording: %w", err)
}
rm.logger.Info("recording resumed", forge.F("room_id", roomID))
return nil
}
func (rm *RecordingManager) ListActiveRecordings() []*RecordingSession {
rm.mu.RLock()
defer rm.mu.RUnlock()
sessions := make([]*RecordingSession, 0, len(rm.recordings))
for _, session := range rm.recordings {
sessions = append(sessions, session)
}
return sessions
}
Best Practices
Connection Management
- Use appropriate topology: Mesh for small groups (2-4), SFU for medium groups (5-50), MCU for large groups (50+, planned)
- Implement reconnection logic: Handle network interruptions gracefully (see the backoff sketch after this list)
- Monitor connection quality: Use quality monitoring to detect and respond to network issues
- Optimize ICE configuration: Use both STUN and TURN servers for reliable connectivity
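For the reconnection point above, one minimal sketch is to retry the JoinCall method shown earlier with exponential backoff; the attempt count and delays are illustrative defaults, not extension behavior:
package calls

import (
    "context"
    "time"

    "github.com/xraph/forge/extensions/webrtc"
)

// rejoinWithBackoff retries JoinCall after a dropped connection, doubling the
// wait between attempts. Attempt count and delays are illustrative defaults.
func rejoinWithBackoff(ctx context.Context, ext *webrtc.Extension, roomID, userID string, opts *webrtc.JoinOptions) error {
    const maxAttempts = 5
    delay := time.Second

    var lastErr error
    for attempt := 0; attempt < maxAttempts; attempt++ {
        _, err := ext.JoinCall(ctx, roomID, userID, opts)
        if err == nil {
            return nil
        }
        lastErr = err

        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(delay):
            delay *= 2 // back off before the next attempt
        }
    }
    return lastErr
}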
Media Optimization
- Enable simulcast: Use multiple quality layers for adaptive streaming
- Implement adaptive bitrate: Adjust quality based on network conditions
- Use appropriate codecs: Choose codecs based on device capabilities and bandwidth (see the bitrate sketch after this list)
- Handle track management: Properly add, remove, and replace media tracks
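For codec and bitrate selection, a hedged sketch that builds the MediaConfig from the configuration section out of an estimated per-peer bandwidth budget (the kbps tiers and the helper itself are illustrative):
package calls

import "github.com/xraph/forge/extensions/webrtc"

// mediaConfigFor derives a MediaConfig from an estimated per-peer bandwidth
// budget in kbps. The tiers are illustrative; tune them to your deployment.
func mediaConfigFor(estimatedKbps int) webrtc.MediaConfig {
    cfg := webrtc.MediaConfig{
        AudioEnabled:    true,
        AudioCodecs:     []string{"opus"},
        VideoEnabled:    true,
        VideoCodecs:     []string{"VP8", "H264"},
        MaxAudioBitrate: 128,
    }
    switch {
    case estimatedKbps >= 3000: // plenty of headroom: full HD
        cfg.MaxVideoBitrate, cfg.MaxWidth, cfg.MaxHeight, cfg.MaxFPS = 2500, 1920, 1080, 30
    case estimatedKbps >= 1500: // moderate link: 720p
        cfg.MaxVideoBitrate, cfg.MaxWidth, cfg.MaxHeight, cfg.MaxFPS = 1200, 1280, 720, 30
    default: // constrained link: 360p
        cfg.MaxVideoBitrate, cfg.MaxWidth, cfg.MaxHeight, cfg.MaxFPS = 500, 640, 360, 30
    }
    return cfg
}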
Performance Considerations
- Limit concurrent connections: Set reasonable limits based on server capacity (see the capacity-check sketch after this list)
- Use efficient signaling: Minimize signaling overhead with batching and compression
- Monitor resource usage: Track CPU, memory, and bandwidth usage
- Implement load balancing: Distribute load across multiple nodes for scalability
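For connection limits, one approach is to check room occupancy before admitting a new peer. A minimal sketch using the GetCallRoom, GetParticipants, and JoinCall calls shown earlier (the capacity rule is illustrative, and GetParticipants is assumed to return a slice of current participants):
package calls

import (
    "context"
    "fmt"

    "github.com/xraph/forge/extensions/webrtc"
)

// joinIfCapacity admits a peer only while the room is below maxParticipants.
// The capacity rule is illustrative; use whatever sizing fits your servers.
func joinIfCapacity(ctx context.Context, ext *webrtc.Extension, roomID, userID string, maxParticipants int, opts *webrtc.JoinOptions) error {
    room, err := ext.GetCallRoom(roomID)
    if err != nil {
        return err
    }
    if len(room.GetParticipants()) >= maxParticipants {
        return fmt.Errorf("room %s is at capacity (%d participants)", roomID, maxParticipants)
    }
    _, err = ext.JoinCall(ctx, roomID, userID, opts)
    return err
}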
Security Best Practices
- Always authenticate: Require authentication for all WebRTC operations
- Use secure signaling: Implement TLS for signaling connections
- Validate permissions: Check room permissions before allowing operations
- Monitor for abuse: Implement rate limiting and abuse detection (a simple limiter sketch follows below)
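As a starting point for rate limiting, a small per-user limiter can gate join attempts before they reach the WebRTC layer. This sketch is purely illustrative (in-memory, single-node); a production setup would typically back it with Redis or dedicated rate-limiting middleware:
package calls

import (
    "sync"
    "time"
)

// joinLimiter allows at most maxJoins join attempts per user within window.
// Purely illustrative: state is in-memory and per-node only.
type joinLimiter struct {
    mu       sync.Mutex
    attempts map[string][]time.Time
    maxJoins int
    window   time.Duration
}

func newJoinLimiter(maxJoins int, window time.Duration) *joinLimiter {
    return &joinLimiter{
        attempts: make(map[string][]time.Time),
        maxJoins: maxJoins,
        window:   window,
    }
}

// Allow reports whether userID may attempt another join right now.
func (l *joinLimiter) Allow(userID string) bool {
    l.mu.Lock()
    defer l.mu.Unlock()

    now := time.Now()
    // Keep only attempts that are still inside the sliding window.
    recent := l.attempts[userID][:0]
    for _, t := range l.attempts[userID] {
        if now.Sub(t) < l.window {
            recent = append(recent, t)
        }
    }
    if len(recent) >= l.maxJoins {
        l.attempts[userID] = recent
        return false
    }
    l.attempts[userID] = append(recent, now)
    return true
}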
Troubleshooting
Common Issues
Connection Failed
Error: WebRTC connection failed to establish
Solutions:
- Check STUN/TURN server configuration
- Verify firewall and NAT settings
- Ensure proper ICE candidate exchange
- Check network connectivity between peers
Signaling Issues
Error: Signaling connection lost
Solutions:
- Verify streaming extension is running
- Check WebSocket endpoint configuration
- Ensure proper authentication
- Monitor signaling server connectivity
Media Track Issues
Error: Failed to add media track
Solutions:
- Check media permissions in browser
- Verify codec compatibility
- Ensure proper track constraints
- Check device availability
Quality Problems
Warning: Poor connection quality detected
Solutions:
- Enable adaptive quality
- Check network bandwidth
- Reduce video resolution/bitrate
- Use quality monitoring for optimization
Debugging
Enable Debug Logging
logging:
level: debug
loggers:
webrtc: debug
webrtc.signaling: debug
webrtc.peer: debug
webrtc.quality: debug
Monitor WebRTC Statistics
// Get peer connection statistics
peer, err := room.GetPeer(userID)
if err != nil {
return err
}
stats, err := peer.GetStats(ctx)
if err != nil {
return err
}
fmt.Printf("Bytes Sent: %d\n", stats.BytesSent)
fmt.Printf("Bytes Received: %d\n", stats.BytesReceived)
fmt.Printf("Packet Loss: %d\n", stats.PacketsLost)
fmt.Printf("RTT: %v\n", stats.RoundTripTime)
Health Checks
# Check WebRTC extension health
curl http://localhost:8080/_/health/webrtc
# Check signaling connectivity
curl http://localhost:8080/_/health/webrtc/signaling
# Check TURN server connectivity
curl http://localhost:8080/_/health/webrtc/turn
Next Steps
- Explore Streaming Extension for signaling and room management
- Learn about Events Extension for event-driven architecture
- Check out gRPC Extension for high-performance RPC
- Review GraphQL Extension for flexible query APIs
- See oRPC Extension for JSON-RPC 2.0 support
- Visit Auth Extension for authentication integration