Streaming

Streaming

Real-time WebSocket and SSE with rooms, channels, presence, and typing indicators

Overview

github.com/xraph/forge/extensions/streaming provides a real-time communication layer with WebSocket and Server-Sent Events (SSE) support. It registers a Manager in the DI container that handles connections, rooms, channels, presence tracking, typing indicators, and message history with pluggable backends (local, Redis, NATS).

What It Registers

ServiceDI KeyType
Streaming managerstreamingManager

The manager is registered as a Vessel-managed singleton. The extension provides a RegisterRoutes helper to mount WebSocket and SSE endpoints on the Forge HTTP router.

Quick Start

package main

import (
    "context"

    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/streaming"
)

func main() {
    app := forge.NewApp(forge.AppConfig{Name: "chat-app", Version: "1.0.0"})

    ext := streaming.NewExtension(
        streaming.WithLocalBackend(),
        streaming.WithFeatures(true, true, true, true, true), // rooms, channels, presence, typing, history
        streaming.WithConnectionLimits(5, 50, 50),            // 5 conns/user, 50 rooms/user, 50 channels/user
    )
    app.RegisterExtension(ext)

    // Mount WebSocket and SSE endpoints
    ext.RegisterRoutes(app.Router(), "/ws", "/sse")

    ctx := context.Background()
    app.Start(ctx)
    defer app.Stop(ctx)

    mgr := streaming.MustGetManager(app.Container())

    // Create a room
    room, _ := mgr.CreateRoom(ctx, streaming.RoomOptions{
        Name:       "general",
        MaxMembers: 100,
    })

    // Broadcast to all connected users
    mgr.Broadcast(ctx, []byte(`{"type":"announcement","data":"Server is ready"}`))

    // Broadcast to a specific room
    mgr.BroadcastToRoom(ctx, room.ID(), []byte(`{"type":"message","data":"Hello room!"}`))
}

Using Streaming in Your Services

Inject the Manager for automatic DI resolution:

type ChatService struct {
    streaming streaming.Manager
    logger    forge.Logger
}

func NewChatService(mgr streaming.Manager, logger forge.Logger) *ChatService {
    return &ChatService{streaming: mgr, logger: logger}
}

func (s *ChatService) SendMessage(ctx context.Context, roomID string, userID string, msg []byte) error {
    // Save to history
    s.streaming.SaveMessage(ctx, &streaming.Message{
        RoomID: roomID,
        UserID: userID,
        Data:   msg,
    })
    // Broadcast to room
    return s.streaming.BroadcastToRoom(ctx, roomID, msg)
}

func (s *ChatService) GetOnlineUsers(ctx context.Context) ([]string, error) {
    return s.streaming.GetOnlineUsers(ctx)
}

Register with Vessel:

forge.ProvideConstructor(app.Container(), NewChatService)

Room Management

mgr := streaming.MustGetManager(app.Container())

// Create a room
room, _ := mgr.CreateRoom(ctx, streaming.RoomOptions{
    Name:       "project-alpha",
    MaxMembers: 50,
})

// Join/leave
mgr.JoinRoom(ctx, room.ID(), userID, streaming.MemberOptions{Role: "member"})
mgr.LeaveRoom(ctx, room.ID(), userID)

// Get room members
members, _ := mgr.GetRoomMembers(ctx, room.ID())

// Transfer ownership
mgr.TransferRoomOwnership(ctx, room.ID(), newOwnerID)

Presence and Typing

// Set user presence
mgr.SetPresence(ctx, userID, streaming.PresenceOptions{
    Status: "online",
    Device: &streaming.DeviceInfo{Type: "desktop"},
})

// Get online users
online, _ := mgr.GetOnlineUsers(ctx)

// Typing indicators
mgr.StartTyping(ctx, roomID, userID)
mgr.StopTyping(ctx, roomID, userID)
typing, _ := mgr.GetTypingUsers(ctx, roomID)

Key Concepts

  • Connections -- WebSocket and SSE connections are wrapped in an EnhancedConnection that tracks user ID, session ID, metadata, joined rooms, and subscribed channels.
  • Rooms -- users join rooms to exchange messages. Rooms support members, roles, invites, bans, and moderation events.
  • Channels -- pub/sub channels for topic-based messaging. Users subscribe to channels to receive broadcasts.
  • Presence -- track user online status, device info, and activity across connections with configurable timeout and cleanup intervals.
  • Typing indicators -- real-time typing status per user per room with automatic expiration.
  • Message history -- persist messages with configurable retention (default 30 days) and search/query support.
  • Distributed mode -- scale across nodes with Redis or NATS backends for cross-node message routing and presence synchronization.

Important Runtime Notes

  • Call RegisterRoutes(router, wsPath, ssePath) to mount WebSocket and SSE endpoints.
  • The local backend stores everything in memory. Use Redis or NATS backends for production multi-node deployments.
  • Connection limits, message rate limits, and message size limits are enforced per-user.

Detailed Pages

How is this guide?

On this page