Streaming Extension
Real-time streaming with WebSocket/SSE support, rooms, channels, presence tracking, and distributed coordination
Streaming Extension
The Streaming extension provides real-time communication capabilities for your Forge application with WebSocket and Server-Sent Events (SSE) support. It includes advanced features like rooms, channels, presence tracking, typing indicators, message history, and distributed coordination for multi-node deployments.
Features
Real-Time Communication
- WebSocket Support: Full-duplex real-time communication with connection management
- Server-Sent Events (SSE): One-way streaming for real-time updates
- Connection Management: Advanced connection tracking with metadata and lifecycle management
- Message Broadcasting: Efficient message distribution with filtering and targeting
Rooms & Channels
- Chat Rooms: Organized conversations with membership, roles, and permissions
- Pub/Sub Channels: Topic-based message broadcasting with pattern matching
- Room Management: Create, join, leave, and manage room settings
- Channel Subscriptions: Subscribe to channels with custom filters
Presence & Activity
- Presence Tracking: Track user online/offline/away status across rooms
- Typing Indicators: Real-time typing indicators per room
- Activity Tracking: Monitor user activity and idle connections
- Status Broadcasting: Automatic presence updates to relevant users
Message Management
- Message History: Persist and retrieve conversation history
- Message Reactions: Add reactions and interactions to messages
- Message Editing: Edit and delete messages with audit trails
- Content Filtering: Filter messages based on content, metadata, or permissions
Distributed Architecture
- Multi-Node Support: Redis/NATS backends for horizontal scaling
- Load Balancing: Consistent hashing, least connections, and geographic routing
- State Synchronization: Sync presence, rooms, and messages across nodes
- Fault Tolerance: Automatic failover and recovery mechanisms
Security & Performance
- Authentication: Connection-level and message-level authentication
- Rate Limiting: Adaptive rate limiting with multiple algorithms
- Content Validation: Schema validation and security filtering
- Connection Limits: Per-user connection and resource limits
Installation
Go Module
go get github.com/xraph/forge/extensions/streamingDocker
FROM xraph/forge:latest
# Streaming extension is includedPackage Manager
# Using Forge CLI
forge extension add streaming
# Using package manager
npm install @xraph/forge-streamingConfiguration
YAML Configuration
extensions:
streaming:
# Backend configuration
backend: "redis" # "local", "redis", "nats"
backend_urls:
- "redis://localhost:6379"
backend_username: "" # Optional authentication
backend_password: ""
# Feature toggles
enable_rooms: true
enable_channels: true
enable_presence: true
enable_typing_indicators: true
enable_message_history: true
enable_distributed: true
# Connection limits
max_connections_per_user: 5
max_rooms_per_user: 50
max_channels_per_user: 100
max_message_size: 65536 # 64KB
max_messages_per_second: 100
# Timeouts
ping_interval: 30 # seconds
pong_timeout: 10 # seconds
write_timeout: 10 # seconds
read_timeout: 60 # seconds
# Presence settings
presence_timeout: 300 # 5 minutes
presence_cleanup: 60 # 1 minute
# Typing indicators
typing_timeout: 5 # seconds
typing_cleanup: 10 # seconds
max_typing_users_per_room: 10
# Message retention
message_retention: 720 # hours (30 days)
max_history_messages: 1000
# Distributed settings
node_id: "node-1" # Auto-generated if not set
# TLS configuration
tls_enabled: false
tls_cert_file: ""
tls_key_file: ""
tls_ca_file: ""
# Rate limiting
rate_limit:
enabled: true
algorithm: "token_bucket" # "token_bucket", "sliding_window", "adaptive"
requests_per_second: 10
burst_size: 20
# Authentication
auth:
require_auth: true
token_header: "Authorization"
validate_on_connect: true
validate_on_message: falseEnvironment Variables
# Backend Configuration
FORGE_STREAMING_BACKEND=redis
FORGE_STREAMING_BACKEND_URLS=redis://localhost:6379
FORGE_STREAMING_BACKEND_USERNAME=
FORGE_STREAMING_BACKEND_PASSWORD=
# Feature Toggles
FORGE_STREAMING_ENABLE_ROOMS=true
FORGE_STREAMING_ENABLE_CHANNELS=true
FORGE_STREAMING_ENABLE_PRESENCE=true
FORGE_STREAMING_ENABLE_TYPING_INDICATORS=true
FORGE_STREAMING_ENABLE_MESSAGE_HISTORY=true
# Connection Limits
FORGE_STREAMING_MAX_CONNECTIONS_PER_USER=5
FORGE_STREAMING_MAX_ROOMS_PER_USER=50
FORGE_STREAMING_MAX_MESSAGE_SIZE=65536
# Timeouts
FORGE_STREAMING_PING_INTERVAL=30
FORGE_STREAMING_PONG_TIMEOUT=10
FORGE_STREAMING_PRESENCE_TIMEOUT=300
# Distributed
FORGE_STREAMING_NODE_ID=node-1
FORGE_STREAMING_ENABLE_DISTRIBUTED=trueProgrammatic Configuration
package main
import (
"time"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/streaming"
)
func main() {
app := forge.NewApp(forge.AppConfig{
Name: "chat-app",
Version: "1.0.0",
})
// Configure streaming extension
app.RegisterExtension(streaming.NewExtension(
// Backend
streaming.WithRedisBackend("redis://localhost:6379"),
streaming.WithAuthentication("username", "password"),
// Features
streaming.WithFeatures(true, true, true, true, true), // rooms, channels, presence, typing, history
// Limits
streaming.WithConnectionLimits(5, 50, 100), // conns/user, rooms/user, channels/user
streaming.WithMessageLimits(64*1024, 100), // max size, max/second
// Timeouts
streaming.WithTimeouts(30*time.Second, 10*time.Second, 10*time.Second), // ping, pong, write
// Retention
streaming.WithMessageRetention(30 * 24 * time.Hour), // 30 days
// Distributed
streaming.WithNodeID("node-1"),
streaming.WithDistributed(true),
// TLS
streaming.WithTLS("cert.pem", "key.pem", "ca.pem"),
// Rate limiting
streaming.WithRateLimit("token_bucket", 10, 20), // algorithm, rps, burst
))
app.Start()
}Usage Examples
Basic WebSocket Chat
package main
import (
"context"
"net/http"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/streaming"
)
func main() {
app := forge.NewApp(forge.AppConfig{
Name: "chat-app",
Version: "1.0.0",
})
// Register streaming extension
app.RegisterExtension(streaming.NewExtension(
streaming.WithLocalBackend(),
streaming.WithFeatures(true, true, true, true, true),
))
// WebSocket endpoint
app.Router().GET("/ws", handleWebSocket)
// REST API for room management
app.Router().POST("/api/rooms", createRoom)
app.Router().GET("/api/rooms", listRooms)
app.Router().POST("/api/rooms/:id/join", joinRoom)
app.Router().POST("/api/rooms/:id/leave", leaveRoom)
app.Start()
}
func handleWebSocket(ctx forge.Context) error {
// Get streaming manager from DI container
manager := forge.Must[streaming.Manager](ctx.Container(), "streaming")
// Upgrade to WebSocket
conn, err := ctx.UpgradeWebSocket()
if err != nil {
return err
}
// Create enhanced connection with metadata
enhancedConn := streaming.NewEnhancedConnection(conn)
// Set user information (from authentication)
userID := ctx.Get("user_id").(string)
sessionID := ctx.Get("session_id").(string)
enhancedConn.SetUserID(userID)
enhancedConn.SetSessionID(sessionID)
// Register connection with manager
if err := manager.Register(enhancedConn); err != nil {
return err
}
// Handle connection lifecycle
defer func() {
manager.Unregister(enhancedConn.ID())
conn.Close()
}()
// Message handling loop
for {
var msg streaming.Message
if err := conn.ReadJSON(&msg); err != nil {
break
}
// Process message based on type
switch msg.Type {
case "join_room":
handleJoinRoom(ctx.Request().Context(), manager, enhancedConn, &msg)
case "leave_room":
handleLeaveRoom(ctx.Request().Context(), manager, enhancedConn, &msg)
case "send_message":
handleSendMessage(ctx.Request().Context(), manager, enhancedConn, &msg)
case "start_typing":
handleStartTyping(ctx.Request().Context(), manager, enhancedConn, &msg)
case "stop_typing":
handleStopTyping(ctx.Request().Context(), manager, enhancedConn, &msg)
}
}
return nil
}
func handleJoinRoom(ctx context.Context, manager streaming.Manager, conn streaming.Connection, msg *streaming.Message) {
roomID := msg.Data["room_id"].(string)
// Join room
if err := manager.JoinRoom(ctx, conn.ID(), roomID); err != nil {
conn.WriteJSON(streaming.Message{
Type: "error",
Data: map[string]any{"error": err.Error()},
})
return
}
// Set presence
manager.SetPresence(ctx, conn.GetUserID(), "online")
// Notify room members
manager.BroadcastToRoom(ctx, roomID, &streaming.Message{
Type: "user_joined",
Data: map[string]any{
"user_id": conn.GetUserID(),
"room_id": roomID,
},
})
// Send confirmation
conn.WriteJSON(streaming.Message{
Type: "room_joined",
Data: map[string]any{"room_id": roomID},
})
}
func handleSendMessage(ctx context.Context, manager streaming.Manager, conn streaming.Connection, msg *streaming.Message) {
roomID := msg.Data["room_id"].(string)
content := msg.Data["content"].(string)
// Create message
chatMsg := &streaming.Message{
Type: "message",
UserID: conn.GetUserID(),
RoomID: roomID,
Content: content,
Timestamp: time.Now(),
Data: map[string]any{
"user_id": conn.GetUserID(),
"room_id": roomID,
"content": content,
},
}
// Save to history
if err := manager.SaveMessage(ctx, chatMsg); err != nil {
conn.WriteJSON(streaming.Message{
Type: "error",
Data: map[string]any{"error": "Failed to save message"},
})
return
}
// Broadcast to room
manager.BroadcastToRoom(ctx, roomID, chatMsg)
}
func createRoom(ctx forge.Context) error {
manager := forge.Must[streaming.Manager](ctx.Container(), "streaming")
var req struct {
Name string `json:"name" validate:"required"`
Description string `json:"description"`
Private bool `json:"private"`
MaxMembers int `json:"max_members"`
}
if err := ctx.Bind(&req); err != nil {
return ctx.JSON(400, map[string]string{"error": "Invalid request"})
}
// Create room
room := streaming.NewRoom(streaming.RoomOptions{
Name: req.Name,
Description: req.Description,
Owner: ctx.Get("user_id").(string),
Private: req.Private,
MaxMembers: req.MaxMembers,
})
if err := manager.CreateRoom(ctx.Request().Context(), room); err != nil {
return ctx.JSON(500, map[string]string{"error": "Failed to create room"})
}
return ctx.JSON(201, map[string]any{
"id": room.GetID(),
"name": room.GetName(),
"description": room.GetDescription(),
"private": room.IsPrivate(),
"owner": room.GetOwner(),
"created": room.GetCreated(),
})
}Server-Sent Events (SSE)
func handleSSE(ctx forge.Context) error {
manager := forge.Must[streaming.Manager](ctx.Container(), "streaming")
// Set SSE headers
ctx.Header("Content-Type", "text/event-stream")
ctx.Header("Cache-Control", "no-cache")
ctx.Header("Connection", "keep-alive")
ctx.Header("Access-Control-Allow-Origin", "*")
// Create SSE connection
sseConn := streaming.NewSSEConnection(ctx.Response())
enhancedConn := streaming.NewEnhancedConnection(sseConn)
userID := ctx.Get("user_id").(string)
enhancedConn.SetUserID(userID)
// Register connection
if err := manager.Register(enhancedConn); err != nil {
return err
}
defer manager.Unregister(enhancedConn.ID())
// Subscribe to user-specific channel
userChannel := fmt.Sprintf("user.%s.notifications", userID)
manager.Subscribe(ctx.Request().Context(), enhancedConn.ID(), userChannel, nil)
// Keep connection alive
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Request().Context().Done():
return nil
case <-ticker.C:
// Send heartbeat
sseConn.WriteEvent("heartbeat", map[string]any{
"timestamp": time.Now().Unix(),
})
}
}
}Advanced Room Management
package main
import (
"context"
"time"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/streaming"
)
type ChatService struct {
manager streaming.Manager
logger forge.Logger
}
func NewChatService(manager streaming.Manager, logger forge.Logger) *ChatService {
return &ChatService{
manager: manager,
logger: logger,
}
}
func (s *ChatService) CreatePrivateRoom(ctx context.Context, ownerID string, memberIDs []string, name string) (*streaming.Room, error) {
// Create private room
room := streaming.NewRoom(streaming.RoomOptions{
Name: name,
Owner: ownerID,
Private: true,
MaxMembers: len(memberIDs) + 1, // Owner + members
})
if err := s.manager.CreateRoom(ctx, room); err != nil {
return nil, err
}
// Add owner as admin
room.Join(ctx, ownerID, streaming.RoleAdmin)
// Add members
for _, memberID := range memberIDs {
if err := room.Join(ctx, memberID, streaming.RoleMember); err != nil {
s.logger.Error("failed to add member to room",
forge.F("room_id", room.GetID()),
forge.F("member_id", memberID),
forge.F("error", err),
)
}
}
// Send invitations
for _, memberID := range memberIDs {
s.sendRoomInvitation(ctx, room, memberID)
}
return &room, nil
}
func (s *ChatService) sendRoomInvitation(ctx context.Context, room streaming.Room, userID string) {
invitation := &streaming.Message{
Type: "room_invitation",
UserID: "system",
Data: map[string]any{
"room_id": room.GetID(),
"room_name": room.GetName(),
"invited_by": room.GetOwner(),
"timestamp": time.Now(),
},
}
s.manager.SendToUser(ctx, userID, invitation)
}
func (s *ChatService) HandleModerationAction(ctx context.Context, roomID, moderatorID, targetID, action, reason string) error {
// Verify moderator permissions
room, err := s.manager.GetRoom(ctx, roomID)
if err != nil {
return err
}
hasPermission, err := room.HasPermission(ctx, moderatorID, streaming.PermissionManageRoom)
if err != nil || !hasPermission {
return streaming.ErrInsufficientPermissions
}
// Perform moderation action
switch action {
case "ban":
return s.banMember(ctx, room, moderatorID, targetID, reason)
case "kick":
return s.kickMember(ctx, room, moderatorID, targetID, reason)
case "mute":
return s.muteMember(ctx, room, moderatorID, targetID, reason, 24*time.Hour)
default:
return streaming.ErrInvalidModerationAction
}
}
func (s *ChatService) banMember(ctx context.Context, room streaming.Room, moderatorID, targetID, reason string) error {
// Ban member
if err := room.BanMember(ctx, targetID, reason, nil); err != nil {
return err
}
// Remove from room
if err := room.Leave(ctx, targetID); err != nil {
s.logger.Error("failed to remove banned member from room",
forge.F("room_id", room.GetID()),
forge.F("target_id", targetID),
forge.F("error", err),
)
}
// Disconnect user from room
userConns := s.manager.GetUserConnections(targetID)
for _, conn := range userConns {
if conn.IsInRoom(room.GetID()) {
s.manager.LeaveRoom(ctx, conn.ID(), room.GetID())
}
}
// Broadcast moderation event
s.manager.BroadcastToRoom(ctx, room.GetID(), &streaming.Message{
Type: "member_banned",
Data: map[string]any{
"room_id": room.GetID(),
"target_id": targetID,
"moderator_id": moderatorID,
"reason": reason,
"timestamp": time.Now(),
},
})
return nil
}
func (s *ChatService) GetRoomHistory(ctx context.Context, roomID, userID string, limit int, before *time.Time) ([]*streaming.Message, error) {
// Verify user is member of room
room, err := s.manager.GetRoom(ctx, roomID)
if err != nil {
return nil, err
}
isMember, err := room.IsMember(ctx, userID)
if err != nil || !isMember {
return nil, streaming.ErrNotRoomMember
}
// Get message history
query := streaming.HistoryQuery{
RoomID: roomID,
Limit: limit,
Before: before,
}
return s.manager.GetHistory(ctx, roomID, query)
}Channel Subscriptions with Filters
func handleChannelSubscription(ctx forge.Context) error {
manager := forge.Must[streaming.Manager](ctx.Container(), "streaming")
var req struct {
ChannelPattern string `json:"channel_pattern"`
Filters map[string]any `json:"filters"`
}
if err := ctx.Bind(&req); err != nil {
return ctx.JSON(400, map[string]string{"error": "Invalid request"})
}
// Get connection from context (set by WebSocket middleware)
conn := ctx.Get("websocket_connection").(streaming.Connection)
// Subscribe to channels matching pattern
channels, err := manager.FindChannelsByPattern(ctx.Request().Context(), req.ChannelPattern)
if err != nil {
return ctx.JSON(500, map[string]string{"error": "Failed to find channels"})
}
for _, channel := range channels {
// Subscribe with filters
err := manager.Subscribe(ctx.Request().Context(), conn.ID(), channel.GetID(), req.Filters)
if err != nil {
return ctx.JSON(500, map[string]string{"error": "Failed to subscribe"})
}
}
return ctx.JSON(200, map[string]any{
"subscribed_channels": len(channels),
"pattern": req.ChannelPattern,
"filters": req.Filters,
})
}
// Example: Subscribe to user-specific notifications
func subscribeToUserNotifications(ctx context.Context, manager streaming.Manager, conn streaming.Connection, userID string) error {
// Subscribe to various user channels
channels := []string{
fmt.Sprintf("user.%s.messages", userID),
fmt.Sprintf("user.%s.notifications", userID),
fmt.Sprintf("user.%s.presence", userID),
}
for _, channelID := range channels {
// Create channel if it doesn't exist
channel := streaming.NewChannel(streaming.ChannelOptions{
ID: channelID,
Name: channelID,
Private: true,
})
manager.CreateChannel(ctx, channel)
// Subscribe with user-specific filters
filters := map[string]any{
"user_id": userID,
"priority": []string{"high", "medium"}, // Only high/medium priority messages
}
if err := manager.Subscribe(ctx, conn.ID(), channelID, filters); err != nil {
return err
}
}
return nil
}Advanced Features
Distributed Deployment
package main
import (
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/streaming"
)
func main() {
app := forge.NewApp(forge.AppConfig{
Name: "chat-cluster",
Version: "1.0.0",
})
// Configure for distributed deployment
app.RegisterExtension(streaming.NewExtension(
// Redis backend for state sharing
streaming.WithRedisBackend(
"redis://redis-1:6379",
"redis://redis-2:6379",
"redis://redis-3:6379",
),
// Node identification
streaming.WithNodeID("chat-node-1"),
streaming.WithDistributed(true),
// Load balancing
streaming.WithLoadBalancer("consistent_hash"), // "round_robin", "least_connections", "geographic"
// State synchronization
streaming.WithPresenceSync(true),
streaming.WithRoomStateSync(true),
streaming.WithSyncInterval(30 * time.Second),
// Fault tolerance
streaming.WithHealthCheck(true),
streaming.WithAutoFailover(true),
streaming.WithReconnectAttempts(5),
))
app.Start()
}Custom Authentication
package main
import (
"context"
"strings"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/streaming"
)
// Custom authenticator
type JWTAuthenticator struct {
secretKey []byte
}
func (a *JWTAuthenticator) AuthenticateConnection(ctx context.Context, token string) (*streaming.AuthResult, error) {
// Validate JWT token
claims, err := validateJWT(token, a.secretKey)
if err != nil {
return nil, streaming.ErrInvalidToken
}
return &streaming.AuthResult{
UserID: claims.UserID,
SessionID: claims.SessionID,
Permissions: claims.Permissions,
Metadata: map[string]any{
"role": claims.Role,
"expires_at": claims.ExpiresAt,
},
}, nil
}
func (a *JWTAuthenticator) AuthorizeMessage(ctx context.Context, userID string, message *streaming.Message) error {
// Check if user can send message to room/channel
switch message.Type {
case "send_message":
return a.authorizeRoomMessage(ctx, userID, message)
case "join_room":
return a.authorizeRoomJoin(ctx, userID, message)
default:
return nil
}
}
func (a *JWTAuthenticator) authorizeRoomMessage(ctx context.Context, userID string, message *streaming.Message) error {
roomID := message.Data["room_id"].(string)
// Check if user is member of room
// Implementation depends on your room store
return nil
}
// Register custom authenticator
func main() {
app := forge.NewApp(forge.AppConfig{
Name: "secure-chat",
Version: "1.0.0",
})
authenticator := &JWTAuthenticator{
secretKey: []byte("your-secret-key"),
}
app.RegisterExtension(streaming.NewExtension(
streaming.WithLocalBackend(),
streaming.WithCustomAuth(authenticator),
streaming.WithAuthRequired(true),
))
// WebSocket endpoint with authentication
app.Router().GET("/ws", func(ctx forge.Context) error {
// Extract token from query parameter or header
token := ctx.Query("token")
if token == "" {
token = strings.TrimPrefix(ctx.Header("Authorization"), "Bearer ")
}
if token == "" {
return ctx.JSON(401, map[string]string{"error": "Missing token"})
}
// Authenticate
authResult, err := authenticator.AuthenticateConnection(ctx.Request().Context(), token)
if err != nil {
return ctx.JSON(401, map[string]string{"error": "Invalid token"})
}
// Store auth info in context
ctx.Set("user_id", authResult.UserID)
ctx.Set("session_id", authResult.SessionID)
ctx.Set("permissions", authResult.Permissions)
return handleWebSocket(ctx)
})
app.Start()
}Rate Limiting & Content Filtering
package main
import (
"context"
"regexp"
"strings"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/streaming"
)
// Custom content filter
type ProfanityFilter struct {
bannedWords []string
patterns []*regexp.Regexp
}
func NewProfanityFilter() *ProfanityFilter {
bannedWords := []string{"spam", "abuse", "inappropriate"}
patterns := make([]*regexp.Regexp, len(bannedWords))
for i, word := range bannedWords {
patterns[i] = regexp.MustCompile(`(?i)\b` + regexp.QuoteMeta(word) + `\b`)
}
return &ProfanityFilter{
bannedWords: bannedWords,
patterns: patterns,
}
}
func (f *ProfanityFilter) FilterMessage(ctx context.Context, message *streaming.Message) (*streaming.Message, error) {
content, ok := message.Data["content"].(string)
if !ok {
return message, nil
}
// Check for banned words
for _, pattern := range f.patterns {
if pattern.MatchString(content) {
return nil, streaming.ErrMessageFiltered
}
}
// Clean up content (example: remove excessive whitespace)
cleaned := strings.TrimSpace(regexp.MustCompile(`\s+`).ReplaceAllString(content, " "))
message.Data["content"] = cleaned
return message, nil
}
// Custom rate limiter
type AdaptiveRateLimiter struct {
baseLimits map[string]int // user_id -> base limit
penalties map[string]int // user_id -> penalty multiplier
}
func (r *AdaptiveRateLimiter) CheckLimit(ctx context.Context, userID string, action string) error {
baseLimit := r.baseLimits[userID]
if baseLimit == 0 {
baseLimit = 10 // Default limit
}
penalty := r.penalties[userID]
if penalty == 0 {
penalty = 1
}
effectiveLimit := baseLimit / penalty
// Check current usage (implementation depends on your storage)
currentUsage := r.getCurrentUsage(userID, action)
if currentUsage >= effectiveLimit {
return streaming.ErrRateLimitExceeded
}
return nil
}
func main() {
app := forge.NewApp(forge.AppConfig{
Name: "filtered-chat",
Version: "1.0.0",
})
// Create filters
profanityFilter := NewProfanityFilter()
rateLimiter := &AdaptiveRateLimiter{
baseLimits: make(map[string]int),
penalties: make(map[string]int),
}
app.RegisterExtension(streaming.NewExtension(
streaming.WithLocalBackend(),
streaming.WithContentFilter(profanityFilter),
streaming.WithRateLimiter(rateLimiter),
streaming.WithMessageValidation(true),
))
app.Start()
}Best Practices
Connection Management
- Implement heartbeat: Use ping/pong to detect dead connections
- Handle reconnection: Implement client-side reconnection logic
- Limit connections: Set reasonable per-user connection limits
- Clean up resources: Properly unregister connections on disconnect
Room Design
- Use meaningful names: Choose descriptive room names and IDs
- Implement permissions: Use role-based access control
- Moderate content: Implement content filtering and moderation
- Archive old rooms: Clean up inactive rooms periodically
Message Handling
- Validate messages: Always validate message content and structure
- Handle errors gracefully: Provide meaningful error messages
- Implement retries: Handle temporary failures with retry logic
- Log important events: Log room joins, leaves, and moderation actions
Performance Optimization
- Use message batching: Batch multiple messages when possible
- Implement caching: Cache frequently accessed data
- Monitor metrics: Track connection counts, message rates, and errors
- Optimize queries: Use efficient database queries for history and presence
Security Considerations
- Authenticate connections: Always validate user identity
- Authorize actions: Check permissions for room/channel operations
- Filter content: Implement content filtering and spam detection
- Rate limit requests: Prevent abuse with rate limiting
Troubleshooting
Common Issues
Connection Refused
Error: WebSocket connection failedSolutions:
- Check if streaming extension is enabled
- Verify WebSocket endpoint is registered
- Check firewall and network connectivity
- Ensure proper CORS configuration
Authentication Failures
Error: Authentication failed for connectionSolutions:
- Verify token format and validity
- Check authentication configuration
- Ensure proper token extraction from headers/query
- Validate token signing key
Room Join Failures
Error: Failed to join room: room not foundSolutions:
- Verify room exists before joining
- Check room permissions and privacy settings
- Ensure user has permission to join
- Check room member limits
Message Delivery Issues
Error: Message not delivered to all recipientsSolutions:
- Check connection status of recipients
- Verify room/channel subscriptions
- Check message filters and permissions
- Monitor distributed backend connectivity
Debugging
Enable Debug Logging
logging:
level: debug
loggers:
streaming: debug
streaming.manager: debug
streaming.rooms: debug
streaming.presence: debugMonitor Metrics
// Check streaming statistics
manager := forge.Must[streaming.Manager](container, "streaming")
stats := manager.GetStats()
fmt.Printf("Active Connections: %d\n", stats.ActiveConnections)
fmt.Printf("Total Rooms: %d\n", stats.TotalRooms)
fmt.Printf("Total Channels: %d\n", stats.TotalChannels)
fmt.Printf("Messages/Second: %.2f\n", stats.MessagesPerSecond)Health Checks
# Check extension health
curl http://localhost:8080/_/health/streaming
# Check backend connectivity
curl http://localhost:8080/_/health/streaming/backend
# Check distributed coordination
curl http://localhost:8080/_/health/streaming/distributedNext Steps
- Explore WebRTC Extension for peer-to-peer communication
- Learn about Events Extension for event-driven architecture
- Check out gRPC Extension for high-performance RPC
- Review GraphQL Extension for flexible query APIs
- See oRPC Extension for JSON-RPC 2.0 support
- Visit Auth Extension for authentication integration
How is this guide?
Last updated on