Streaming

Real-time streaming capabilities with WebSocket, Server-Sent Events, and WebTransport support

Streaming

Forge provides comprehensive streaming capabilities for real-time applications including WebSocket connections, Server-Sent Events (SSE), and WebTransport support. These features enable bidirectional communication, real-time updates, and low-latency data streaming.

Streaming Interface

Forge's streaming interface provides a unified API for different streaming protocols:

type StreamingManager interface {
    // WebSocket support
    WebSocket(path string, handler WebSocketHandler, opts ...WebSocketOption) error
    
    // Server-Sent Events
    SSE(path string, handler SSEHandler, opts ...SSEOption) error
    
    // WebTransport support
    WebTransport(path string, handler WebTransportHandler, opts ...WebTransportOption) error
    
    // Connection management
    GetConnection(id string) (Connection, error)
    Broadcast(message interface{}, filter ...ConnectionFilter) error
    
    // Statistics
    GetStats() StreamingStats
}

WebSocket Support

Basic WebSocket Handler

func websocketHandler(conn forge.WebSocketConnection) error {
    // Send welcome message
    conn.WriteMessage(forge.TextMessage, []byte("Connected to WebSocket"))
    
    // Handle incoming messages
    for {
        messageType, data, err := conn.ReadMessage()
        if err != nil {
            return err
        }
        
        // Echo back the message
        err = conn.WriteMessage(messageType, data)
        if err != nil {
            return err
        }
    }
}

// Register WebSocket endpoint
app.Router().WebSocket("/ws", websocketHandler)

WebSocket with Authentication

func authenticatedWebSocketHandler(conn forge.WebSocketConnection) error {
    // Get user from connection context
    userID := conn.Get("user_id").(string)
    
    // Send personalized welcome
    conn.WriteMessage(forge.TextMessage, []byte(fmt.Sprintf("Welcome, user %s", userID)))
    
    // Handle messages
    for {
        messageType, data, err := conn.ReadMessage()
        if err != nil {
            return err
        }
        
        // Process message with user context
        response := processMessage(userID, data)
        conn.WriteMessage(messageType, response)
    }
}

// Register with authentication middleware
app.Router().WebSocket("/ws/authenticated", authenticatedWebSocketHandler,
    forge.WithWebSocketAuth("jwt"),
)

WebSocket Connection Management

type ChatRoom struct {
    connections map[string]forge.WebSocketConnection
    mutex       sync.RWMutex
}

func (cr *ChatRoom) AddConnection(userID string, conn forge.WebSocketConnection) {
    cr.mutex.Lock()
    defer cr.mutex.Unlock()
    cr.connections[userID] = conn
}

func (cr *ChatRoom) RemoveConnection(userID string) {
    cr.mutex.Lock()
    defer cr.mutex.Unlock()
    delete(cr.connections, userID)
}

func (cr *ChatRoom) Broadcast(message []byte, excludeUserID string) {
    cr.mutex.RLock()
    defer cr.mutex.RUnlock()
    
    for userID, conn := range cr.connections {
        if userID != excludeUserID {
            conn.WriteMessage(forge.TextMessage, message)
        }
    }
}

func chatHandler(conn forge.WebSocketConnection) error {
    userID := conn.Get("user_id").(string)
    chatRoom.AddConnection(userID, conn)
    defer chatRoom.RemoveConnection(userID)
    
    for {
        messageType, data, err := conn.ReadMessage()
        if err != nil {
            return err
        }
        
        // Broadcast message to all other users
        chatRoom.Broadcast(data, userID)
    }
}

Server-Sent Events (SSE)

Basic SSE Handler

func sseHandler(ctx forge.Context) error {
    // Set SSE headers
    ctx.SetHeader("Content-Type", "text/event-stream")
    ctx.SetHeader("Cache-Control", "no-cache")
    ctx.SetHeader("Connection", "keep-alive")
    ctx.SetHeader("Access-Control-Allow-Origin", "*")
    
    // Send initial event
    ctx.WriteSSE("connected", "Welcome to SSE stream")
    
    // Send periodic updates
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Done():
            return nil
        case <-ticker.C:
            data := map[string]interface{}{
                "timestamp": time.Now().Unix(),
                "message":   "Periodic update",
            }
            ctx.WriteSSE("update", data)
        }
    }
}

// Register SSE endpoint
app.Router().SSE("/events", sseHandler)

SSE with Real-time Data

func realTimeDataHandler(ctx forge.Context) error {
    // Set SSE headers
    ctx.SetHeader("Content-Type", "text/event-stream")
    ctx.SetHeader("Cache-Control", "no-cache")
    ctx.SetHeader("Connection", "keep-alive")
    
    // Send initial data
    ctx.WriteSSE("init", map[string]interface{}{
        "status": "connected",
        "time":   time.Now().Unix(),
    })
    
    // Subscribe to data updates
    dataChannel := make(chan interface{})
    dataService.Subscribe(dataChannel)
    defer dataService.Unsubscribe(dataChannel)
    
    for {
        select {
        case <-ctx.Done():
            return nil
        case data := <-dataChannel:
            ctx.WriteSSE("data", data)
        }
    }
}

SSE with Custom Events

func customEventHandler(ctx forge.Context) error {
    // Set SSE headers
    ctx.SetHeader("Content-Type", "text/event-stream")
    ctx.SetHeader("Cache-Control", "no-cache")
    ctx.SetHeader("Connection", "keep-alive")
    
    // Send different types of events
    ctx.WriteSSE("user-login", map[string]interface{}{
        "user_id": "123",
        "timestamp": time.Now().Unix(),
    })
    
    ctx.WriteSSE("system-alert", map[string]interface{}{
        "level": "warning",
        "message": "High CPU usage detected",
    })
    
    ctx.WriteSSE("data-update", map[string]interface{}{
        "table": "users",
        "action": "insert",
        "id": "456",
    })
    
    return nil
}

WebTransport Support

Basic WebTransport Handler

func webTransportHandler(session forge.WebTransportSession) error {
    // Send welcome message
    session.SendMessage([]byte("Connected to WebTransport"))
    
    // Handle incoming messages
    for {
        data, err := session.ReceiveMessage()
        if err != nil {
            return err
        }
        
        // Process and respond
        response := processMessage(data)
        session.SendMessage(response)
    }
}

// Register WebTransport endpoint
app.Router().WebTransport("/wt", webTransportHandler)

WebTransport with Bidirectional Streams

func bidirectionalStreamHandler(session forge.WebTransportSession) error {
    // Create bidirectional stream
    stream, err := session.CreateBidirectionalStream()
    if err != nil {
        return err
    }
    
    // Handle stream data
    go func() {
        for {
            data, err := stream.Read()
            if err != nil {
                return
            }
            
            // Process data and send response
            response := processStreamData(data)
            stream.Write(response)
        }
    }()
    
    return nil
}

Streaming Middleware

Connection Logging Middleware

func streamingLoggingMiddleware(next forge.StreamingHandler) forge.StreamingHandler {
    return func(conn forge.StreamingConnection) error {
        start := time.Now()
        
        app.Logger().Info("streaming connection started",
            forge.F("protocol", conn.Protocol()),
            forge.F("remote_addr", conn.RemoteAddr()),
        )
        
        err := next(conn)
        
        duration := time.Since(start)
        app.Logger().Info("streaming connection ended",
            forge.F("protocol", conn.Protocol()),
            forge.F("duration", duration),
            forge.F("error", err),
        )
        
        return err
    }
}

Rate Limiting Middleware

func streamingRateLimitMiddleware(next forge.StreamingHandler) forge.StreamingHandler {
    return func(conn forge.StreamingConnection) error {
        // Check rate limit
        if !rateLimiter.Allow(conn.RemoteAddr()) {
            conn.WriteMessage(forge.TextMessage, []byte("Rate limit exceeded"))
            return forge.TooManyRequests("rate limit exceeded")
        }
        
        return next(conn)
    }
}

Streaming Configuration

WebSocket Configuration

app := forge.NewApp(forge.AppConfig{
    StreamingConfig: forge.StreamingConfig{
        WebSocket: forge.WebSocketConfig{
            Enabled:           true,
            ReadBufferSize:   1024,
            WriteBufferSize:  1024,
            CheckOrigin:      true,
            HandshakeTimeout: 10 * time.Second,
            PingPeriod:       54 * time.Second,
            PongWait:         60 * time.Second,
            MaxMessageSize:   512,
            MaxConnections:   1000,
        },
    },
})

SSE Configuration

app := forge.NewApp(forge.AppConfig{
    StreamingConfig: forge.StreamingConfig{
        SSE: forge.SSEConfig{
            Enabled:        true,
            RetryInterval:  3 * time.Second,
            MaxConnections: 1000,
            BufferSize:     1024,
        },
    },
})

WebTransport Configuration

app := forge.NewApp(forge.AppConfig{
    StreamingConfig: forge.StreamingConfig{
        WebTransport: forge.WebTransportConfig{
            Enabled:        true,
            MaxConnections: 1000,
            MaxStreams:     100,
            BufferSize:     1024,
        },
    },
})

Real-time Applications

Chat Application

type ChatApplication struct {
    rooms map[string]*ChatRoom
    mutex sync.RWMutex
}

func (ca *ChatApplication) JoinRoom(roomID string, userID string, conn forge.WebSocketConnection) {
    ca.mutex.Lock()
    defer ca.mutex.Unlock()
    
    room, exists := ca.rooms[roomID]
    if !exists {
        room = &ChatRoom{
            ID:          roomID,
            connections: make(map[string]forge.WebSocketConnection),
        }
        ca.rooms[roomID] = room
    }
    
    room.AddConnection(userID, conn)
}

func chatRoomHandler(conn forge.WebSocketConnection) error {
    roomID := conn.Get("room_id").(string)
    userID := conn.Get("user_id").(string)
    
    chatApp.JoinRoom(roomID, userID, conn)
    defer chatApp.LeaveRoom(roomID, userID)
    
    for {
        messageType, data, err := conn.ReadMessage()
        if err != nil {
            return err
        }
        
        // Broadcast to room
        chatApp.BroadcastToRoom(roomID, data, userID)
    }
}

Live Data Dashboard

func liveDashboardHandler(ctx forge.Context) error {
    // Set SSE headers
    ctx.SetHeader("Content-Type", "text/event-stream")
    ctx.SetHeader("Cache-Control", "no-cache")
    ctx.SetHeader("Connection", "keep-alive")
    
    // Send initial dashboard data
    dashboardData := getDashboardData()
    ctx.WriteSSE("dashboard-init", dashboardData)
    
    // Subscribe to data updates
    updateChannel := make(chan interface{})
    dataService.Subscribe(updateChannel)
    defer dataService.Unsubscribe(updateChannel)
    
    for {
        select {
        case <-ctx.Done():
            return nil
        case update := <-updateChannel:
            ctx.WriteSSE("dashboard-update", update)
        }
    }
}

Real-time Notifications

func notificationHandler(ctx forge.Context) error {
    userID := ctx.Get("user_id").(string)
    
    // Set SSE headers
    ctx.SetHeader("Content-Type", "text/event-stream")
    ctx.SetHeader("Cache-Control", "no-cache")
    ctx.SetHeader("Connection", "keep-alive")
    
    // Subscribe to user notifications
    notificationChannel := make(chan interface{})
    notificationService.Subscribe(userID, notificationChannel)
    defer notificationService.Unsubscribe(userID, notificationChannel)
    
    for {
        select {
        case <-ctx.Done():
            return nil
        case notification := <-notificationChannel:
            ctx.WriteSSE("notification", notification)
        }
    }
}

Performance Optimization

Connection Pooling

type ConnectionPool struct {
    connections map[string]forge.StreamingConnection
    mutex       sync.RWMutex
    maxSize     int
}

func (cp *ConnectionPool) AddConnection(id string, conn forge.StreamingConnection) error {
    cp.mutex.Lock()
    defer cp.mutex.Unlock()
    
    if len(cp.connections) >= cp.maxSize {
        return errors.New("connection pool full")
    }
    
    cp.connections[id] = conn
    return nil
}

Message Batching

func batchedMessageHandler(conn forge.WebSocketConnection) error {
    batch := make([][]byte, 0, 10)
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            if len(batch) > 0 {
                // Send batched messages
                for _, message := range batch {
                    conn.WriteMessage(forge.TextMessage, message)
                }
                batch = batch[:0]
            }
        default:
            messageType, data, err := conn.ReadMessage()
            if err != nil {
                return err
            }
            
            // Add to batch
            batch = append(batch, data)
        }
    }
}

Testing Streaming

WebSocket Testing

func TestWebSocket(t *testing.T) {
    app := forge.NewTestApp(forge.TestAppConfig{
        Name: "test-app",
    })
    
    // Register WebSocket handler
    app.Router().WebSocket("/ws", func(conn forge.WebSocketConnection) error {
        conn.WriteMessage(forge.TextMessage, []byte("hello"))
        return nil
    })
    
    // Test WebSocket connection
    resp := app.Test().WebSocket("/ws").Expect(t)
    resp.ReceiveMessage().Equal("hello")
}

SSE Testing

func TestSSE(t *testing.T) {
    app := forge.NewTestApp(forge.TestAppConfig{
        Name: "test-app",
    })
    
    // Register SSE handler
    app.Router().SSE("/events", func(ctx forge.Context) error {
        ctx.WriteSSE("test", "data")
        return nil
    })
    
    // Test SSE endpoint
    resp := app.Test().GET("/events").Expect(t)
    resp.Status(200)
    resp.Header("Content-Type").Equal("text/event-stream")
}

Best Practices

  1. Connection Management: Properly manage connection lifecycle
  2. Error Handling: Handle connection errors gracefully
  3. Rate Limiting: Implement rate limiting for streaming endpoints
  4. Authentication: Secure streaming endpoints with authentication
  5. Resource Cleanup: Clean up resources when connections close
  6. Monitoring: Monitor streaming connection metrics
  7. Testing: Test streaming functionality thoroughly
  8. Performance: Optimize for high concurrency

For more advanced streaming features, see the Streaming Extension documentation.

How is this guide?

Last updated on