Streaming Extension

Real-time streaming with WebSocket/SSE support, rooms, channels, presence tracking, and distributed coordination

The Streaming extension provides real-time communication capabilities for your Forge application with WebSocket and Server-Sent Events (SSE) support. It includes advanced features like rooms, channels, presence tracking, typing indicators, message history, and distributed coordination for multi-node deployments.

Features

Real-Time Communication

  • WebSocket Support: Full-duplex real-time communication with connection management
  • Server-Sent Events (SSE): One-way streaming for real-time updates
  • Connection Management: Advanced connection tracking with metadata and lifecycle management
  • Message Broadcasting: Efficient message distribution with filtering and targeting

Rooms & Channels

  • Chat Rooms: Organized conversations with membership, roles, and permissions
  • Pub/Sub Channels: Topic-based message broadcasting with pattern matching
  • Room Management: Create, join, leave, and manage room settings
  • Channel Subscriptions: Subscribe to channels with custom filters
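
The pattern syntax the extension accepts is not spelled out on this page. As an illustration only, the sketch below assumes dot-separated channel names where `*` matches exactly one segment and a trailing `>` matches one or more remaining segments. `matchChannel` is a hypothetical helper, not part of the streaming package.

```go
package main

import (
    "fmt"
    "strings"
)

// matchChannel reports whether name matches pattern.
// Assumed syntax (not taken from the extension): segments are separated by
// dots, "*" matches exactly one segment, and a trailing ">" matches one or
// more remaining segments.
func matchChannel(pattern, name string) bool {
    p := strings.Split(pattern, ".")
    n := strings.Split(name, ".")
    for i, seg := range p {
        if seg == ">" && i == len(p)-1 {
            return len(n) > i // at least one segment must remain
        }
        if i >= len(n) {
            return false // pattern is longer than the name
        }
        if seg != "*" && seg != n[i] {
            return false // literal segment mismatch
        }
    }
    return len(p) == len(n)
}

func main() {
    fmt.Println(matchChannel("user.*.notifications", "user.42.notifications")) // true
    fmt.Println(matchChannel("user.>", "user.42.presence"))                    // true
    fmt.Println(matchChannel("user.*", "user.42.presence"))                    // false
}
```

Matching segment by segment keeps the check linear in the length of the name and avoids compiling a regular expression per subscription.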

Presence & Activity

  • Presence Tracking: Track user online/offline/away status across rooms
  • Typing Indicators: Real-time typing indicators per room
  • Activity Tracking: Monitor user activity and idle connections
  • Status Broadcasting: Automatic presence updates to relevant users
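
One plausible way to derive online/away/offline status is from the time of a user's last activity. The sketch below is a stand-alone illustration, not the extension's implementation: `presenceTracker`, the `awayAfter` threshold, and the use of Unix seconds are all assumptions, and `offlineAfter` plays the role of a presence timeout.

```go
package main

import (
    "fmt"
    "sync"
)

const (
    StatusOnline  = "online"
    StatusAway    = "away"
    StatusOffline = "offline"
)

// presenceTracker derives a status from how long a user has been idle.
// Both thresholds are in seconds; awayAfter is a hypothetical idle threshold.
type presenceTracker struct {
    mu           sync.Mutex
    lastSeen     map[string]int64 // user ID -> Unix seconds of last activity
    awayAfter    int64
    offlineAfter int64
}

func newPresenceTracker(awayAfter, offlineAfter int64) *presenceTracker {
    return &presenceTracker{
        lastSeen:     make(map[string]int64),
        awayAfter:    awayAfter,
        offlineAfter: offlineAfter,
    }
}

// Touch records activity for userID at time now (Unix seconds).
func (p *presenceTracker) Touch(userID string, now int64) {
    p.mu.Lock()
    defer p.mu.Unlock()
    p.lastSeen[userID] = now
}

// Status returns the status of userID as seen at time now.
func (p *presenceTracker) Status(userID string, now int64) string {
    p.mu.Lock()
    defer p.mu.Unlock()
    seen, ok := p.lastSeen[userID]
    if !ok {
        return StatusOffline
    }
    idle := now - seen
    switch {
    case idle >= p.offlineAfter:
        return StatusOffline
    case idle >= p.awayAfter:
        return StatusAway
    default:
        return StatusOnline
    }
}

// Cleanup drops users idle for at least offlineAfter and returns the count.
func (p *presenceTracker) Cleanup(now int64) int {
    p.mu.Lock()
    defer p.mu.Unlock()
    removed := 0
    for id, seen := range p.lastSeen {
        if now-seen >= p.offlineAfter {
            delete(p.lastSeen, id)
            removed++
        }
    }
    return removed
}

func main() {
    p := newPresenceTracker(60, 300)
    p.Touch("alice", 1000)
    fmt.Println(p.Status("alice", 1030)) // online
    fmt.Println(p.Status("alice", 1100)) // away
    fmt.Println(p.Status("alice", 1300)) // offline
}
```

Passing the current time in explicitly keeps the tracker deterministic and easy to test; a periodic call to `Cleanup` bounds memory use.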

Message Management

  • Message History: Persist and retrieve conversation history
  • Message Reactions: Add reactions and interactions to messages
  • Message Editing: Edit and delete messages with audit trails
  • Content Filtering: Filter messages based on content, metadata, or permissions

Distributed Architecture

  • Multi-Node Support: Redis/NATS backends for horizontal scaling
  • Load Balancing: Consistent hashing, least connections, and geographic routing
  • State Synchronization: Sync presence, rooms, and messages across nodes
  • Fault Tolerance: Automatic failover and recovery mechanisms
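
To make the consistent-hashing option concrete, here is a minimal, generic hash ring. It is a sketch under stated assumptions, not the extension's load balancer: the `ring` type, the FNV-1a hash, and the virtual-node naming scheme are all illustrative choices.

```go
package main

import (
    "fmt"
    "hash/fnv"
    "sort"
    "strconv"
)

// ring maps keys (for example connection or user IDs) to node IDs.
type ring struct {
    hashes []uint32          // sorted virtual-node hashes
    owner  map[uint32]string // virtual-node hash -> node ID
}

func hashKey(s string) uint32 {
    h := fnv.New32a()
    h.Write([]byte(s))
    return h.Sum32()
}

// newRing places `replicas` virtual nodes per physical node on the ring.
func newRing(replicas int, nodes ...string) *ring {
    r := &ring{owner: make(map[uint32]string)}
    for _, n := range nodes {
        for i := 0; i < replicas; i++ {
            h := hashKey(n + "#" + strconv.Itoa(i))
            if _, exists := r.owner[h]; exists {
                continue // hash collision: keep the first owner
            }
            r.owner[h] = n
            r.hashes = append(r.hashes, h)
        }
    }
    sort.Slice(r.hashes, func(i, j int) bool { return r.hashes[i] < r.hashes[j] })
    return r
}

// nodeFor returns the node owning key: the first virtual node at or after
// the key's hash, wrapping around to the start of the ring.
func (r *ring) nodeFor(key string) string {
    if len(r.hashes) == 0 {
        return ""
    }
    h := hashKey(key)
    i := sort.Search(len(r.hashes), func(i int) bool { return r.hashes[i] >= h })
    if i == len(r.hashes) {
        i = 0
    }
    return r.owner[r.hashes[i]]
}

func main() {
    r := newRing(64, "node-1", "node-2", "node-3")
    for _, key := range []string{"user-1", "user-2", "user-3"} {
        fmt.Println(key, "->", r.nodeFor(key))
    }
}
```

The useful property is that adding a node only moves keys onto the new node; keys never shuffle between the existing nodes, which limits reconnection churn when a cluster scales out.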

Security & Performance

  • Authentication: Connection-level and message-level authentication
  • Rate Limiting: Adaptive rate limiting with multiple algorithms
  • Content Validation: Schema validation and security filtering
  • Connection Limits: Per-user connection and resource limits

Installation

Go Module

go get github.com/xraph/forge/extensions/streaming

Docker

FROM xraph/forge:latest
# Streaming extension is included

Package Manager

# Using Forge CLI
forge extension add streaming

# Using package manager
npm install @xraph/forge-streaming

Configuration

YAML Configuration

extensions:
  streaming:
    # Backend configuration
    backend: "redis"                    # "local", "redis", "nats"
    backend_urls:
      - "redis://localhost:6379"
    backend_username: ""                # Optional authentication
    backend_password: ""
    
    # Feature toggles
    enable_rooms: true
    enable_channels: true
    enable_presence: true
    enable_typing_indicators: true
    enable_message_history: true
    enable_distributed: true
    
    # Connection limits
    max_connections_per_user: 5
    max_rooms_per_user: 50
    max_channels_per_user: 100
    max_message_size: 65536             # 64KB
    max_messages_per_second: 100
    
    # Timeouts
    ping_interval: 30                   # seconds
    pong_timeout: 10                    # seconds
    write_timeout: 10                   # seconds
    read_timeout: 60                    # seconds
    
    # Presence settings
    presence_timeout: 300               # 5 minutes
    presence_cleanup: 60                # 1 minute
    
    # Typing indicators
    typing_timeout: 5                   # seconds
    typing_cleanup: 10                  # seconds
    max_typing_users_per_room: 10
    
    # Message retention
    message_retention: 720              # hours (30 days)
    max_history_messages: 1000
    
    # Distributed settings
    node_id: "node-1"                   # Auto-generated if not set
    
    # TLS configuration
    tls_enabled: false
    tls_cert_file: ""
    tls_key_file: ""
    tls_ca_file: ""
    
    # Rate limiting
    rate_limit:
      enabled: true
      algorithm: "token_bucket"         # "token_bucket", "sliding_window", "adaptive"
      requests_per_second: 10
      burst_size: 20
      
    # Authentication
    auth:
      require_auth: true
      token_header: "Authorization"
      validate_on_connect: true
      validate_on_message: false
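
For readers unfamiliar with the `token_bucket` algorithm named in the `rate_limit` block, the following stand-alone sketch shows how `requests_per_second` and `burst_size` usually interact. It is a generic illustration; the `tokenBucket` type is hypothetical and is not the extension's limiter.

```go
package main

import (
    "fmt"
    "math"
    "sync"
    "time"
)

// tokenBucket allows short bursts up to `burst` requests while limiting the
// sustained rate to `rate` requests per second.
type tokenBucket struct {
    mu     sync.Mutex
    rate   float64   // tokens added per second (requests_per_second)
    burst  float64   // bucket capacity (burst_size)
    tokens float64   // tokens currently available
    last   time.Time // time of the last refill
}

func newTokenBucket(rate, burst float64) *tokenBucket {
    return &tokenBucket{rate: rate, burst: burst, tokens: burst, last: time.Now()}
}

// allow refills the bucket for the time elapsed since the last call and
// consumes one token if available.
func (b *tokenBucket) allow(now time.Time) bool {
    b.mu.Lock()
    defer b.mu.Unlock()
    if elapsed := now.Sub(b.last).Seconds(); elapsed > 0 {
        b.tokens = math.Min(b.burst, b.tokens+elapsed*b.rate)
        b.last = now
    }
    if b.tokens < 1 {
        return false
    }
    b.tokens--
    return true
}

func main() {
    b := newTokenBucket(10, 20) // values from the rate_limit block above
    now := time.Now()
    allowed := 0
    for i := 0; i < 25; i++ {
        if b.allow(now) {
            allowed++
        }
    }
    fmt.Println("allowed in initial burst:", allowed)
}
```

With these values a client can send 20 messages at once, after which it is held to roughly 10 per second until the bucket refills.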

Environment Variables

# Backend Configuration
FORGE_STREAMING_BACKEND=redis
FORGE_STREAMING_BACKEND_URLS=redis://localhost:6379
FORGE_STREAMING_BACKEND_USERNAME=
FORGE_STREAMING_BACKEND_PASSWORD=

# Feature Toggles
FORGE_STREAMING_ENABLE_ROOMS=true
FORGE_STREAMING_ENABLE_CHANNELS=true
FORGE_STREAMING_ENABLE_PRESENCE=true
FORGE_STREAMING_ENABLE_TYPING_INDICATORS=true
FORGE_STREAMING_ENABLE_MESSAGE_HISTORY=true

# Connection Limits
FORGE_STREAMING_MAX_CONNECTIONS_PER_USER=5
FORGE_STREAMING_MAX_ROOMS_PER_USER=50
FORGE_STREAMING_MAX_MESSAGE_SIZE=65536

# Timeouts
FORGE_STREAMING_PING_INTERVAL=30
FORGE_STREAMING_PONG_TIMEOUT=10
FORGE_STREAMING_PRESENCE_TIMEOUT=300

# Distributed
FORGE_STREAMING_NODE_ID=node-1
FORGE_STREAMING_ENABLE_DISTRIBUTED=true
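
The extension presumably reads these variables itself. If you need the same values in your own code (for example to log the effective settings at startup), a small typed loader along the following lines can help. Everything here is an assumption made for illustration: the `streamingEnv` struct, the fallback values, and in particular the comma-separated encoding of `FORGE_STREAMING_BACKEND_URLS`, which this page does not specify.

```go
package main

import (
    "fmt"
    "os"
    "strconv"
    "strings"
)

// lookupFunc has the same shape as os.LookupEnv so tests can supply a map.
type lookupFunc func(key string) (string, bool)

// streamingEnv is a hypothetical subset of the settings listed above.
type streamingEnv struct {
    Backend               string
    BackendURLs           []string
    MaxConnectionsPerUser int
    EnablePresence        bool
}

func envInt(lookup lookupFunc, key string, def int) (int, error) {
    raw, ok := lookup(key)
    if !ok || raw == "" {
        return def, nil
    }
    n, err := strconv.Atoi(raw)
    if err != nil {
        return 0, fmt.Errorf("%s: %q is not an integer", key, raw)
    }
    return n, nil
}

func envBool(lookup lookupFunc, key string, def bool) (bool, error) {
    raw, ok := lookup(key)
    if !ok || raw == "" {
        return def, nil
    }
    v, err := strconv.ParseBool(raw)
    if err != nil {
        return false, fmt.Errorf("%s: %q is not a boolean", key, raw)
    }
    return v, nil
}

// splitList parses an assumed comma-separated list, dropping empty items.
func splitList(raw string) []string {
    var out []string
    for _, item := range strings.Split(raw, ",") {
        if item = strings.TrimSpace(item); item != "" {
            out = append(out, item)
        }
    }
    return out
}

// loadStreamingEnv applies illustrative fallbacks, then overrides them.
func loadStreamingEnv(lookup lookupFunc) (streamingEnv, error) {
    cfg := streamingEnv{Backend: "local", MaxConnectionsPerUser: 5, EnablePresence: true}
    if v, ok := lookup("FORGE_STREAMING_BACKEND"); ok && v != "" {
        cfg.Backend = v
    }
    if v, ok := lookup("FORGE_STREAMING_BACKEND_URLS"); ok {
        cfg.BackendURLs = splitList(v)
    }
    var err error
    if cfg.MaxConnectionsPerUser, err = envInt(lookup, "FORGE_STREAMING_MAX_CONNECTIONS_PER_USER", cfg.MaxConnectionsPerUser); err != nil {
        return streamingEnv{}, err
    }
    if cfg.EnablePresence, err = envBool(lookup, "FORGE_STREAMING_ENABLE_PRESENCE", cfg.EnablePresence); err != nil {
        return streamingEnv{}, err
    }
    return cfg, nil
}

func main() {
    cfg, err := loadStreamingEnv(os.LookupEnv)
    if err != nil {
        fmt.Println("invalid streaming environment:", err)
        return
    }
    fmt.Printf("%+v\n", cfg)
}
```

Failing on a malformed value, rather than silently falling back, surfaces typos such as `MAX_CONNECTIONS_PER_USER=five` at startup.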

Programmatic Configuration

package main

import (
    "time"
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/streaming"
)

func main() {
    app := forge.NewApp(forge.AppConfig{
        Name:    "chat-app",
        Version: "1.0.0",
    })

    // Configure streaming extension
    app.RegisterExtension(streaming.NewExtension(
        // Backend
        streaming.WithRedisBackend("redis://localhost:6379"),
        streaming.WithAuthentication("username", "password"),
        
        // Features
        streaming.WithFeatures(true, true, true, true, true), // rooms, channels, presence, typing, history
        
        // Limits
        streaming.WithConnectionLimits(5, 50, 100), // conns/user, rooms/user, channels/user
        streaming.WithMessageLimits(64*1024, 100),  // max size, max/second
        
        // Timeouts
        streaming.WithTimeouts(30*time.Second, 10*time.Second, 10*time.Second), // ping, pong, write
        
        // Retention
        streaming.WithMessageRetention(30 * 24 * time.Hour), // 30 days
        
        // Distributed
        streaming.WithNodeID("node-1"),
        streaming.WithDistributed(true),
        
        // TLS
        streaming.WithTLS("cert.pem", "key.pem", "ca.pem"),
        
        // Rate limiting
        streaming.WithRateLimit("token_bucket", 10, 20), // algorithm, rps, burst
    ))

    app.Start()
}

Usage Examples

Basic WebSocket Chat

package main

import (
    "context"
    "time"
    
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/streaming"
)

func main() {
    app := forge.NewApp(forge.AppConfig{
        Name:    "chat-app",
        Version: "1.0.0",
    })

    // Register streaming extension
    app.RegisterExtension(streaming.NewExtension(
        streaming.WithLocalBackend(),
        streaming.WithFeatures(true, true, true, true, true),
    ))

    // WebSocket endpoint
    app.Router().GET("/ws", handleWebSocket)
    
    // REST API for room management
    app.Router().POST("/api/rooms", createRoom)
    app.Router().GET("/api/rooms", listRooms)
    app.Router().POST("/api/rooms/:id/join", joinRoom)
    app.Router().POST("/api/rooms/:id/leave", leaveRoom)

    app.Start()
}

func handleWebSocket(ctx forge.Context) error {
    // Get streaming manager from DI container
    manager := forge.Must[streaming.Manager](ctx.Container(), "streaming")
    
    // Read the identity set by the authentication layer before upgrading, so a
    // missing value produces a 401 instead of a panic on the type assertion
    userID, ok := ctx.Get("user_id").(string)
    if !ok {
        return ctx.JSON(401, map[string]string{"error": "Unauthenticated"})
    }
    sessionID, _ := ctx.Get("session_id").(string)

    // Upgrade to WebSocket
    conn, err := ctx.UpgradeWebSocket()
    if err != nil {
        return err
    }
    defer conn.Close() // runs even if registration below fails

    // Create enhanced connection with metadata
    enhancedConn := streaming.NewEnhancedConnection(conn)
    enhancedConn.SetUserID(userID)
    enhancedConn.SetSessionID(sessionID)

    // Register connection with manager and unregister on exit
    if err := manager.Register(enhancedConn); err != nil {
        return err
    }
    defer manager.Unregister(enhancedConn.ID())

    // Message handling loop
    for {
        var msg streaming.Message
        if err := conn.ReadJSON(&msg); err != nil {
            break
        }
        
        // Process message based on type
        switch msg.Type {
        case "join_room":
            handleJoinRoom(ctx.Request().Context(), manager, enhancedConn, &msg)
        case "leave_room":
            handleLeaveRoom(ctx.Request().Context(), manager, enhancedConn, &msg)
        case "send_message":
            handleSendMessage(ctx.Request().Context(), manager, enhancedConn, &msg)
        case "start_typing":
            handleStartTyping(ctx.Request().Context(), manager, enhancedConn, &msg)
        case "stop_typing":
            handleStopTyping(ctx.Request().Context(), manager, enhancedConn, &msg)
        }
    }

    return nil
}

func handleJoinRoom(ctx context.Context, manager streaming.Manager, conn streaming.Connection, msg *streaming.Message) {
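    // Client payloads are untrusted: use the comma-ok form to avoid a panic
    roomID, ok := msg.Data["room_id"].(string)
    if !ok {
        conn.WriteJSON(streaming.Message{
            Type: "error",
            Data: map[string]any{"error": "room_id must be a string"},
        })
        return
    }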
    
    // Join room
    if err := manager.JoinRoom(ctx, conn.ID(), roomID); err != nil {
        conn.WriteJSON(streaming.Message{
            Type: "error",
            Data: map[string]any{"error": err.Error()},
        })
        return
    }
    
    // Set presence
    manager.SetPresence(ctx, conn.GetUserID(), "online")
    
    // Notify room members
    manager.BroadcastToRoom(ctx, roomID, &streaming.Message{
        Type: "user_joined",
        Data: map[string]any{
            "user_id": conn.GetUserID(),
            "room_id": roomID,
        },
    })
    
    // Send confirmation
    conn.WriteJSON(streaming.Message{
        Type: "room_joined",
        Data: map[string]any{"room_id": roomID},
    })
}

func handleSendMessage(ctx context.Context, manager streaming.Manager, conn streaming.Connection, msg *streaming.Message) {
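    // Client payloads are untrusted: use the comma-ok form to avoid a panic
    roomID, okRoom := msg.Data["room_id"].(string)
    content, okContent := msg.Data["content"].(string)
    if !okRoom || !okContent {
        conn.WriteJSON(streaming.Message{
            Type: "error",
            Data: map[string]any{"error": "room_id and content must be strings"},
        })
        return
    }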
    
    // Create message
    chatMsg := &streaming.Message{
        Type:      "message",
        UserID:    conn.GetUserID(),
        RoomID:    roomID,
        Content:   content,
        Timestamp: time.Now(),
        Data: map[string]any{
            "user_id": conn.GetUserID(),
            "room_id": roomID,
            "content": content,
        },
    }
    
    // Save to history
    if err := manager.SaveMessage(ctx, chatMsg); err != nil {
        conn.WriteJSON(streaming.Message{
            Type: "error",
            Data: map[string]any{"error": "Failed to save message"},
        })
        return
    }
    
    // Broadcast to room
    manager.BroadcastToRoom(ctx, roomID, chatMsg)
}

func createRoom(ctx forge.Context) error {
    manager := forge.Must[streaming.Manager](ctx.Container(), "streaming")
    
    var req struct {
        Name        string `json:"name" validate:"required"`
        Description string `json:"description"`
        Private     bool   `json:"private"`
        MaxMembers  int    `json:"max_members"`
    }
    
    if err := ctx.Bind(&req); err != nil {
        return ctx.JSON(400, map[string]string{"error": "Invalid request"})
    }
    
    // Create room
    room := streaming.NewRoom(streaming.RoomOptions{
        Name:        req.Name,
        Description: req.Description,
        Owner:       ctx.Get("user_id").(string),
        Private:     req.Private,
        MaxMembers:  req.MaxMembers,
    })
    
    if err := manager.CreateRoom(ctx.Request().Context(), room); err != nil {
        return ctx.JSON(500, map[string]string{"error": "Failed to create room"})
    }
    
    return ctx.JSON(201, map[string]any{
        "id":          room.GetID(),
        "name":        room.GetName(),
        "description": room.GetDescription(),
        "private":     room.IsPrivate(),
        "owner":       room.GetOwner(),
        "created":     room.GetCreated(),
    })
}
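
Handlers like the ones above read fields out of a `map[string]any` that ultimately comes from the client. A bare type assertion such as `v.(string)` panics when the field is missing or has another type, so a small helper that returns an error instead is worth having. The sketch below is generic Go; `stringField` and `errMissingField` are hypothetical names, not part of the streaming package.

```go
package main

import (
    "errors"
    "fmt"
)

// errMissingField is returned when a payload field is absent, empty, or not a string.
var errMissingField = errors.New("missing or non-string field")

// stringField returns data[key] as a non-empty string, or a wrapped
// errMissingField naming the offending key. Indexing a nil map is safe.
func stringField(data map[string]any, key string) (string, error) {
    v, ok := data[key].(string)
    if !ok || v == "" {
        return "", fmt.Errorf("%w: %q", errMissingField, key)
    }
    return v, nil
}

func main() {
    payload := map[string]any{"room_id": "lobby", "content": 42}

    roomID, err := stringField(payload, "room_id")
    fmt.Println(roomID, err)

    _, err = stringField(payload, "content")
    fmt.Println(errors.Is(err, errMissingField))
}
```

Returning a wrapped sentinel error lets the caller send a precise error message to the client while still matching on the error kind with `errors.Is`.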

Server-Sent Events (SSE)

func handleSSE(ctx forge.Context) error {
    manager := forge.Must[streaming.Manager](ctx.Container(), "streaming")
    
    // Set SSE headers
    ctx.Header("Content-Type", "text/event-stream")
    ctx.Header("Cache-Control", "no-cache")
    ctx.Header("Connection", "keep-alive")
    ctx.Header("Access-Control-Allow-Origin", "*") // restrict to trusted origins in production
    
    // Create SSE connection
    sseConn := streaming.NewSSEConnection(ctx.Response())
    enhancedConn := streaming.NewEnhancedConnection(sseConn)
    
    userID := ctx.Get("user_id").(string)
    enhancedConn.SetUserID(userID)
    
    // Register connection
    if err := manager.Register(enhancedConn); err != nil {
        return err
    }
    
    defer manager.Unregister(enhancedConn.ID())
    
    // Subscribe to user-specific channel
    userChannel := fmt.Sprintf("user.%s.notifications", userID)
    if err := manager.Subscribe(ctx.Request().Context(), enhancedConn.ID(), userChannel, nil); err != nil {
        return err
    }
    
    // Keep connection alive
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Request().Context().Done():
            return nil
        case <-ticker.C:
            // Send heartbeat
            sseConn.WriteEvent("heartbeat", map[string]any{
                "timestamp": time.Now().Unix(),
            })
        }
    }
}
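
For reference, this is what an event looks like on the wire in the `text/event-stream` format: an optional `event:` line, a `data:` line, and a terminating blank line. The helper below is a stand-alone sketch using only the standard library; `formatSSE` is a hypothetical name, and whether `WriteEvent` emits exactly this framing is an assumption.

```go
package main

import (
    "encoding/json"
    "fmt"
    "strings"
)

// formatSSE renders one event in the text/event-stream wire format.
// The payload is JSON-encoded; compact JSON contains no raw newlines, so a
// single "data:" line is enough.
func formatSSE(event string, payload any) (string, error) {
    data, err := json.Marshal(payload)
    if err != nil {
        return "", err
    }
    var b strings.Builder
    if event != "" {
        fmt.Fprintf(&b, "event: %s\n", event)
    }
    fmt.Fprintf(&b, "data: %s\n\n", data) // the blank line ends the event
    return b.String(), nil
}

func main() {
    frame, err := formatSSE("heartbeat", map[string]int64{"timestamp": 1700000000})
    if err != nil {
        fmt.Println("encode error:", err)
        return
    }
    fmt.Print(frame)
}
```

On the browser side, an `EventSource` listener registered for `heartbeat` receives the JSON string in `event.data`.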

Advanced Room Management

package main

import (
    "context"
    "time"
    
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/streaming"
)

type ChatService struct {
    manager streaming.Manager
    logger  forge.Logger
}

func NewChatService(manager streaming.Manager, logger forge.Logger) *ChatService {
    return &ChatService{
        manager: manager,
        logger:  logger,
    }
}

func (s *ChatService) CreatePrivateRoom(ctx context.Context, ownerID string, memberIDs []string, name string) (*streaming.Room, error) {
    // Create private room
    room := streaming.NewRoom(streaming.RoomOptions{
        Name:       name,
        Owner:      ownerID,
        Private:    true,
        MaxMembers: len(memberIDs) + 1, // Owner + members
    })
    
    if err := s.manager.CreateRoom(ctx, room); err != nil {
        return nil, err
    }
    
    // Add owner as admin; a room without its owner is unusable, so fail fast
    if err := room.Join(ctx, ownerID, streaming.RoleAdmin); err != nil {
        return nil, err
    }
    
    // Add members
    for _, memberID := range memberIDs {
        if err := room.Join(ctx, memberID, streaming.RoleMember); err != nil {
            s.logger.Error("failed to add member to room",
                forge.F("room_id", room.GetID()),
                forge.F("member_id", memberID),
                forge.F("error", err),
            )
        }
    }
    
    // Send invitations
    for _, memberID := range memberIDs {
        s.sendRoomInvitation(ctx, room, memberID)
    }
    
    return &room, nil
}

func (s *ChatService) sendRoomInvitation(ctx context.Context, room streaming.Room, userID string) {
    invitation := &streaming.Message{
        Type:   "room_invitation",
        UserID: "system",
        Data: map[string]any{
            "room_id":     room.GetID(),
            "room_name":   room.GetName(),
            "invited_by":  room.GetOwner(),
            "timestamp":   time.Now(),
        },
    }
    
    s.manager.SendToUser(ctx, userID, invitation)
}

func (s *ChatService) HandleModerationAction(ctx context.Context, roomID, moderatorID, targetID, action, reason string) error {
    // Verify moderator permissions
    room, err := s.manager.GetRoom(ctx, roomID)
    if err != nil {
        return err
    }
    
    hasPermission, err := room.HasPermission(ctx, moderatorID, streaming.PermissionManageRoom)
    if err != nil {
        return err // surface lookup failures instead of reporting them as denials
    }
    if !hasPermission {
        return streaming.ErrInsufficientPermissions
    }
    
    // Perform moderation action
    switch action {
    case "ban":
        return s.banMember(ctx, room, moderatorID, targetID, reason)
    case "kick":
        return s.kickMember(ctx, room, moderatorID, targetID, reason)
    case "mute":
        return s.muteMember(ctx, room, moderatorID, targetID, reason, 24*time.Hour)
    default:
        return streaming.ErrInvalidModerationAction
    }
}

func (s *ChatService) banMember(ctx context.Context, room streaming.Room, moderatorID, targetID, reason string) error {
    // Ban member
    if err := room.BanMember(ctx, targetID, reason, nil); err != nil {
        return err
    }
    
    // Remove from room
    if err := room.Leave(ctx, targetID); err != nil {
        s.logger.Error("failed to remove banned member from room",
            forge.F("room_id", room.GetID()),
            forge.F("target_id", targetID),
            forge.F("error", err),
        )
    }
    
    // Disconnect user from room
    userConns := s.manager.GetUserConnections(targetID)
    for _, conn := range userConns {
        if conn.IsInRoom(room.GetID()) {
            s.manager.LeaveRoom(ctx, conn.ID(), room.GetID())
        }
    }
    
    // Broadcast moderation event
    s.manager.BroadcastToRoom(ctx, room.GetID(), &streaming.Message{
        Type: "member_banned",
        Data: map[string]any{
            "room_id":      room.GetID(),
            "target_id":    targetID,
            "moderator_id": moderatorID,
            "reason":       reason,
            "timestamp":    time.Now(),
        },
    })
    
    return nil
}

func (s *ChatService) GetRoomHistory(ctx context.Context, roomID, userID string, limit int, before *time.Time) ([]*streaming.Message, error) {
    // Verify user is member of room
    room, err := s.manager.GetRoom(ctx, roomID)
    if err != nil {
        return nil, err
    }
    
    isMember, err := room.IsMember(ctx, userID)
    if err != nil {
        return nil, err // surface lookup failures instead of reporting non-membership
    }
    if !isMember {
        return nil, streaming.ErrNotRoomMember
    }
    
    // Get message history
    query := streaming.HistoryQuery{
        RoomID: roomID,
        Limit:  limit,
        Before: before,
    }
    
    return s.manager.GetHistory(ctx, roomID, query)
}

Channel Subscriptions with Filters

func handleChannelSubscription(ctx forge.Context) error {
    manager := forge.Must[streaming.Manager](ctx.Container(), "streaming")
    
    var req struct {
        ChannelPattern string         `json:"channel_pattern"`
        Filters        map[string]any `json:"filters"`
    }
    
    if err := ctx.Bind(&req); err != nil {
        return ctx.JSON(400, map[string]string{"error": "Invalid request"})
    }
    
    // Get connection from context (set by WebSocket middleware)
    conn := ctx.Get("websocket_connection").(streaming.Connection)
    
    // Subscribe to channels matching pattern
    channels, err := manager.FindChannelsByPattern(ctx.Request().Context(), req.ChannelPattern)
    if err != nil {
        return ctx.JSON(500, map[string]string{"error": "Failed to find channels"})
    }
    
    for _, channel := range channels {
        // Subscribe with filters
        err := manager.Subscribe(ctx.Request().Context(), conn.ID(), channel.GetID(), req.Filters)
        if err != nil {
            return ctx.JSON(500, map[string]string{"error": "Failed to subscribe"})
        }
    }
    
    return ctx.JSON(200, map[string]any{
        "subscribed_channels": len(channels),
        "pattern":            req.ChannelPattern,
        "filters":            req.Filters,
    })
}

// Example: Subscribe to user-specific notifications
func subscribeToUserNotifications(ctx context.Context, manager streaming.Manager, conn streaming.Connection, userID string) error {
    // Subscribe to various user channels
    channels := []string{
        fmt.Sprintf("user.%s.messages", userID),
        fmt.Sprintf("user.%s.notifications", userID),
        fmt.Sprintf("user.%s.presence", userID),
    }
    
    for _, channelID := range channels {
        // Create channel if it doesn't exist
        channel := streaming.NewChannel(streaming.ChannelOptions{
            ID:      channelID,
            Name:    channelID,
            Private: true,
        })
        
        manager.CreateChannel(ctx, channel)
        
        // Subscribe with user-specific filters
        filters := map[string]any{
            "user_id": userID,
            "priority": []string{"high", "medium"}, // Only high/medium priority messages
        }
        
        if err := manager.Subscribe(ctx, conn.ID(), channelID, filters); err != nil {
            return err
        }
    }
    
    return nil
}

Advanced Features

Distributed Deployment

package main

import (
    "time"

    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/streaming"
)

func main() {
    app := forge.NewApp(forge.AppConfig{
        Name:    "chat-cluster",
        Version: "1.0.0",
    })

    // Configure for distributed deployment
    app.RegisterExtension(streaming.NewExtension(
        // Redis backend for state sharing
        streaming.WithRedisBackend(
            "redis://redis-1:6379",
            "redis://redis-2:6379", 
            "redis://redis-3:6379",
        ),
        
        // Node identification
        streaming.WithNodeID("chat-node-1"),
        streaming.WithDistributed(true),
        
        // Load balancing
        streaming.WithLoadBalancer("consistent_hash"), // "round_robin", "least_connections", "geographic"
        
        // State synchronization
        streaming.WithPresenceSync(true),
        streaming.WithRoomStateSync(true),
        streaming.WithSyncInterval(30 * time.Second),
        
        // Fault tolerance
        streaming.WithHealthCheck(true),
        streaming.WithAutoFailover(true),
        streaming.WithReconnectAttempts(5),
    ))

    app.Start()
}

Custom Authentication

package main

import (
    "context"
    "strings"
    
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/streaming"
)

// Custom authenticator
type JWTAuthenticator struct {
    secretKey []byte
}

func (a *JWTAuthenticator) AuthenticateConnection(ctx context.Context, token string) (*streaming.AuthResult, error) {
    // Validate JWT token
    claims, err := validateJWT(token, a.secretKey)
    if err != nil {
        return nil, streaming.ErrInvalidToken
    }
    
    return &streaming.AuthResult{
        UserID:      claims.UserID,
        SessionID:   claims.SessionID,
        Permissions: claims.Permissions,
        Metadata: map[string]any{
            "role":       claims.Role,
            "expires_at": claims.ExpiresAt,
        },
    }, nil
}

func (a *JWTAuthenticator) AuthorizeMessage(ctx context.Context, userID string, message *streaming.Message) error {
    // Check if user can send message to room/channel
    switch message.Type {
    case "send_message":
        return a.authorizeRoomMessage(ctx, userID, message)
    case "join_room":
        return a.authorizeRoomJoin(ctx, userID, message)
    default:
        return nil
    }
}

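func (a *JWTAuthenticator) authorizeRoomMessage(ctx context.Context, userID string, message *streaming.Message) error {
    // The comma-ok form avoids a panic when room_id is missing or not a string
    roomID, _ := message.Data["room_id"].(string)

    // Check if user is member of room
    // Implementation depends on your room store
    _ = roomID // keeps this stub compiling until the membership lookup is implemented

    return nil
}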

// Register custom authenticator
func main() {
    app := forge.NewApp(forge.AppConfig{
        Name:    "secure-chat",
        Version: "1.0.0",
    })

    authenticator := &JWTAuthenticator{
        secretKey: []byte("your-secret-key"), // placeholder: load from configuration or a secret store
    }

    app.RegisterExtension(streaming.NewExtension(
        streaming.WithLocalBackend(),
        streaming.WithCustomAuth(authenticator),
        streaming.WithAuthRequired(true),
    ))

    // WebSocket endpoint with authentication
    app.Router().GET("/ws", func(ctx forge.Context) error {
        // Extract token from query parameter or header
        token := ctx.Query("token")
        if token == "" {
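            // Read the request header; ctx.Header(key, value) is used elsewhere in this guide to set response headers
            token = strings.TrimPrefix(ctx.Request().Header.Get("Authorization"), "Bearer ")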
        }
        
        if token == "" {
            return ctx.JSON(401, map[string]string{"error": "Missing token"})
        }
        
        // Authenticate
        authResult, err := authenticator.AuthenticateConnection(ctx.Request().Context(), token)
        if err != nil {
            return ctx.JSON(401, map[string]string{"error": "Invalid token"})
        }
        
        // Store auth info in context
        ctx.Set("user_id", authResult.UserID)
        ctx.Set("session_id", authResult.SessionID)
        ctx.Set("permissions", authResult.Permissions)
        
        return handleWebSocket(ctx)
    })

    app.Start()
}

Rate Limiting & Content Filtering

package main

import (
    "context"
    "regexp"
    "strings"
    
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/streaming"
)

// Custom content filter
type ProfanityFilter struct {
    bannedWords []string
    patterns    []*regexp.Regexp
}

func NewProfanityFilter() *ProfanityFilter {
    bannedWords := []string{"spam", "abuse", "inappropriate"}
    patterns := make([]*regexp.Regexp, len(bannedWords))
    
    for i, word := range bannedWords {
        patterns[i] = regexp.MustCompile(`(?i)\b` + regexp.QuoteMeta(word) + `\b`)
    }
    
    return &ProfanityFilter{
        bannedWords: bannedWords,
        patterns:    patterns,
    }
}

// whitespaceRun is compiled once at package init rather than on every message
var whitespaceRun = regexp.MustCompile(`\s+`)

func (f *ProfanityFilter) FilterMessage(ctx context.Context, message *streaming.Message) (*streaming.Message, error) {
    content, ok := message.Data["content"].(string)
    if !ok {
        return message, nil
    }
    
    // Check for banned words
    for _, pattern := range f.patterns {
        if pattern.MatchString(content) {
            return nil, streaming.ErrMessageFiltered
        }
    }
    
    // Clean up content (example: collapse runs of whitespace)
    cleaned := strings.TrimSpace(whitespaceRun.ReplaceAllString(content, " "))
    message.Data["content"] = cleaned
    
    return message, nil
}

// Custom rate limiter
type AdaptiveRateLimiter struct {
    baseLimits map[string]int // user_id -> base limit
    penalties  map[string]int // user_id -> penalty multiplier
}

func (r *AdaptiveRateLimiter) CheckLimit(ctx context.Context, userID string, action string) error {
    baseLimit := r.baseLimits[userID]
    if baseLimit == 0 {
        baseLimit = 10 // Default limit
    }
    
    penalty := r.penalties[userID]
    if penalty == 0 {
        penalty = 1
    }
    
    effectiveLimit := baseLimit / penalty
    
    // Check current usage (implementation depends on your storage)
    currentUsage := r.getCurrentUsage(userID, action)
    
    if currentUsage >= effectiveLimit {
        return streaming.ErrRateLimitExceeded
    }
    
    return nil
}

func main() {
    app := forge.NewApp(forge.AppConfig{
        Name:    "filtered-chat",
        Version: "1.0.0",
    })

    // Create filters
    profanityFilter := NewProfanityFilter()
    rateLimiter := &AdaptiveRateLimiter{
        baseLimits: make(map[string]int),
        penalties:  make(map[string]int),
    }

    app.RegisterExtension(streaming.NewExtension(
        streaming.WithLocalBackend(),
        streaming.WithContentFilter(profanityFilter),
        streaming.WithRateLimiter(rateLimiter),
        streaming.WithMessageValidation(true),
    ))

    app.Start()
}

Best Practices

Connection Management

  • Implement heartbeat: Use ping/pong to detect dead connections
  • Handle reconnection: Implement client-side reconnection logic
  • Limit connections: Set reasonable per-user connection limits
  • Clean up resources: Properly unregister connections on disconnect
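
Reconnection logic is usually built on capped exponential backoff with jitter. The sketch below shows only the delay calculation, in Go for consistency with the rest of this guide; the constants and the function names `backoffDelay` and `withJitter` are illustrative assumptions, and the same arithmetic ports directly to a browser client.

```go
package main

import (
    "fmt"
    "math/rand"
    "time"
)

const (
    baseDelay = 500 * time.Millisecond // delay before the first retry
    maxDelay  = 30 * time.Second       // upper bound on any single delay
)

// backoffDelay returns the capped exponential delay before reconnect
// attempt n (0-based), without jitter.
func backoffDelay(attempt int) time.Duration {
    d := baseDelay
    for i := 0; i < attempt && d < maxDelay; i++ {
        d *= 2
    }
    if d > maxDelay {
        d = maxDelay
    }
    return d
}

// withJitter maps r in [0, 1) to a delay in [d/2, d), so that many clients
// dropped at the same moment do not all reconnect in lockstep.
func withJitter(d time.Duration, r float64) time.Duration {
    half := d / 2
    return half + time.Duration(r*float64(d-half))
}

func main() {
    for attempt := 0; attempt < 8; attempt++ {
        d := backoffDelay(attempt)
        fmt.Printf("attempt %d: base %v, with jitter %v\n", attempt, d, withJitter(d, rand.Float64()))
    }
}
```

Reset the attempt counter once a connection has stayed up for a while, so a later drop starts again from the short delay.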

Room Design

  • Use meaningful names: Choose descriptive room names and IDs
  • Implement permissions: Use role-based access control
  • Moderate content: Implement content filtering and moderation
  • Archive old rooms: Clean up inactive rooms periodically

Message Handling

  • Validate messages: Always validate message content and structure
  • Handle errors gracefully: Provide meaningful error messages
  • Implement retries: Handle temporary failures with retry logic
  • Log important events: Log room joins, leaves, and moderation actions

Performance Optimization

  • Use message batching: Batch multiple messages when possible
  • Implement caching: Cache frequently accessed data
  • Monitor metrics: Track connection counts, message rates, and errors
  • Optimize queries: Use efficient database queries for history and presence
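
As a concrete reading of the batching advice, the sketch below collects messages and hands them to a flush callback once a size threshold is reached. It is a simplified, single-goroutine illustration with hypothetical names (`batcher`, `newBatcher`); a production version would normally also flush on a timer so that a quiet room does not hold messages back.

```go
package main

import "fmt"

// batcher groups messages and delivers them to flush in slices of up to size.
// It is not safe for concurrent use; guard it with a mutex if shared.
type batcher struct {
    size    int
    pending []string
    flush   func(batch []string)
}

func newBatcher(size int, flush func(batch []string)) *batcher {
    if size < 1 {
        size = 1
    }
    return &batcher{size: size, flush: flush}
}

// Add queues msg and flushes automatically once the batch is full.
func (b *batcher) Add(msg string) {
    b.pending = append(b.pending, msg)
    if len(b.pending) >= b.size {
        b.Flush()
    }
}

// Flush delivers any pending messages; it is a no-op when nothing is queued.
func (b *batcher) Flush() {
    if len(b.pending) == 0 {
        return
    }
    batch := b.pending
    b.pending = nil // start a fresh slice so the callback owns the old one
    b.flush(batch)
}

func main() {
    b := newBatcher(2, func(batch []string) {
        fmt.Println("sending batch:", batch)
    })
    for _, m := range []string{"a", "b", "c", "d", "e"} {
        b.Add(m)
    }
    b.Flush() // deliver the final partial batch
}
```

Batching trades a little latency for fewer writes per connection, which matters most when broadcasting to large rooms.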

Security Considerations

  • Authenticate connections: Always validate user identity
  • Authorize actions: Check permissions for room/channel operations
  • Filter content: Implement content filtering and spam detection
  • Rate limit requests: Prevent abuse with rate limiting

Troubleshooting

Common Issues

Connection Refused

Error: WebSocket connection failed

Solutions:

  • Check if streaming extension is enabled
  • Verify WebSocket endpoint is registered
  • Check firewall and network connectivity
  • Ensure proper CORS configuration

Authentication Failures

Error: Authentication failed for connection

Solutions:

  • Verify token format and validity
  • Check authentication configuration
  • Ensure proper token extraction from headers/query
  • Validate token signing key

Room Join Failures

Error: Failed to join room: room not found

Solutions:

  • Verify room exists before joining
  • Check room permissions and privacy settings
  • Ensure user has permission to join
  • Check room member limits

Message Delivery Issues

Error: Message not delivered to all recipients

Solutions:

  • Check connection status of recipients
  • Verify room/channel subscriptions
  • Check message filters and permissions
  • Monitor distributed backend connectivity

Debugging

Enable Debug Logging

logging:
  level: debug
  loggers:
    streaming: debug
    streaming.manager: debug
    streaming.rooms: debug
    streaming.presence: debug

Monitor Metrics

// Check streaming statistics
manager := forge.Must[streaming.Manager](container, "streaming")
stats := manager.GetStats()

fmt.Printf("Active Connections: %d\n", stats.ActiveConnections)
fmt.Printf("Total Rooms: %d\n", stats.TotalRooms)
fmt.Printf("Total Channels: %d\n", stats.TotalChannels)
fmt.Printf("Messages/Second: %.2f\n", stats.MessagesPerSecond)

Health Checks

# Check extension health
curl http://localhost:8080/_/health/streaming

# Check backend connectivity
curl http://localhost:8080/_/health/streaming/backend

# Check distributed coordination
curl http://localhost:8080/_/health/streaming/distributed
