Streaming
Real-time WebSocket and SSE with rooms, channels, presence, and typing indicators
Overview
github.com/xraph/forge/extensions/streaming provides a real-time communication layer with WebSocket
and Server-Sent Events (SSE) support. It registers a Manager in the DI container that handles
connections, rooms, channels, presence tracking, typing indicators, and message history with pluggable
backends (local, Redis, NATS).
What It Registers
| Service | DI Key | Type |
|---|---|---|
| Streaming manager | streaming | Manager |
The manager is registered as a Vessel-managed singleton. The extension provides a RegisterRoutes helper to mount WebSocket and SSE endpoints on the Forge HTTP router.
Quick Start
package main

import (
	"context"

	"github.com/xraph/forge"
	"github.com/xraph/forge/extensions/streaming"
)

func main() {
	app := forge.NewApp(forge.AppConfig{Name: "chat-app", Version: "1.0.0"})

	ext := streaming.NewExtension(
		streaming.WithLocalBackend(),
		streaming.WithFeatures(true, true, true, true, true), // rooms, channels, presence, typing, history
		streaming.WithConnectionLimits(5, 50, 50),            // 5 conns/user, 50 rooms/user, 50 channels/user
	)
	app.RegisterExtension(ext)

	// Mount WebSocket and SSE endpoints
	ext.RegisterRoutes(app.Router(), "/ws", "/sse")

	ctx := context.Background()
	app.Start(ctx)
	defer app.Stop(ctx)

	mgr := streaming.MustGetManager(app.Container())

	// Create a room
	room, _ := mgr.CreateRoom(ctx, streaming.RoomOptions{
		Name:       "general",
		MaxMembers: 100,
	})

	// Broadcast to all connected users
	mgr.Broadcast(ctx, []byte(`{"type":"announcement","data":"Server is ready"}`))

	// Broadcast to a specific room
	mgr.BroadcastToRoom(ctx, room.ID(), []byte(`{"type":"message","data":"Hello room!"}`))
}

Using Streaming in Your Services
Inject the Manager for automatic DI resolution:
type ChatService struct {
	streaming streaming.Manager
	logger    forge.Logger
}

func NewChatService(mgr streaming.Manager, logger forge.Logger) *ChatService {
	return &ChatService{streaming: mgr, logger: logger}
}

func (s *ChatService) SendMessage(ctx context.Context, roomID string, userID string, msg []byte) error {
	// Save to history
	s.streaming.SaveMessage(ctx, &streaming.Message{
		RoomID: roomID,
		UserID: userID,
		Data:   msg,
	})

	// Broadcast to room
	return s.streaming.BroadcastToRoom(ctx, roomID, msg)
}

func (s *ChatService) GetOnlineUsers(ctx context.Context) ([]string, error) {
	return s.streaming.GetOnlineUsers(ctx)
}

Register with Vessel:
forge.ProvideConstructor(app.Container(), NewChatService)

Room Management
mgr := streaming.MustGetManager(app.Container())

// Create a room
room, _ := mgr.CreateRoom(ctx, streaming.RoomOptions{
	Name:       "project-alpha",
	MaxMembers: 50,
})

// Join/leave
mgr.JoinRoom(ctx, room.ID(), userID, streaming.MemberOptions{Role: "member"})
mgr.LeaveRoom(ctx, room.ID(), userID)

// Get room members
members, _ := mgr.GetRoomMembers(ctx, room.ID())

// Transfer ownership
mgr.TransferRoomOwnership(ctx, room.ID(), newOwnerID)

Presence and Typing
// Set user presence
mgr.SetPresence(ctx, userID, streaming.PresenceOptions{
	Status: "online",
	Device: &streaming.DeviceInfo{Type: "desktop"},
})

// Get online users
online, _ := mgr.GetOnlineUsers(ctx)

// Typing indicators
mgr.StartTyping(ctx, roomID, userID)
mgr.StopTyping(ctx, roomID, userID)
typing, _ := mgr.GetTypingUsers(ctx, roomID)

Key Concepts
- Connections -- WebSocket and SSE connections are wrapped in an EnhancedConnection that tracks user ID, session ID, metadata, joined rooms, and subscribed channels.
- Rooms -- users join rooms to exchange messages. Rooms support members, roles, invites, bans, and moderation events.
- Channels -- pub/sub channels for topic-based messaging. Users subscribe to channels to receive broadcasts.
- Presence -- track user online status, device info, and activity across connections with configurable timeout and cleanup intervals.
- Typing indicators -- real-time typing status per user per room with automatic expiration.
- Message history -- persist messages with configurable retention (default 30 days) and search/query support.
- Distributed mode -- scale across nodes with Redis or NATS backends for cross-node message routing and presence synchronization.
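The automatic expiration mentioned for typing indicators can be pictured as a per-room map of deadlines. The following is a minimal sketch of that general technique, not the extension's implementation: TypingTracker, manualClock, and the 5-second TTL are hypothetical names and values chosen for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

// TypingTracker is a hypothetical helper (not part of the streaming extension).
// It remembers when each user's typing status in a room should expire.
type TypingTracker struct {
	mu      sync.Mutex
	ttl     time.Duration
	now     func() time.Time
	expires map[string]map[string]time.Time // roomID -> userID -> deadline
}

func NewTypingTracker(ttl time.Duration, now func() time.Time) *TypingTracker {
	return &TypingTracker{ttl: ttl, now: now, expires: make(map[string]map[string]time.Time)}
}

// Start marks the user as typing; calling it again extends the deadline.
func (t *TypingTracker) Start(roomID, userID string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	room, ok := t.expires[roomID]
	if !ok {
		room = make(map[string]time.Time)
		t.expires[roomID] = room
	}
	room[userID] = t.now().Add(t.ttl)
}

// Stop clears the typing status immediately.
func (t *TypingTracker) Stop(roomID, userID string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.expires[roomID], userID)
}

// Users returns the users still typing in the room, sorted by ID,
// and drops entries whose deadline has passed.
func (t *TypingTracker) Users(roomID string) []string {
	t.mu.Lock()
	defer t.mu.Unlock()
	now := t.now()
	var users []string
	for userID, deadline := range t.expires[roomID] {
		if !now.Before(deadline) {
			delete(t.expires[roomID], userID)
			continue
		}
		users = append(users, userID)
	}
	sort.Strings(users)
	return users
}

// manualClock is a hand-advanced clock so the example runs without sleeping.
type manualClock struct{ now time.Time }

func (c *manualClock) Now() time.Time          { return c.now }
func (c *manualClock) Advance(d time.Duration) { c.now = c.now.Add(d) }

func main() {
	clock := &manualClock{now: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)}
	tracker := NewTypingTracker(5*time.Second, clock.Now)

	tracker.Start("general", "alice")
	clock.Advance(3 * time.Second)
	tracker.Start("general", "bob")
	fmt.Println(tracker.Users("general")) // [alice bob]

	clock.Advance(3 * time.Second) // alice's 5-second window has elapsed
	fmt.Println(tracker.Users("general")) // [bob]
}
```

Injecting the clock keeps the expiry logic deterministic in tests; a real deployment would more likely pass time.Now and run periodic cleanup.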
Important Runtime Notes
- Call RegisterRoutes(router, wsPath, ssePath) to mount WebSocket and SSE endpoints.
- The local backend stores everything in memory. Use Redis or NATS backends for production multi-node deployments.
- Connection limits, message rate limits, and message size limits are enforced per-user.
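Per-user enforcement of a connection limit can be thought of as a counter keyed by user ID. The sketch below illustrates that idea under stated assumptions and is not the extension's code: ConnLimiter, Acquire, Release, and ErrTooManyConnections are hypothetical names, and the limit of 2 is arbitrary.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrTooManyConnections is a hypothetical error for this sketch.
var ErrTooManyConnections = errors.New("connection limit reached for user")

// ConnLimiter is a hypothetical helper that counts open connections per user.
type ConnLimiter struct {
	mu     sync.Mutex
	max    int
	active map[string]int // userID -> open connections
}

func NewConnLimiter(maxPerUser int) *ConnLimiter {
	return &ConnLimiter{max: maxPerUser, active: make(map[string]int)}
}

// Acquire reserves a slot for the user or reports that the limit is reached.
func (l *ConnLimiter) Acquire(userID string) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.active[userID] >= l.max {
		return ErrTooManyConnections
	}
	l.active[userID]++
	return nil
}

// Release frees a slot when one of the user's connections closes.
func (l *ConnLimiter) Release(userID string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.active[userID] <= 1 {
		delete(l.active, userID) // avoid keeping zero-count entries around
		return
	}
	l.active[userID]--
}

func main() {
	limiter := NewConnLimiter(2)
	for i := 1; i <= 3; i++ {
		if err := limiter.Acquire("alice"); err != nil {
			fmt.Printf("connection %d rejected: %v\n", i, err)
			continue
		}
		fmt.Printf("connection %d accepted\n", i)
	}

	limiter.Release("alice") // one connection closes, freeing a slot
	fmt.Println("retry error:", limiter.Acquire("alice"))
}
```

Because the count is per user, one user hitting the limit does not affect anyone else. The same shape extends to rooms-per-user or channels-per-user counters.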