WebRTC Extension

Real-time peer-to-peer communication with WebRTC support, multiple topologies, and quality monitoring

The WebRTC extension provides production-ready real-time peer-to-peer communication capabilities for your Forge application. It supports multiple connection topologies (Mesh, SFU, MCU), leverages the Streaming extension for signaling, and includes advanced features like quality monitoring, recording, and adaptive bitrate control.

Features

Multiple Topologies

  • Mesh (P2P): Direct peer-to-peer connections, best for 2-4 participants
  • SFU (Selective Forwarding Unit): Server-mediated routing, best for 5-50 participants
  • MCU (Multipoint Control Unit): Server-mixed media, best for 50+ participants (planned)
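The participant ranges above can be read as a simple selection rule. The sketch below is illustrative only: `suggestTopology` and `meshConnections` are hypothetical helpers, not part of the extension's API, and the local `Topology` type merely mirrors the three topology names. It also shows why mesh stops scaling: a full mesh of n participants needs n(n-1)/2 direct connections.

```go
package main

import "fmt"

// Topology is a local stand-in for the three topology names listed above.
type Topology string

const (
	TopologyMesh Topology = "mesh"
	TopologySFU  Topology = "sfu"
	TopologyMCU  Topology = "mcu"
)

// suggestTopology applies the documented ranges: up to 4 participants use
// mesh, 5 to 50 use SFU, and anything larger maps to MCU (listed as planned).
func suggestTopology(participants int) Topology {
	switch {
	case participants <= 4:
		return TopologyMesh
	case participants <= 50:
		return TopologySFU
	default:
		return TopologyMCU
	}
}

// meshConnections returns how many direct peer connections a full mesh
// needs: every pair of participants is connected once, so n*(n-1)/2.
func meshConnections(participants int) int {
	if participants < 2 {
		return 0
	}
	return participants * (participants - 1) / 2
}

func main() {
	for _, n := range []int{2, 4, 10, 80} {
		fmt.Printf("%d participants: %s (a full mesh would need %d connections)\n",
			n, suggestTopology(n), meshConnections(n))
	}
}
```

With 10 participants a full mesh would already need 45 connections, which is why server-mediated routing is recommended beyond a handful of peers.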

Signaling & Communication

  • Streaming Integration: Uses Streaming extension for WebSocket/SSE signaling
  • Automatic Room Management: Seamless integration with streaming rooms
  • Distributed Signaling: Multi-node signaling coordination
  • ICE Candidate Exchange: Automatic NAT traversal with STUN/TURN support

Media Management

  • Audio/Video Tracks: Full media track management with codec selection
  • Screen Sharing: Desktop and application sharing capabilities
  • Data Channels: Bidirectional data communication
  • Simulcast Support: Multiple quality layers for adaptive streaming (SFU mode)

Quality & Performance

  • Real-time Monitoring: Connection quality metrics and statistics
  • Adaptive Quality: Dynamic quality adjustment based on network conditions
  • Packet Loss Detection: Automatic detection and reporting of network issues
  • Bitrate Control: Dynamic bitrate adjustment for optimal performance

Recording & Storage

  • Stream Recording: Record audio/video streams in multiple formats
  • Per-room Recording: Room-level recording management
  • Multiple Formats: Support for WebM, MP4, and other formats
  • Metadata Inclusion: Optional chat and metadata recording

Security & Authentication

  • Integrated Authentication: Uses Streaming extension authentication
  • Room-level Permissions: Fine-grained access control
  • STUN/TURN Security: Secure NAT traversal with credential management
  • TLS Support: TLS-secured signaling; media is encrypted in transit with DTLS-SRTP, as WebRTC requires
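For the TURN credential management mentioned above, one common approach is time-limited credentials derived from a secret shared with the TURN server (the scheme coturn enables with `use-auth-secret`). This page does not say whether the extension uses that scheme, so treat the following as a sketch under that assumption; `turnCredentials` and the placeholder secret are hypothetical.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha1"
	"encoding/base64"
	"fmt"
	"time"
)

// turnCredentials derives a time-limited username/credential pair.
// The username is "<expiry unix seconds>:<user id>" and the credential is
// base64(HMAC-SHA1(secret, username)). The TURN server recomputes the HMAC
// with the same secret and rejects the pair once the expiry has passed.
func turnCredentials(secret, userID string, expiryUnix int64) (username, credential string) {
	username = fmt.Sprintf("%d:%s", expiryUnix, userID)

	mac := hmac.New(sha1.New, []byte(secret))
	mac.Write([]byte(username))
	credential = base64.StdEncoding.EncodeToString(mac.Sum(nil))
	return username, credential
}

func main() {
	// Placeholder values: replace with your own secret and user ID.
	expiry := time.Now().Add(time.Hour).Unix()
	user, cred := turnCredentials("example-shared-secret", "user-456", expiry)
	fmt.Println("username:  ", user)
	fmt.Println("credential:", cred)
}
```

Handing clients short-lived credentials like these avoids shipping a static TURN password in client code.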

Installation

Go Module

go get github.com/xraph/forge/extensions/webrtc

Docker

FROM xraph/forge:latest
# WebRTC extension is included

Package Manager

# Using Forge CLI
forge extension add webrtc

# Using package manager
npm install @xraph/forge-webrtc

Configuration

Basic Configuration

extensions:
  # Streaming extension (required dependency)
  streaming:
    backend: "local"
    enable_rooms: true
    require_auth: true
    
  # WebRTC extension
  webrtc:
    # Signaling
    signaling_enabled: true
    signaling_timeout: 30                # seconds
    
    # Topology
    topology: "mesh"                     # "mesh", "sfu", "mcu"
    
    # STUN servers for NAT traversal
    stun_servers:
      - "stun:stun.l.google.com:19302"
      - "stun:stun1.l.google.com:19302"
    
    # TURN servers for relaying
    turn_servers:
      - urls:
          - "turn:turn.example.com:3478"
        username: "turnuser"
        credential: "turnpass"
        tls_enabled: false
    
    # Media configuration
    media:
      audio_enabled: true
      audio_codecs: ["opus"]
      video_enabled: true
      video_codecs: ["VP8", "H264"]
      screen_share_enabled: true
      data_channels_enabled: true
      max_audio_bitrate: 128             # kbps
      max_video_bitrate: 2500            # kbps
      min_video_bitrate: 150             # kbps
      max_width: 1920
      max_height: 1080
      max_fps: 30
    
    # Quality monitoring
    quality:
      monitor_enabled: true
      monitor_interval: 5                # seconds
      max_packet_loss: 5.0               # percentage
      max_jitter: 30                     # milliseconds
      min_bitrate: 100                   # kbps
      adaptive_quality: true
      quality_check_interval: 10         # seconds
    
    # Recording
    recording_enabled: true
    recording_path: "./recordings"
    
    # Security
    require_auth: true
    allow_guests: false
    
    # Metrics
    metrics_enabled: true
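To make the quality limits concrete, here is a minimal sketch of how one measurement could be compared against `max_packet_loss`, `max_jitter`, and `min_bitrate`. The `Thresholds` and `Sample` types and the `violations` function are hypothetical and exist only for this example; the extension's own monitor may evaluate these limits differently.

```go
package main

import "fmt"

// Thresholds mirrors the limits in the quality section of the configuration.
type Thresholds struct {
	MaxPacketLoss  float64 // percent
	MaxJitterMs    int     // milliseconds
	MinBitrateKbps int     // kbps
}

// Sample is one measurement for a single peer (hypothetical shape).
type Sample struct {
	PacketLoss  float64 // percent
	JitterMs    int     // milliseconds
	BitrateKbps int     // kbps
}

// violations returns the names of the limits that the sample breaks.
// An empty result means the sample is within all configured limits.
func violations(limits Thresholds, s Sample) []string {
	var out []string
	if s.PacketLoss > limits.MaxPacketLoss {
		out = append(out, "packet_loss")
	}
	if s.JitterMs > limits.MaxJitterMs {
		out = append(out, "jitter")
	}
	if s.BitrateKbps < limits.MinBitrateKbps {
		out = append(out, "bitrate")
	}
	return out
}

func main() {
	// Values taken from the basic configuration above.
	limits := Thresholds{MaxPacketLoss: 5.0, MaxJitterMs: 30, MinBitrateKbps: 100}

	healthy := Sample{PacketLoss: 1.2, JitterMs: 12, BitrateKbps: 900}
	degraded := Sample{PacketLoss: 8.0, JitterMs: 45, BitrateKbps: 60}

	fmt.Println("healthy sample breaks: ", violations(limits, healthy))
	fmt.Println("degraded sample breaks:", violations(limits, degraded))
}
```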

SFU Configuration

extensions:
  webrtc:
    topology: "sfu"
    
    # SFU-specific settings
    sfu:
      worker_count: 4
      max_bandwidth_mbps: 100
      adaptive_bitrate: true
      simulcast_enabled: true
      
      # Quality layers for simulcast
      quality_layers:
        - rid: "f"                       # Full quality
          max_width: 1920
          max_height: 1080
          max_fps: 30
          bitrate: 2500
        - rid: "h"                       # Half quality
          max_width: 1280
          max_height: 720
          max_fps: 30
          bitrate: 1200
        - rid: "q"                       # Quarter quality
          max_width: 640
          max_height: 360
          max_fps: 30
          bitrate: 500
      
      # Recording
      recording_enabled: true
      recording_format: "webm"
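The quality layers above only help if something decides which layer each subscriber receives. A minimal sketch of one such rule follows: pick the highest-bitrate layer that fits the subscriber's estimated bandwidth and fall back to the lowest layer otherwise. The `Layer` type and `pickLayer` function are hypothetical and are not the SFU's actual selection logic.

```go
package main

import (
	"fmt"
	"sort"
)

// Layer is a reduced, local version of a simulcast quality layer.
type Layer struct {
	RID     string
	Bitrate int // kbps
}

// pickLayer returns the highest-bitrate layer whose bitrate does not exceed
// availableKbps. If no layer fits, it falls back to the lowest layer so the
// subscriber still receives video. It reports false only for an empty list.
func pickLayer(layers []Layer, availableKbps int) (Layer, bool) {
	if len(layers) == 0 {
		return Layer{}, false
	}
	// Sort a copy from highest to lowest bitrate so the caller's slice is untouched.
	sorted := append([]Layer(nil), layers...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Bitrate > sorted[j].Bitrate })

	for _, l := range sorted {
		if l.Bitrate <= availableKbps {
			return l, true
		}
	}
	return sorted[len(sorted)-1], true
}

func main() {
	// Bitrates match the quality_layers configuration above.
	layers := []Layer{{RID: "f", Bitrate: 2500}, {RID: "h", Bitrate: 1200}, {RID: "q", Bitrate: 500}}
	for _, bw := range []int{3000, 1500, 300} {
		l, _ := pickLayer(layers, bw)
		fmt.Printf("%d kbps available: layer %q\n", bw, l.RID)
	}
}
```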

Environment Variables

# Topology
FORGE_WEBRTC_TOPOLOGY=mesh
FORGE_WEBRTC_SIGNALING_ENABLED=true
FORGE_WEBRTC_SIGNALING_TIMEOUT=30

# STUN/TURN
FORGE_WEBRTC_STUN_SERVERS=stun:stun.l.google.com:19302
FORGE_WEBRTC_TURN_URLS=turn:turn.example.com:3478
FORGE_WEBRTC_TURN_USERNAME=turnuser
FORGE_WEBRTC_TURN_CREDENTIAL=turnpass

# Media
FORGE_WEBRTC_AUDIO_ENABLED=true
FORGE_WEBRTC_VIDEO_ENABLED=true
FORGE_WEBRTC_MAX_VIDEO_BITRATE=2500
FORGE_WEBRTC_MAX_AUDIO_BITRATE=128

# Quality
FORGE_WEBRTC_MONITOR_ENABLED=true
FORGE_WEBRTC_ADAPTIVE_QUALITY=true
FORGE_WEBRTC_MAX_PACKET_LOSS=5.0

# Recording
FORGE_WEBRTC_RECORDING_ENABLED=true
FORGE_WEBRTC_RECORDING_PATH=./recordings

# Security
FORGE_WEBRTC_REQUIRE_AUTH=true
FORGE_WEBRTC_ALLOW_GUESTS=false
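The extension presumably reads these variables itself. The sketch below only illustrates how a few of them map onto typed settings, with defaults taken from the basic configuration. `envConfig`, `lookupFunc`, and `loadEnvConfig` are hypothetical names introduced for this example.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// envConfig holds a few of the settings listed above. It is a local,
// illustrative type, not the extension's own Config.
type envConfig struct {
	Topology         string
	SignalingTimeout int // seconds
	RequireAuth      bool
	MaxPacketLoss    float64 // percent
}

// lookupFunc matches the signature of os.LookupEnv so values can be injected.
type lookupFunc func(key string) (string, bool)

// loadEnvConfig starts from the defaults used in the basic configuration and
// overrides each field whose variable is set, failing on malformed values.
func loadEnvConfig(lookup lookupFunc) (envConfig, error) {
	cfg := envConfig{Topology: "mesh", SignalingTimeout: 30, RequireAuth: true, MaxPacketLoss: 5.0}

	if v, ok := lookup("FORGE_WEBRTC_TOPOLOGY"); ok {
		cfg.Topology = v
	}
	if v, ok := lookup("FORGE_WEBRTC_SIGNALING_TIMEOUT"); ok {
		n, err := strconv.Atoi(v)
		if err != nil {
			return cfg, fmt.Errorf("FORGE_WEBRTC_SIGNALING_TIMEOUT: %w", err)
		}
		cfg.SignalingTimeout = n
	}
	if v, ok := lookup("FORGE_WEBRTC_REQUIRE_AUTH"); ok {
		b, err := strconv.ParseBool(v)
		if err != nil {
			return cfg, fmt.Errorf("FORGE_WEBRTC_REQUIRE_AUTH: %w", err)
		}
		cfg.RequireAuth = b
	}
	if v, ok := lookup("FORGE_WEBRTC_MAX_PACKET_LOSS"); ok {
		f, err := strconv.ParseFloat(v, 64)
		if err != nil {
			return cfg, fmt.Errorf("FORGE_WEBRTC_MAX_PACKET_LOSS: %w", err)
		}
		cfg.MaxPacketLoss = f
	}
	return cfg, nil
}

func main() {
	cfg, err := loadEnvConfig(os.LookupEnv)
	if err != nil {
		fmt.Fprintln(os.Stderr, "invalid configuration:", err)
		os.Exit(1)
	}
	fmt.Printf("%+v\n", cfg)
}
```

Passing the lookup function in, rather than calling `os.LookupEnv` directly, keeps the parsing logic testable without touching the process environment.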

Programmatic Configuration

package main

import (
    "log"
    "time"

    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/streaming"
    "github.com/xraph/forge/extensions/webrtc"
)

func main() {
    app := forge.NewApp(forge.AppConfig{
        Name:    "video-call-app",
        Version: "1.0.0",
    })

    // Register streaming extension (required dependency)
    streamingExt := streaming.NewExtension(
        streaming.WithLocalBackend(),
        streaming.WithFeatures(true, true, true, true, true), // rooms, channels, presence, typing, history
        streaming.WithAuthentication("jwt"),
    )
    app.RegisterExtension(streamingExt)

    // Configure WebRTC extension
    webrtcExt, err := webrtc.New(streamingExt, webrtc.Config{
        // Topology
        Topology: webrtc.TopologyMesh,
        
        // STUN/TURN servers
        STUNServers: []string{
            "stun:stun.l.google.com:19302",
            "stun:stun1.l.google.com:19302",
        },
        TURNServers: []webrtc.TURNConfig{{
            URLs:       []string{"turn:turn.example.com:3478"},
            Username:   "turnuser",
            Credential: "turnpass",
            TLSEnabled: true,
        }},
        
        // Media configuration
        MediaConfig: webrtc.MediaConfig{
            AudioEnabled:        true,
            AudioCodecs:         []string{"opus"},
            VideoEnabled:        true,
            VideoCodecs:         []string{"VP8", "H264"},
            ScreenShareEnabled:  true,
            DataChannelsEnabled: true,
            MaxAudioBitrate:     128,
            MaxVideoBitrate:     2500,
            MinVideoBitrate:     150,
            MaxWidth:            1920,
            MaxHeight:           1080,
            MaxFPS:              30,
        },
        
        // Quality monitoring
        QualityConfig: webrtc.QualityConfig{
            MonitorEnabled:       true,
            MonitorInterval:      5 * time.Second,
            MaxPacketLoss:        5.0,
            MaxJitter:            30 * time.Millisecond,
            MinBitrate:           100,
            AdaptiveQuality:      true,
            QualityCheckInterval: 10 * time.Second,
        },
        
        // Recording
        RecordingEnabled: true,
        RecordingPath:    "./recordings",
        
        // Security
        RequireAuth: true,
        AllowGuests: false,
        
        // Metrics
        MetricsEnabled: true,
    })
    
    if err != nil {
        log.Fatal(err)
    }
    
    app.RegisterExtension(webrtcExt)
    app.Start()
}
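A configuration like the one above has a few internal consistency rules that are cheap to check before startup. The sketch below validates a reduced copy of the settings. `mediaLimits` and `validateLimits` are hypothetical, and whether `webrtc.New` performs similar checks itself is not stated here.

```go
package main

import "fmt"

// mediaLimits is a reduced, local copy of the settings being checked.
type mediaLimits struct {
	Topology        string
	MinVideoBitrate int // kbps
	MaxVideoBitrate int // kbps
	MaxFPS          int
}

// validateLimits returns the first inconsistency it finds, or nil.
func validateLimits(m mediaLimits) error {
	switch m.Topology {
	case "mesh", "sfu", "mcu":
		// known topology
	default:
		return fmt.Errorf("unknown topology %q", m.Topology)
	}
	if m.MinVideoBitrate > m.MaxVideoBitrate {
		return fmt.Errorf("min video bitrate %d kbps exceeds max %d kbps",
			m.MinVideoBitrate, m.MaxVideoBitrate)
	}
	if m.MaxFPS <= 0 {
		return fmt.Errorf("max fps must be positive, got %d", m.MaxFPS)
	}
	return nil
}

func main() {
	ok := mediaLimits{Topology: "mesh", MinVideoBitrate: 150, MaxVideoBitrate: 2500, MaxFPS: 30}
	swapped := mediaLimits{Topology: "mesh", MinVideoBitrate: 2500, MaxVideoBitrate: 150, MaxFPS: 30}

	fmt.Println("valid config:  ", validateLimits(ok))
	fmt.Println("swapped limits:", validateLimits(swapped))
}
```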

Usage Examples

Basic Video Call

package main

import (
    "log"

    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/streaming"
    "github.com/xraph/forge/extensions/webrtc"
)

func main() {
    app := forge.NewApp(forge.AppConfig{
        Name:    "video-call",
        Version: "1.0.0",
    })

    // Register extensions
    streamingExt := streaming.NewExtension(streaming.WithLocalBackend())
    app.RegisterExtension(streamingExt)

    // Fail fast if the WebRTC extension cannot be created
    webrtcExt, err := webrtc.New(streamingExt, webrtc.DefaultConfig())
    if err != nil {
        log.Fatal(err)
    }
    app.RegisterExtension(webrtcExt)

    // Register WebRTC signaling routes
    webrtcExt.RegisterRoutes(app.Router())

    // Create call room
    app.Router().POST("/call/create", func(ctx forge.Context) error {
        var req struct {
            RoomName   string `json:"room_name" validate:"required"`
            MaxMembers int    `json:"max_members"`
        }
        
        if err := ctx.Bind(&req); err != nil {
            return ctx.JSON(400, map[string]string{"error": "Invalid request"})
        }
        
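        // user_id is expected to be set by the authentication middleware;
        // use a checked assertion so a missing value does not panic
        userID, ok := ctx.Get("user_id").(string)
        if !ok {
            return ctx.JSON(401, map[string]string{"error": "Unauthorized"})
        }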
        
        // Create call room
        room, err := webrtcExt.CreateCallRoom(ctx.Request().Context(), "", streaming.RoomOptions{
            Name:       req.RoomName,
            Owner:      userID,
            MaxMembers: req.MaxMembers,
        })
        if err != nil {
            return ctx.JSON(500, map[string]string{"error": "Failed to create room"})
        }
        
        return ctx.JSON(201, map[string]any{
            "room_id":   room.ID(),
            "room_name": room.Name(),
            "status":    "created",
        })
    })

    // Join call
    app.Router().POST("/call/:roomID/join", func(ctx forge.Context) error {
        roomID := ctx.Param("roomID")
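        // Checked assertion: a missing user_id returns 401 instead of panicking
        userID, ok := ctx.Get("user_id").(string)
        if !ok {
            return ctx.JSON(401, map[string]string{"error": "Unauthorized"})
        }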
        
        var req struct {
            DisplayName  string `json:"display_name"`
            AudioEnabled bool   `json:"audio_enabled"`
            VideoEnabled bool   `json:"video_enabled"`
        }
        
        if err := ctx.Bind(&req); err != nil {
            return ctx.JSON(400, map[string]string{"error": "Invalid request"})
        }
        
        // Join call
        peer, err := webrtcExt.JoinCall(ctx.Request().Context(), roomID, userID, &webrtc.JoinOptions{
            DisplayName:  req.DisplayName,
            AudioEnabled: req.AudioEnabled,
            VideoEnabled: req.VideoEnabled,
        })
        if err != nil {
            return ctx.JSON(500, map[string]string{"error": "Failed to join call"})
        }
        
        return ctx.JSON(200, map[string]any{
            "peer_id":     peer.ID(),
            "room_id":     roomID,
            "user_id":     userID,
            "joined":      true,
        })
    })

    // Get call info
    app.Router().GET("/call/:roomID", func(ctx forge.Context) error {
        roomID := ctx.Param("roomID")
        
        room, err := webrtcExt.GetCallRoom(roomID)
        if err != nil {
            return ctx.JSON(404, map[string]string{"error": "Room not found"})
        }
        
        participants := room.GetParticipants()
        quality, _ := room.GetQuality(ctx.Request().Context())
        
        return ctx.JSON(200, map[string]any{
            "room_id":      room.ID(),
            "room_name":    room.Name(),
            "participants": participants,
            "quality":      quality,
        })
    })

    app.Start()
}

Advanced Call Management

package main

import (
    "context"
    "fmt"
    "time"
    
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/streaming"
    "github.com/xraph/forge/extensions/webrtc"
)

type CallService struct {
    webrtc    *webrtc.Extension
    streaming *streaming.Extension
    logger    forge.Logger
}

func NewCallService(webrtcExt *webrtc.Extension, streamingExt *streaming.Extension, logger forge.Logger) *CallService {
    return &CallService{
        webrtc:    webrtcExt,
        streaming: streamingExt,
        logger:    logger,
    }
}

func (s *CallService) CreateVideoConference(ctx context.Context, ownerID string, participants []string, options *ConferenceOptions) (webrtc.CallRoom, error) {
    // Create room with SFU topology for better scalability
    room, err := s.webrtc.CreateCallRoom(ctx, "", streaming.RoomOptions{
        Name:       options.Name,
        Owner:      ownerID,
        MaxMembers: len(participants) + 1,
        Private:    options.Private,
    })
    if err != nil {
        return nil, err
    }
    
    // Send invitations to participants
    for _, participantID := range participants {
        s.sendCallInvitation(ctx, room, participantID, ownerID)
    }
    
    // Start recording if enabled
    if options.RecordingEnabled {
        recorder := s.webrtc.GetRecorder()
        err := recorder.Start(ctx, room.ID(), &webrtc.RecordingOptions{
            Format:      "webm",
            VideoCodec:  "VP8",
            AudioCodec:  "opus",
            OutputPath:  fmt.Sprintf("./recordings/%s", room.ID()),
            IncludeChat: true,
        })
        if err != nil {
            s.logger.Error("failed to start recording",
                forge.F("room_id", room.ID()),
                forge.F("error", err),
            )
        }
    }
    
    return room, nil
}

func (s *CallService) sendCallInvitation(ctx context.Context, room webrtc.CallRoom, participantID, inviterID string) {
    invitation := &streaming.Message{
        Type:   "call_invitation",
        UserID: "system",
        Data: map[string]any{
            "room_id":     room.ID(),
            "room_name":   room.Name(),
            "invited_by":  inviterID,
            "join_url":    fmt.Sprintf("/call/%s", room.ID()),
            "timestamp":   time.Now(),
        },
    }
    
    // Send via streaming extension
    manager := s.streaming.GetManager()
    manager.SendToUser(ctx, participantID, invitation)
}

func (s *CallService) HandleScreenShare(ctx context.Context, roomID, userID string, track webrtc.MediaTrack) error {
    room, err := s.webrtc.GetCallRoom(roomID)
    if err != nil {
        return err
    }
    
    // Start screen sharing
    if err := room.StartScreenShare(ctx, userID, track); err != nil {
        return err
    }
    
    // Notify other participants
    manager := s.streaming.GetManager()
    manager.BroadcastToRoom(ctx, roomID, &streaming.Message{
        Type: "screen_share_started",
        Data: map[string]any{
            "user_id":   userID,
            "room_id":   roomID,
            "track_id":  track.ID(),
            "timestamp": time.Now(),
        },
    })
    
    return nil
}

func (s *CallService) MonitorCallQuality(ctx context.Context, roomID string) {
    room, err := s.webrtc.GetCallRoom(roomID)
    if err != nil {
        return
    }
    
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            quality, err := room.GetQuality(ctx)
            if err != nil {
                continue
            }
            
            // Check for quality issues
            if quality.AverageQuality < 50 {
                s.handlePoorQuality(ctx, room, quality)
            }
            
            // Log quality metrics
            s.logger.Info("call quality update",
                forge.F("room_id", roomID),
                forge.F("participants", quality.ParticipantCount),
                forge.F("avg_quality", quality.AverageQuality),
                forge.F("packet_loss", quality.PacketLoss),
                forge.F("latency", quality.Latency),
            )
        }
    }
}

func (s *CallService) handlePoorQuality(ctx context.Context, room webrtc.CallRoom, quality *webrtc.CallQuality) {
    // Notify participants about quality issues
    manager := s.streaming.GetManager()
    manager.BroadcastToRoom(ctx, room.ID(), &streaming.Message{
        Type: "quality_warning",
        Data: map[string]any{
            "room_id":       room.ID(),
            "quality_score": quality.AverageQuality,
            "packet_loss":   quality.PacketLoss,
            "suggestions":   []string{
                "Check your internet connection",
                "Close other applications using bandwidth",
                "Move closer to your WiFi router",
            },
        },
    })
    
    // Implement adaptive quality measures
    for participantID, participantQuality := range quality.Participants {
        if participantQuality.Score < 30 {
            // Suggest reducing video quality
            peer, err := room.GetPeer(participantID)
            if err != nil {
                continue
            }
            
            // suggestQualityReduction is an application-defined helper (not shown)
            // that would ask the client to lower its video quality
            s.suggestQualityReduction(ctx, peer, participantQuality)
        }
    }
}

type ConferenceOptions struct {
    Name             string
    Private          bool
    RecordingEnabled bool
    MaxDuration      time.Duration
}

SFU Implementation

package main

import (
    "fmt"
    "time"
    
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/streaming"
    "github.com/xraph/forge/extensions/webrtc"
)

func main() {
    app := forge.NewApp(forge.AppConfig{
        Name:    "sfu-server",
        Version: "1.0.0",
    })

    // Register streaming with Redis backend for distributed signaling
    streamingExt := streaming.NewExtension(
        streaming.WithRedisBackend("redis://localhost:6379"),
        streaming.WithDistributed(true),
        streaming.WithNodeID("sfu-node-1"),
    )
    app.RegisterExtension(streamingExt)

    // Configure WebRTC with SFU topology
    webrtcExt, _ := webrtc.New(streamingExt, webrtc.Config{
        Topology: webrtc.TopologySFU,
        
        STUNServers: []string{
            "stun:stun.l.google.com:19302",
        },
        TURNServers: []webrtc.TURNConfig{{
            URLs:       []string{"turn:turn.example.com:3478"},
            Username:   "turnuser",
            Credential: "turnpass",
        }},
        
        SFUConfig: &webrtc.SFUConfig{
            WorkerCount:      8,
            MaxBandwidthMbps: 1000,
            AdaptiveBitrate:  true,
            SimulcastEnabled: true,
            QualityLayers: []webrtc.QualityLayer{
                {RID: "f", MaxWidth: 1920, MaxHeight: 1080, MaxFPS: 30, Bitrate: 2500},
                {RID: "h", MaxWidth: 1280, MaxHeight: 720,  MaxFPS: 30, Bitrate: 1200},
                {RID: "q", MaxWidth: 640,  MaxHeight: 360,  MaxFPS: 30, Bitrate: 500},
            },
            RecordingEnabled: true,
            RecordingFormat:  "webm",
        },
        
        MediaConfig: webrtc.MediaConfig{
            AudioEnabled:        true,
            AudioCodecs:         []string{"opus"},
            VideoEnabled:        true,
            VideoCodecs:         []string{"VP8", "H264"},
            DataChannelsEnabled: true,
            MaxVideoBitrate:     2500,
            MinVideoBitrate:     150,
        },
        
        QualityConfig: webrtc.QualityConfig{
            MonitorEnabled:       true,
            AdaptiveQuality:      true,
            QualityCheckInterval: 5 * time.Second,
        },
    })
    
    app.RegisterExtension(webrtcExt)

    // Register routes
    webrtcExt.RegisterRoutes(app.Router())

    // Large conference endpoint
    app.Router().POST("/conference/create", func(ctx forge.Context) error {
        var req struct {
            Name         string   `json:"name" validate:"required"`
            MaxMembers   int      `json:"max_members"`
            Participants []string `json:"participants"`
        }
        
        if err := ctx.Bind(&req); err != nil {
            return ctx.JSON(400, map[string]string{"error": "Invalid request"})
        }
        
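        // Checked assertion: a missing user_id returns 401 instead of panicking
        ownerID, ok := ctx.Get("user_id").(string)
        if !ok {
            return ctx.JSON(401, map[string]string{"error": "Unauthorized"})
        }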
        
        // Create large conference room
        room, err := webrtcExt.CreateCallRoom(ctx.Request().Context(), "", streaming.RoomOptions{
            Name:       req.Name,
            Owner:      ownerID,
            MaxMembers: req.MaxMembers,
        })
        if err != nil {
            return ctx.JSON(500, map[string]string{"error": "Failed to create conference"})
        }
        
        // Start recording for large conferences
        recorder := webrtcExt.GetRecorder()
        recorder.Start(ctx.Request().Context(), room.ID(), &webrtc.RecordingOptions{
            Format:      "webm",
            VideoCodec:  "VP8",
            AudioCodec:  "opus",
            OutputPath:  fmt.Sprintf("./recordings/conference_%s", room.ID()),
            IncludeChat: true,
        })
        
        return ctx.JSON(201, map[string]any{
            "room_id":     room.ID(),
            "room_name":   room.Name(),
            "topology":    "sfu",
            "max_members": req.MaxMembers,
        })
    })

    app.Start()
}
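The examples above build recording paths by formatting the room ID into a string. If room IDs can ever originate from user input, it is worth rejecting IDs that contain path separators before using them in a file path. The following is a small sketch of that idea; `recordingPath` is a hypothetical helper, not part of the extension.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// recordingPath builds the output path for a room's recording and rejects
// room IDs that are empty or contain a path separator, so the result always
// stays directly inside baseDir.
func recordingPath(baseDir, roomID string) (string, error) {
	if roomID == "" || strings.ContainsAny(roomID, `/\`) {
		return "", fmt.Errorf("invalid room ID %q", roomID)
	}
	return filepath.Join(baseDir, "conference_"+roomID), nil
}

func main() {
	for _, id := range []string{"room-123", "../../etc/passwd"} {
		p, err := recordingPath("./recordings", id)
		if err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		fmt.Println("recording to:", p)
	}
}
```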

Client-Side Integration

JavaScript WebRTC Client

class ForgeWebRTCClient {
    constructor(serverUrl, roomId, userId) {
        this.serverUrl = serverUrl;
        this.roomId = roomId;
        this.userId = userId;
        this.ws = null;
        this.pc = null;
        this.localStream = null;
        this.remoteStreams = new Map();
        
        this.setupPeerConnection();
    }
    
    setupPeerConnection() {
        this.pc = new RTCPeerConnection({
            iceServers: [
                { urls: 'stun:stun.l.google.com:19302' },
                { 
                    urls: 'turn:turn.example.com:3478',
                    username: 'turnuser',
                    credential: 'turnpass'
                }
            ]
        });
        
        // Handle ICE candidates
        this.pc.onicecandidate = (event) => {
            if (event.candidate) {
                this.sendSignalingMessage({
                    type: 'webrtc.ice_candidate',
                    candidate: event.candidate
                });
            }
        };
        
        // Handle remote tracks
        this.pc.ontrack = (event) => {
            const [remoteStream] = event.streams;
            const trackId = event.track.id;
            
            this.remoteStreams.set(trackId, remoteStream);
            this.onRemoteTrack(trackId, remoteStream, event.track);
        };
        
        // Handle connection state changes
        this.pc.onconnectionstatechange = () => {
            console.log('Connection state:', this.pc.connectionState);
            this.onConnectionStateChange(this.pc.connectionState);
        };
    }
    
    async connect() {
        // Connect to signaling server
        this.ws = new WebSocket(`${this.serverUrl}/webrtc/signal/${this.roomId}`);
        
        this.ws.onopen = () => {
            console.log('Connected to signaling server');
            this.onSignalingConnected();
        };
        
        this.ws.onmessage = async (event) => {
            const message = JSON.parse(event.data);
            await this.handleSignalingMessage(message);
        };
        
        this.ws.onclose = () => {
            console.log('Disconnected from signaling server');
            this.onSignalingDisconnected();
        };
        
        this.ws.onerror = (error) => {
            console.error('Signaling error:', error);
            this.onSignalingError(error);
        };
    }
    
    async handleSignalingMessage(message) {
        switch (message.type) {
            case 'webrtc.offer':
                await this.handleOffer(message.sdp);
                break;
                
            case 'webrtc.answer':
                await this.handleAnswer(message.sdp);
                break;
                
            case 'webrtc.ice_candidate':
                await this.handleICECandidate(message.candidate);
                break;
                
            case 'peer_joined':
                await this.handlePeerJoined(message.peer_id);
                break;
                
            case 'peer_left':
                this.handlePeerLeft(message.peer_id);
                break;
        }
    }
    
    async handleOffer(offer) {
        await this.pc.setRemoteDescription(offer);
        const answer = await this.pc.createAnswer();
        await this.pc.setLocalDescription(answer);
        
        this.sendSignalingMessage({
            type: 'webrtc.answer',
            sdp: answer
        });
    }
    
    async handleAnswer(answer) {
        await this.pc.setRemoteDescription(answer);
    }
    
    async handleICECandidate(candidate) {
        await this.pc.addIceCandidate(candidate);
    }
    
    async handlePeerJoined(peerId) {
        // Create offer for new peer
        const offer = await this.pc.createOffer();
        await this.pc.setLocalDescription(offer);
        
        this.sendSignalingMessage({
            type: 'webrtc.offer',
            target_peer: peerId,
            sdp: offer
        });
    }
    
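    handlePeerLeft(peerId) {
        // Clean up peer resources. Note: remoteStreams is keyed by track ID
        // (see ontrack), so this delete only matches if the key equals peerId.
        this.remoteStreams.delete(peerId);
        this.onPeerLeft(peerId);
    }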
    
    async startLocalMedia(constraints = { audio: true, video: true }) {
        try {
            this.localStream = await navigator.mediaDevices.getUserMedia(constraints);
            
            // Add tracks to peer connection
            this.localStream.getTracks().forEach(track => {
                this.pc.addTrack(track, this.localStream);
            });
            
            this.onLocalMediaStarted(this.localStream);
            return this.localStream;
        } catch (error) {
            console.error('Failed to start local media:', error);
            this.onLocalMediaError(error);
            throw error;
        }
    }
    
    async startScreenShare() {
        try {
            const screenStream = await navigator.mediaDevices.getDisplayMedia({
                video: true,
                audio: true
            });
            
            // Replace video track
            const videoTrack = screenStream.getVideoTracks()[0];
            const sender = this.pc.getSenders().find(s => 
                s.track && s.track.kind === 'video'
            );
            
            if (sender) {
                await sender.replaceTrack(videoTrack);
            }
            
            // Handle screen share end
            videoTrack.onended = () => {
                this.stopScreenShare();
            };
            
            this.onScreenShareStarted(screenStream);
            return screenStream;
        } catch (error) {
            console.error('Failed to start screen share:', error);
            this.onScreenShareError(error);
            throw error;
        }
    }
    
    async stopScreenShare() {
        // Replace with camera video
        if (this.localStream) {
            const videoTrack = this.localStream.getVideoTracks()[0];
            const sender = this.pc.getSenders().find(s => 
                s.track && s.track.kind === 'video'
            );
            
            if (sender && videoTrack) {
                await sender.replaceTrack(videoTrack);
            }
        }
        
        this.onScreenShareStopped();
    }
    
    sendSignalingMessage(message) {
        if (this.ws && this.ws.readyState === WebSocket.OPEN) {
            this.ws.send(JSON.stringify(message));
        }
    }
    
    muteAudio() {
        if (this.localStream) {
            this.localStream.getAudioTracks().forEach(track => {
                track.enabled = false;
            });
        }
    }
    
    unmuteAudio() {
        if (this.localStream) {
            this.localStream.getAudioTracks().forEach(track => {
                track.enabled = true;
            });
        }
    }
    
    muteVideo() {
        if (this.localStream) {
            this.localStream.getVideoTracks().forEach(track => {
                track.enabled = false;
            });
        }
    }
    
    unmuteVideo() {
        if (this.localStream) {
            this.localStream.getVideoTracks().forEach(track => {
                track.enabled = true;
            });
        }
    }
    
    disconnect() {
        if (this.localStream) {
            this.localStream.getTracks().forEach(track => track.stop());
        }
        
        if (this.pc) {
            this.pc.close();
        }
        
        if (this.ws) {
            this.ws.close();
        }
    }
    
    // Event handlers (override these)
    onSignalingConnected() {}
    onSignalingDisconnected() {}
    onSignalingError(error) {}
    onLocalMediaStarted(stream) {}
    onLocalMediaError(error) {}
    onRemoteTrack(trackId, stream, track) {}
    onPeerLeft(peerId) {}
    onConnectionStateChange(state) {}
    onScreenShareStarted(stream) {}
    onScreenShareStopped() {}
    onScreenShareError(error) {}
}

// Usage example
const client = new ForgeWebRTCClient('wss://example.com', 'room-123', 'user-456');

client.onLocalMediaStarted = (stream) => {
    document.getElementById('localVideo').srcObject = stream;
};

client.onRemoteTrack = (trackId, stream, track) => {
    const remoteVideo = document.createElement('video');
    remoteVideo.srcObject = stream;
    remoteVideo.autoplay = true;
    document.getElementById('remoteVideos').appendChild(remoteVideo);
};

// Connect and start media
await client.connect();
await client.startLocalMedia();
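The client above exchanges JSON messages whose `type` field selects the handler. For readers writing Go tooling around that exchange (for example a test harness or a relay), the sketch below decodes and validates the same message shapes. The field names are taken from the JavaScript client; `SignalMessage` and `parseSignal` are hypothetical, and the server's actual wire format may carry additional fields.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// SignalMessage covers the fields used by the JavaScript client above.
// SDP and Candidate stay as raw JSON because their shape is defined by the
// browser (RTCSessionDescription and RTCIceCandidate).
type SignalMessage struct {
	Type       string          `json:"type"`
	TargetPeer string          `json:"target_peer,omitempty"`
	PeerID     string          `json:"peer_id,omitempty"`
	SDP        json.RawMessage `json:"sdp,omitempty"`
	Candidate  json.RawMessage `json:"candidate,omitempty"`
}

// parseSignal decodes one message and checks that the payload required by
// its type is present.
func parseSignal(data []byte) (SignalMessage, error) {
	var msg SignalMessage
	if err := json.Unmarshal(data, &msg); err != nil {
		return msg, fmt.Errorf("decode signaling message: %w", err)
	}
	switch msg.Type {
	case "webrtc.offer", "webrtc.answer":
		if len(msg.SDP) == 0 {
			return msg, errors.New(msg.Type + " requires an sdp field")
		}
	case "webrtc.ice_candidate":
		if len(msg.Candidate) == 0 {
			return msg, errors.New("webrtc.ice_candidate requires a candidate field")
		}
	case "peer_joined", "peer_left":
		if msg.PeerID == "" {
			return msg, errors.New(msg.Type + " requires a peer_id field")
		}
	default:
		return msg, fmt.Errorf("unknown signaling message type %q", msg.Type)
	}
	return msg, nil
}

func main() {
	raw := []byte(`{"type":"webrtc.offer","target_peer":"peer-2","sdp":{"type":"offer","sdp":"v=0"}}`)
	msg, err := parseSignal(raw)
	if err != nil {
		fmt.Println("rejected:", err)
		return
	}
	fmt.Printf("type=%s target=%s sdp=%d bytes\n", msg.Type, msg.TargetPeer, len(msg.SDP))
}
```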

Advanced Features

Quality Monitoring & Adaptive Streaming

package main

import (
    "context"
    
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/webrtc"
)

type QualityManager struct {
    webrtc  *webrtc.Extension
    monitor webrtc.QualityMonitor
    logger  forge.Logger
}

func NewQualityManager(webrtcExt *webrtc.Extension, logger forge.Logger) *QualityManager {
    return &QualityManager{
        webrtc:  webrtcExt,
        monitor: webrtcExt.GetQualityMonitor(),
        logger:  logger,
    }
}

func (qm *QualityManager) StartMonitoring(ctx context.Context, roomID string) {
    room, err := qm.webrtc.GetCallRoom(roomID)
    if err != nil {
        return
    }
    
    // Monitor all peers in the room
    peers := room.GetPeers()
    for _, peer := range peers {
        go qm.monitorPeer(ctx, peer)
    }
    
    // Set up quality change handler
    qm.monitor.OnQualityChange(func(peerID string, quality *webrtc.ConnectionQuality) {
        qm.handleQualityChange(ctx, roomID, peerID, quality)
    })
}

func (qm *QualityManager) monitorPeer(ctx context.Context, peer webrtc.PeerConnection) {
    if err := qm.monitor.Monitor(ctx, peer); err != nil {
        qm.logger.Error("failed to start monitoring peer",
            forge.F("peer_id", peer.ID()),
            forge.F("error", err),
        )
    }
}

func (qm *QualityManager) handleQualityChange(ctx context.Context, roomID, peerID string, quality *webrtc.ConnectionQuality) {
    qm.logger.Info("quality change detected",
        forge.F("room_id", roomID),
        forge.F("peer_id", peerID),
        forge.F("score", quality.Score),
        forge.F("packet_loss", quality.PacketLoss),
        forge.F("jitter", quality.Jitter),
        forge.F("latency", quality.Latency),
    )
    
    // Implement adaptive quality measures
    if quality.Score < 30 {
        qm.handlePoorQuality(ctx, roomID, peerID, quality)
    } else if quality.Score > 80 {
        qm.handleGoodQuality(ctx, roomID, peerID, quality)
    }
}

func (qm *QualityManager) handlePoorQuality(ctx context.Context, roomID, peerID string, quality *webrtc.ConnectionQuality) {
    // Get SFU router for quality adjustment
    if qm.webrtc.GetConfig().Topology == webrtc.TopologySFU {
        router := qm.webrtc.GetSFURouter()
        
        // Reduce quality for this peer
        router.SetQuality(ctx, peerID, peerID, "q") // Switch to quarter quality
        
        qm.logger.Info("reduced quality for poor connection",
            forge.F("peer_id", peerID),
            forge.F("new_quality", "quarter"),
        )
    }
}

func (qm *QualityManager) handleGoodQuality(ctx context.Context, roomID, peerID string, quality *webrtc.ConnectionQuality) {
    // Increase quality if connection is good
    if qm.webrtc.GetConfig().Topology == webrtc.TopologySFU {
        router := qm.webrtc.GetSFURouter()
        
        // Increase to full quality
        router.SetQuality(ctx, peerID, peerID, "f") // Switch to full quality
        
        qm.logger.Info("increased quality for good connection",
            forge.F("peer_id", peerID),
            forge.F("new_quality", "full"),
        )
    }
}
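
The handlers above react to a single quality sample, so a connection hovering near a threshold could flip between layers repeatedly. One possible refinement, sketched here, is to switch only after several consecutive samples ask for the same new layer. `layerSwitcher`, `targetLayer`, and the `float64` score type are hypothetical and not part of the webrtc extension; the thresholds (30 and 80) and the layer names ("q", "f") are taken from the example above.

```go
package main

import "fmt"

// layerSwitcher is a hypothetical helper (not part of the webrtc extension).
// It reports a layer change only after `required` consecutive samples
// ask for the same new layer.
type layerSwitcher struct {
    current  string // layer currently in use, e.g. "f" or "q"
    pending  string // layer the most recent samples are asking for
    streak   int    // consecutive samples asking for pending
    required int    // samples needed before switching
}

func newLayerSwitcher(initial string, required int) *layerSwitcher {
    return &layerSwitcher{current: initial, required: required}
}

// targetLayer mirrors the thresholds used in handleQualityChange above.
func targetLayer(score float64, current string) string {
    switch {
    case score < 30:
        return "q"
    case score > 80:
        return "f"
    default:
        return current
    }
}

// Observe feeds one quality score and returns the layer to use and
// whether the layer changed on this sample.
func (s *layerSwitcher) Observe(score float64) (string, bool) {
    want := targetLayer(score, s.current)
    if want == s.current {
        s.pending, s.streak = "", 0
        return s.current, false
    }
    if want != s.pending {
        s.pending, s.streak = want, 0
    }
    s.streak++
    if s.streak < s.required {
        return s.current, false
    }
    s.current, s.pending, s.streak = want, "", 0
    return s.current, true
}

func main() {
    s := newLayerSwitcher("f", 3)
    for _, score := range []float64{20, 25, 90, 20, 20, 20} {
        layer, changed := s.Observe(score)
        fmt.Printf("score=%.0f layer=%s changed=%t\n", score, layer, changed)
    }
}
```

In a quality handler, the router call would then run only when `Observe` reports a change.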

Recording Management

package main

import (
    "context"
    "fmt"
    "path/filepath"
    "sync"
    "time"
    
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/webrtc"
)

type RecordingManager struct {
    webrtc   *webrtc.Extension
    recorder webrtc.Recorder
    logger   forge.Logger
    
    recordings map[string]*RecordingSession
    mu         sync.RWMutex
}

type RecordingSession struct {
    RoomID    string
    StartedAt time.Time
    Options   *webrtc.RecordingOptions
    Status    *webrtc.RecordingStatus
}

func NewRecordingManager(webrtcExt *webrtc.Extension, logger forge.Logger) *RecordingManager {
    return &RecordingManager{
        webrtc:     webrtcExt,
        recorder:   webrtcExt.GetRecorder(),
        logger:     logger,
        recordings: make(map[string]*RecordingSession),
    }
}

func (rm *RecordingManager) StartRecording(ctx context.Context, roomID string, options *webrtc.RecordingOptions) error {
    rm.mu.Lock()
    defer rm.mu.Unlock()
    
    // Check if already recording
    if _, exists := rm.recordings[roomID]; exists {
        return fmt.Errorf("recording already in progress for room %s", roomID)
    }
    
    // Set default options if not provided
    if options == nil {
        options = &webrtc.RecordingOptions{
            Format:      "webm",
            VideoCodec:  "VP8",
            AudioCodec:  "opus",
            OutputPath:  filepath.Join("./recordings", fmt.Sprintf("%s_%d", roomID, time.Now().Unix())),
            IncludeChat: true,
        }
    }
    
    // Start recording
    if err := rm.recorder.Start(ctx, roomID, options); err != nil {
        return fmt.Errorf("failed to start recording: %w", err)
    }
    
    // Track recording session
    session := &RecordingSession{
        RoomID:    roomID,
        StartedAt: time.Now(),
        Options:   options,
    }
    
    rm.recordings[roomID] = session
    
    rm.logger.Info("recording started",
        forge.F("room_id", roomID),
        forge.F("output_path", options.OutputPath),
        forge.F("format", options.Format),
    )
    
    return nil
}

func (rm *RecordingManager) StopRecording(ctx context.Context, roomID string) error {
    rm.mu.Lock()
    defer rm.mu.Unlock()
    
    session, exists := rm.recordings[roomID]
    if !exists {
        return fmt.Errorf("no recording in progress for room %s", roomID)
    }
    
    // Stop recording
    if err := rm.recorder.Stop(ctx, roomID); err != nil {
        return fmt.Errorf("failed to stop recording: %w", err)
    }
    
    duration := time.Since(session.StartedAt)
    
    // Get final status; it may be unavailable, so never dereference it on error
    status, err := rm.recorder.GetStatus(roomID)
    if err != nil {
        rm.logger.Error("failed to get recording status",
            forge.F("room_id", roomID),
            forge.F("error", err),
        )
        rm.logger.Info("recording stopped",
            forge.F("room_id", roomID),
            forge.F("duration", duration),
        )
    } else {
        session.Status = status
        rm.logger.Info("recording stopped",
            forge.F("room_id", roomID),
            forge.F("duration", duration),
            forge.F("file_size", status.FileSize),
            forge.F("output_path", status.OutputPath),
        )
    }
    
    // Clean up
    delete(rm.recordings, roomID)
    
    return nil
}

func (rm *RecordingManager) GetRecordingStatus(roomID string) (*webrtc.RecordingStatus, error) {
    // A write lock is needed because session.Status is updated below
    rm.mu.Lock()
    defer rm.mu.Unlock()
    
    session, exists := rm.recordings[roomID]
    if !exists {
        return nil, fmt.Errorf("no recording in progress for room %s", roomID)
    }
    
    // Get current status from recorder
    status, err := rm.recorder.GetStatus(roomID)
    if err != nil {
        return nil, err
    }
    
    session.Status = status
    return status, nil
}

func (rm *RecordingManager) PauseRecording(ctx context.Context, roomID string) error {
    if err := rm.recorder.Pause(ctx, roomID); err != nil {
        return fmt.Errorf("failed to pause recording: %w", err)
    }
    
    rm.logger.Info("recording paused", forge.F("room_id", roomID))
    return nil
}

func (rm *RecordingManager) ResumeRecording(ctx context.Context, roomID string) error {
    if err := rm.recorder.Resume(ctx, roomID); err != nil {
        return fmt.Errorf("failed to resume recording: %w", err)
    }
    
    rm.logger.Info("recording resumed", forge.F("room_id", roomID))
    return nil
}

func (rm *RecordingManager) ListActiveRecordings() []*RecordingSession {
    rm.mu.RLock()
    defer rm.mu.RUnlock()
    
    sessions := make([]*RecordingSession, 0, len(rm.recordings))
    for _, session := range rm.recordings {
        sessions = append(sessions, session)
    }
    
    return sessions
}

Best Practices

Connection Management

  • Use appropriate topology: Mesh for small groups (2-4), SFU for medium groups (5-50), MCU for large groups (50+, planned)
  • Implement reconnection logic: Handle network interruptions gracefully
  • Monitor connection quality: Use quality monitoring to detect and respond to network issues
  • Optimize ICE configuration: Use both STUN and TURN servers for reliable connectivity
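
The reconnection advice above can be sketched as a retry loop with exponential backoff. This is a minimal illustration, not the extension's own reconnection mechanism: `backoffDelay`, `reconnect`, and the `connect` callback are hypothetical names, and the delays are arbitrary.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// backoffDelay returns base * 2^attempt, capped at maxDelay.
func backoffDelay(attempt int, base, maxDelay time.Duration) time.Duration {
    d := base
    for i := 0; i < attempt && d < maxDelay; i++ {
        d *= 2
    }
    if d > maxDelay {
        d = maxDelay
    }
    return d
}

// reconnect calls connect until it succeeds, the attempts run out,
// or the context is cancelled. connect is a hypothetical callback that
// re-establishes the signaling or peer connection.
func reconnect(ctx context.Context, maxAttempts int, base, maxDelay time.Duration,
    connect func(context.Context) error) error {
    var lastErr error
    for attempt := 0; attempt < maxAttempts; attempt++ {
        if lastErr = connect(ctx); lastErr == nil {
            return nil
        }
        if attempt == maxAttempts-1 {
            break // no point waiting after the final attempt
        }
        timer := time.NewTimer(backoffDelay(attempt, base, maxDelay))
        select {
        case <-ctx.Done():
            timer.Stop()
            return ctx.Err()
        case <-timer.C:
        }
    }
    return fmt.Errorf("reconnect failed after %d attempts: %w", maxAttempts, lastErr)
}

func main() {
    calls := 0
    // Simulated connection that fails twice before succeeding.
    connect := func(ctx context.Context) error {
        calls++
        if calls < 3 {
            return errors.New("network unreachable")
        }
        return nil
    }
    err := reconnect(context.Background(), 5, 10*time.Millisecond, time.Second, connect)
    fmt.Println("attempts:", calls, "err:", err)
}
```

Adding random jitter to each delay is a common extension when many clients may reconnect at the same moment.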

Media Optimization

  • Enable simulcast: Use multiple quality layers for adaptive streaming (SFU mode)
  • Implement adaptive bitrate: Adjust quality based on network conditions
  • Use appropriate codecs: Choose codecs based on device capabilities and bandwidth
  • Handle track management: Properly add, remove, and replace media tracks
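
As one way to picture adaptive bitrate, the sketch below applies an additive-increase, multiplicative-decrease (AIMD) rule driven by packet loss. It is not the extension's bitrate controller: `nextBitrate`, the 2% and 10% loss thresholds, and the 50 kbps step are illustrative assumptions.

```go
package main

import "fmt"

// increaseStep is the additive probe applied on a clean link (assumed value).
const increaseStep = 50_000 // bits per second

// nextBitrate returns the next target bitrate in bits per second.
// Heavy loss halves the bitrate, low loss raises it by increaseStep,
// and anything in between leaves it unchanged.
func nextBitrate(current int, lossPct float64, minBps, maxBps int) int {
    switch {
    case lossPct > 10:
        current /= 2
    case lossPct < 2:
        current += increaseStep
    }
    if current < minBps {
        current = minBps
    }
    if current > maxBps {
        current = maxBps
    }
    return current
}

func main() {
    bitrate := 1_000_000
    for _, loss := range []float64{0.5, 1.0, 12.0, 5.0, 0.0} {
        bitrate = nextBitrate(bitrate, loss, 150_000, 2_500_000)
        fmt.Printf("loss=%.1f%% target=%d bps\n", loss, bitrate)
    }
}
```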

Performance Considerations

  • Limit concurrent connections: Set reasonable limits based on server capacity
  • Use efficient signaling: Minimize signaling overhead with batching and compression
  • Monitor resource usage: Track CPU, memory, and bandwidth usage
  • Implement load balancing: Distribute load across multiple nodes for scalability
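
A connection limit can be enforced with a counting semaphore. The sketch below uses a buffered channel for this; `connLimiter` is a hypothetical helper, not a type provided by the extension, and the limit of 2 is only for demonstration.

```go
package main

import "fmt"

// connLimiter caps the number of concurrent connections using a
// buffered channel as a counting semaphore.
type connLimiter struct {
    slots chan struct{}
}

func newConnLimiter(limit int) *connLimiter {
    return &connLimiter{slots: make(chan struct{}, limit)}
}

// TryAcquire reserves a slot without blocking and reports whether it succeeded.
func (l *connLimiter) TryAcquire() bool {
    select {
    case l.slots <- struct{}{}:
        return true
    default:
        return false
    }
}

// Release frees a slot; releasing with no slots held is a no-op.
func (l *connLimiter) Release() {
    select {
    case <-l.slots:
    default:
    }
}

// InUse returns the number of slots currently held.
func (l *connLimiter) InUse() int {
    return len(l.slots)
}

func main() {
    limiter := newConnLimiter(2)
    for i := 1; i <= 3; i++ {
        fmt.Printf("peer %d admitted: %t\n", i, limiter.TryAcquire())
    }
    limiter.Release()
    fmt.Println("in use after one release:", limiter.InUse())
}
```

A join handler would call `TryAcquire` before creating a peer connection and `Release` when the peer leaves.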

Security Best Practices

  • Always authenticate: Require authentication for all WebRTC operations
  • Use secure signaling: Implement TLS for signaling connections
  • Validate permissions: Check room permissions before allowing operations
  • Monitor for abuse: Implement rate limiting and abuse detection
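
Rate limiting for signaling messages can take many forms. The sketch below shows a simple fixed-window counter keyed by peer ID; `windowLimiter` is a hypothetical helper rather than an extension API, and the limit and window values are placeholders. The limit is expected to be at least 1.

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// windowCount tracks how many requests a key made in its current window.
type windowCount struct {
    start int64 // window start, Unix seconds
    n     int   // requests seen in this window
}

// windowLimiter allows at most `limit` requests per key in each
// `window`-second window (fixed-window counting).
type windowLimiter struct {
    mu     sync.Mutex
    limit  int
    window int64
    counts map[string]*windowCount
}

func newWindowLimiter(limit int, windowSeconds int64) *windowLimiter {
    return &windowLimiter{
        limit:  limit,
        window: windowSeconds,
        counts: make(map[string]*windowCount),
    }
}

// Allow records a request for key at nowUnix and reports whether it is
// within the limit. The time is passed in to keep the logic testable.
func (l *windowLimiter) Allow(key string, nowUnix int64) bool {
    l.mu.Lock()
    defer l.mu.Unlock()

    c, ok := l.counts[key]
    if !ok || nowUnix-c.start >= l.window {
        l.counts[key] = &windowCount{start: nowUnix, n: 1}
        return true
    }
    if c.n >= l.limit {
        return false
    }
    c.n++
    return true
}

func main() {
    limiter := newWindowLimiter(2, 10)
    now := time.Now().Unix()
    for i := 1; i <= 3; i++ {
        fmt.Printf("message %d allowed: %t\n", i, limiter.Allow("peer-1", now))
    }
}
```

Entries for idle keys are never evicted in this sketch; a long-running service would need periodic cleanup.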

Troubleshooting

Common Issues

Connection Failed

Error: WebRTC connection failed to establish

Solutions:

  • Check STUN/TURN server configuration
  • Verify firewall and NAT settings
  • Ensure proper ICE candidate exchange
  • Check network connectivity between peers
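
When checking the STUN/TURN configuration, a quick sanity check of the configured ICE server URLs can rule out typos. The sketch below only inspects the URL scheme (`stun`, `stuns`, `turn`, `turns`) and verifies that both kinds are present; it does not contact the servers. `classifyICEURL` and `checkICEServers` are hypothetical helpers, and the host names are placeholders.

```go
package main

import (
    "fmt"
    "strings"
)

// classifyICEURL returns "stun" or "turn" based on the URL scheme.
func classifyICEURL(raw string) (string, error) {
    scheme, rest, ok := strings.Cut(raw, ":")
    if !ok || rest == "" {
        return "", fmt.Errorf("invalid ICE server URL %q", raw)
    }
    switch strings.ToLower(scheme) {
    case "stun", "stuns":
        return "stun", nil
    case "turn", "turns":
        return "turn", nil
    }
    return "", fmt.Errorf("unsupported ICE server scheme %q in %q", scheme, raw)
}

// checkICEServers reports an error if any URL is malformed or if the
// list lacks either a STUN or a TURN server.
func checkICEServers(urls []string) error {
    seen := map[string]bool{}
    for _, u := range urls {
        kind, err := classifyICEURL(u)
        if err != nil {
            return err
        }
        seen[kind] = true
    }
    if !seen["stun"] {
        return fmt.Errorf("no STUN server configured")
    }
    if !seen["turn"] {
        return fmt.Errorf("no TURN server configured")
    }
    return nil
}

func main() {
    urls := []string{
        "stun:stun.example.com:3478",
        "turns:turn.example.com:5349?transport=tcp",
    }
    fmt.Println("config check:", checkICEServers(urls))
}
```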

Signaling Issues

Error: Signaling connection lost

Solutions:

  • Verify streaming extension is running
  • Check WebSocket endpoint configuration
  • Ensure proper authentication
  • Monitor signaling server connectivity

Media Track Issues

Error: Failed to add media track

Solutions:

  • Check media permissions in browser
  • Verify codec compatibility
  • Ensure proper track constraints
  • Check device availability

Quality Problems

Warning: Poor connection quality detected

Solutions:

  • Enable adaptive quality
  • Check network bandwidth
  • Reduce video resolution/bitrate
  • Use quality monitoring for optimization

Debugging

Enable Debug Logging

logging:
  level: debug
  loggers:
    webrtc: debug
    webrtc.signaling: debug
    webrtc.peer: debug
    webrtc.quality: debug

Monitor WebRTC Statistics

// Get peer connection statistics
peer, err := room.GetPeer(userID)
if err != nil {
    return err
}

stats, err := peer.GetStats(ctx)
if err != nil {
    return err
}

fmt.Printf("Bytes Sent: %d\n", stats.BytesSent)
fmt.Printf("Bytes Received: %d\n", stats.BytesReceived)
fmt.Printf("Packets Lost: %d\n", stats.PacketsLost)
fmt.Printf("RTT: %v\n", stats.RoundTripTime)
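
Raw counters such as `PacketsLost` are cumulative, so a loss percentage over a recent interval is often more useful than the total. The sketch below derives it from two snapshots. The `rtpSnapshot` type and its `PacketsReceived` field are assumptions made for illustration; check which fields the extension's stats type actually exposes.

```go
package main

import "fmt"

// rtpSnapshot is a hypothetical pair of cumulative counters.
type rtpSnapshot struct {
    PacketsLost     uint64
    PacketsReceived uint64
}

// lossPercent returns lost / (lost + received) as a percentage.
func lossPercent(lost, received uint64) float64 {
    total := lost + received
    if total == 0 {
        return 0
    }
    return float64(lost) * 100 / float64(total)
}

// intervalLossPercent returns the loss percentage between two snapshots.
// If a counter went backwards (for example after a reset), it returns 0.
func intervalLossPercent(prev, cur rtpSnapshot) float64 {
    if cur.PacketsLost < prev.PacketsLost || cur.PacketsReceived < prev.PacketsReceived {
        return 0
    }
    return lossPercent(
        cur.PacketsLost-prev.PacketsLost,
        cur.PacketsReceived-prev.PacketsReceived,
    )
}

func main() {
    prev := rtpSnapshot{PacketsLost: 10, PacketsReceived: 990}
    cur := rtpSnapshot{PacketsLost: 20, PacketsReceived: 1080}
    fmt.Printf("cumulative loss: %.2f%%\n", lossPercent(cur.PacketsLost, cur.PacketsReceived))
    fmt.Printf("interval loss:   %.2f%%\n", intervalLossPercent(prev, cur))
}
```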

Health Checks

# Check WebRTC extension health
curl http://localhost:8080/_/health/webrtc

# Check signaling connectivity
curl http://localhost:8080/_/health/webrtc/signaling

# Check TURN server connectivity
curl http://localhost:8080/_/health/webrtc/turn
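
The same checks can be scripted from Go, for example in a deployment smoke test. The sketch below polls the three endpoints listed above; `healthURLs` and `checkHealth` are hypothetical helpers, and treating only HTTP 200 as healthy is an assumption about how the endpoints respond.

```go
package main

import (
    "fmt"
    "net/http"
    "strings"
    "time"
)

// healthURLs builds the WebRTC health endpoint URLs for a base address.
func healthURLs(base string) []string {
    base = strings.TrimRight(base, "/")
    return []string{
        base + "/_/health/webrtc",
        base + "/_/health/webrtc/signaling",
        base + "/_/health/webrtc/turn",
    }
}

// checkHealth performs a GET and treats any status other than 200 as
// unhealthy (an assumption about the endpoint's behavior).
func checkHealth(client *http.Client, url string) error {
    resp, err := client.Get(url)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("unexpected status %s", resp.Status)
    }
    return nil
}

func main() {
    client := &http.Client{Timeout: 2 * time.Second}
    for _, url := range healthURLs("http://localhost:8080") {
        if err := checkHealth(client, url); err != nil {
            fmt.Printf("UNHEALTHY %s: %v\n", url, err)
            continue
        }
        fmt.Printf("OK        %s\n", url)
    }
}
```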
