Streaming
Real-time streaming capabilities with WebSocket, Server-Sent Events, and WebTransport support
Streaming
Forge provides comprehensive streaming capabilities for real-time applications including WebSocket connections, Server-Sent Events (SSE), and WebTransport support. These features enable bidirectional communication, real-time updates, and low-latency data streaming.
Streaming Interface
Forge's streaming interface provides a unified API for different streaming protocols:
type StreamingManager interface {
// WebSocket support
WebSocket(path string, handler WebSocketHandler, opts ...WebSocketOption) error
// Server-Sent Events
SSE(path string, handler SSEHandler, opts ...SSEOption) error
// WebTransport support
WebTransport(path string, handler WebTransportHandler, opts ...WebTransportOption) error
// Connection management
GetConnection(id string) (Connection, error)
Broadcast(message interface{}, filter ...ConnectionFilter) error
// Statistics
GetStats() StreamingStats
}WebSocket Support
Basic WebSocket Handler
func websocketHandler(conn forge.WebSocketConnection) error {
// Send welcome message
conn.WriteMessage(forge.TextMessage, []byte("Connected to WebSocket"))
// Handle incoming messages
for {
messageType, data, err := conn.ReadMessage()
if err != nil {
return err
}
// Echo back the message
err = conn.WriteMessage(messageType, data)
if err != nil {
return err
}
}
}
// Register WebSocket endpoint
app.Router().WebSocket("/ws", websocketHandler)WebSocket with Authentication
func authenticatedWebSocketHandler(conn forge.WebSocketConnection) error {
// Get user from connection context
userID := conn.Get("user_id").(string)
// Send personalized welcome
conn.WriteMessage(forge.TextMessage, []byte(fmt.Sprintf("Welcome, user %s", userID)))
// Handle messages
for {
messageType, data, err := conn.ReadMessage()
if err != nil {
return err
}
// Process message with user context
response := processMessage(userID, data)
conn.WriteMessage(messageType, response)
}
}
// Register with authentication middleware
app.Router().WebSocket("/ws/authenticated", authenticatedWebSocketHandler,
forge.WithWebSocketAuth("jwt"),
)WebSocket Connection Management
type ChatRoom struct {
connections map[string]forge.WebSocketConnection
mutex sync.RWMutex
}
func (cr *ChatRoom) AddConnection(userID string, conn forge.WebSocketConnection) {
cr.mutex.Lock()
defer cr.mutex.Unlock()
cr.connections[userID] = conn
}
func (cr *ChatRoom) RemoveConnection(userID string) {
cr.mutex.Lock()
defer cr.mutex.Unlock()
delete(cr.connections, userID)
}
func (cr *ChatRoom) Broadcast(message []byte, excludeUserID string) {
cr.mutex.RLock()
defer cr.mutex.RUnlock()
for userID, conn := range cr.connections {
if userID != excludeUserID {
conn.WriteMessage(forge.TextMessage, message)
}
}
}
func chatHandler(conn forge.WebSocketConnection) error {
userID := conn.Get("user_id").(string)
chatRoom.AddConnection(userID, conn)
defer chatRoom.RemoveConnection(userID)
for {
messageType, data, err := conn.ReadMessage()
if err != nil {
return err
}
// Broadcast message to all other users
chatRoom.Broadcast(data, userID)
}
}Server-Sent Events (SSE)
Basic SSE Handler
func sseHandler(ctx forge.Context) error {
// Set SSE headers
ctx.SetHeader("Content-Type", "text/event-stream")
ctx.SetHeader("Cache-Control", "no-cache")
ctx.SetHeader("Connection", "keep-alive")
ctx.SetHeader("Access-Control-Allow-Origin", "*")
// Send initial event
ctx.WriteSSE("connected", "Welcome to SSE stream")
// Send periodic updates
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
data := map[string]interface{}{
"timestamp": time.Now().Unix(),
"message": "Periodic update",
}
ctx.WriteSSE("update", data)
}
}
}
// Register SSE endpoint
app.Router().SSE("/events", sseHandler)SSE with Real-time Data
func realTimeDataHandler(ctx forge.Context) error {
// Set SSE headers
ctx.SetHeader("Content-Type", "text/event-stream")
ctx.SetHeader("Cache-Control", "no-cache")
ctx.SetHeader("Connection", "keep-alive")
// Send initial data
ctx.WriteSSE("init", map[string]interface{}{
"status": "connected",
"time": time.Now().Unix(),
})
// Subscribe to data updates
dataChannel := make(chan interface{})
dataService.Subscribe(dataChannel)
defer dataService.Unsubscribe(dataChannel)
for {
select {
case <-ctx.Done():
return nil
case data := <-dataChannel:
ctx.WriteSSE("data", data)
}
}
}SSE with Custom Events
func customEventHandler(ctx forge.Context) error {
// Set SSE headers
ctx.SetHeader("Content-Type", "text/event-stream")
ctx.SetHeader("Cache-Control", "no-cache")
ctx.SetHeader("Connection", "keep-alive")
// Send different types of events
ctx.WriteSSE("user-login", map[string]interface{}{
"user_id": "123",
"timestamp": time.Now().Unix(),
})
ctx.WriteSSE("system-alert", map[string]interface{}{
"level": "warning",
"message": "High CPU usage detected",
})
ctx.WriteSSE("data-update", map[string]interface{}{
"table": "users",
"action": "insert",
"id": "456",
})
return nil
}WebTransport Support
Basic WebTransport Handler
func webTransportHandler(session forge.WebTransportSession) error {
// Send welcome message
session.SendMessage([]byte("Connected to WebTransport"))
// Handle incoming messages
for {
data, err := session.ReceiveMessage()
if err != nil {
return err
}
// Process and respond
response := processMessage(data)
session.SendMessage(response)
}
}
// Register WebTransport endpoint
app.Router().WebTransport("/wt", webTransportHandler)WebTransport with Bidirectional Streams
func bidirectionalStreamHandler(session forge.WebTransportSession) error {
// Create bidirectional stream
stream, err := session.CreateBidirectionalStream()
if err != nil {
return err
}
// Handle stream data
go func() {
for {
data, err := stream.Read()
if err != nil {
return
}
// Process data and send response
response := processStreamData(data)
stream.Write(response)
}
}()
return nil
}Streaming Middleware
Connection Logging Middleware
func streamingLoggingMiddleware(next forge.StreamingHandler) forge.StreamingHandler {
return func(conn forge.StreamingConnection) error {
start := time.Now()
app.Logger().Info("streaming connection started",
forge.F("protocol", conn.Protocol()),
forge.F("remote_addr", conn.RemoteAddr()),
)
err := next(conn)
duration := time.Since(start)
app.Logger().Info("streaming connection ended",
forge.F("protocol", conn.Protocol()),
forge.F("duration", duration),
forge.F("error", err),
)
return err
}
}Rate Limiting Middleware
func streamingRateLimitMiddleware(next forge.StreamingHandler) forge.StreamingHandler {
return func(conn forge.StreamingConnection) error {
// Check rate limit
if !rateLimiter.Allow(conn.RemoteAddr()) {
conn.WriteMessage(forge.TextMessage, []byte("Rate limit exceeded"))
return forge.TooManyRequests("rate limit exceeded")
}
return next(conn)
}
}Streaming Configuration
WebSocket Configuration
app := forge.NewApp(forge.AppConfig{
StreamingConfig: forge.StreamingConfig{
WebSocket: forge.WebSocketConfig{
Enabled: true,
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: true,
HandshakeTimeout: 10 * time.Second,
PingPeriod: 54 * time.Second,
PongWait: 60 * time.Second,
MaxMessageSize: 512,
MaxConnections: 1000,
},
},
})SSE Configuration
app := forge.NewApp(forge.AppConfig{
StreamingConfig: forge.StreamingConfig{
SSE: forge.SSEConfig{
Enabled: true,
RetryInterval: 3 * time.Second,
MaxConnections: 1000,
BufferSize: 1024,
},
},
})WebTransport Configuration
app := forge.NewApp(forge.AppConfig{
StreamingConfig: forge.StreamingConfig{
WebTransport: forge.WebTransportConfig{
Enabled: true,
MaxConnections: 1000,
MaxStreams: 100,
BufferSize: 1024,
},
},
})Real-time Applications
Chat Application
type ChatApplication struct {
rooms map[string]*ChatRoom
mutex sync.RWMutex
}
func (ca *ChatApplication) JoinRoom(roomID string, userID string, conn forge.WebSocketConnection) {
ca.mutex.Lock()
defer ca.mutex.Unlock()
room, exists := ca.rooms[roomID]
if !exists {
room = &ChatRoom{
ID: roomID,
connections: make(map[string]forge.WebSocketConnection),
}
ca.rooms[roomID] = room
}
room.AddConnection(userID, conn)
}
func chatRoomHandler(conn forge.WebSocketConnection) error {
roomID := conn.Get("room_id").(string)
userID := conn.Get("user_id").(string)
chatApp.JoinRoom(roomID, userID, conn)
defer chatApp.LeaveRoom(roomID, userID)
for {
messageType, data, err := conn.ReadMessage()
if err != nil {
return err
}
// Broadcast to room
chatApp.BroadcastToRoom(roomID, data, userID)
}
}Live Data Dashboard
func liveDashboardHandler(ctx forge.Context) error {
// Set SSE headers
ctx.SetHeader("Content-Type", "text/event-stream")
ctx.SetHeader("Cache-Control", "no-cache")
ctx.SetHeader("Connection", "keep-alive")
// Send initial dashboard data
dashboardData := getDashboardData()
ctx.WriteSSE("dashboard-init", dashboardData)
// Subscribe to data updates
updateChannel := make(chan interface{})
dataService.Subscribe(updateChannel)
defer dataService.Unsubscribe(updateChannel)
for {
select {
case <-ctx.Done():
return nil
case update := <-updateChannel:
ctx.WriteSSE("dashboard-update", update)
}
}
}Real-time Notifications
func notificationHandler(ctx forge.Context) error {
userID := ctx.Get("user_id").(string)
// Set SSE headers
ctx.SetHeader("Content-Type", "text/event-stream")
ctx.SetHeader("Cache-Control", "no-cache")
ctx.SetHeader("Connection", "keep-alive")
// Subscribe to user notifications
notificationChannel := make(chan interface{})
notificationService.Subscribe(userID, notificationChannel)
defer notificationService.Unsubscribe(userID, notificationChannel)
for {
select {
case <-ctx.Done():
return nil
case notification := <-notificationChannel:
ctx.WriteSSE("notification", notification)
}
}
}Performance Optimization
Connection Pooling
type ConnectionPool struct {
connections map[string]forge.StreamingConnection
mutex sync.RWMutex
maxSize int
}
func (cp *ConnectionPool) AddConnection(id string, conn forge.StreamingConnection) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
if len(cp.connections) >= cp.maxSize {
return errors.New("connection pool full")
}
cp.connections[id] = conn
return nil
}Message Batching
func batchedMessageHandler(conn forge.WebSocketConnection) error {
batch := make([][]byte, 0, 10)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if len(batch) > 0 {
// Send batched messages
for _, message := range batch {
conn.WriteMessage(forge.TextMessage, message)
}
batch = batch[:0]
}
default:
messageType, data, err := conn.ReadMessage()
if err != nil {
return err
}
// Add to batch
batch = append(batch, data)
}
}
}Testing Streaming
WebSocket Testing
func TestWebSocket(t *testing.T) {
app := forge.NewTestApp(forge.TestAppConfig{
Name: "test-app",
})
// Register WebSocket handler
app.Router().WebSocket("/ws", func(conn forge.WebSocketConnection) error {
conn.WriteMessage(forge.TextMessage, []byte("hello"))
return nil
})
// Test WebSocket connection
resp := app.Test().WebSocket("/ws").Expect(t)
resp.ReceiveMessage().Equal("hello")
}SSE Testing
func TestSSE(t *testing.T) {
app := forge.NewTestApp(forge.TestAppConfig{
Name: "test-app",
})
// Register SSE handler
app.Router().SSE("/events", func(ctx forge.Context) error {
ctx.WriteSSE("test", "data")
return nil
})
// Test SSE endpoint
resp := app.Test().GET("/events").Expect(t)
resp.Status(200)
resp.Header("Content-Type").Equal("text/event-stream")
}Best Practices
- Connection Management: Properly manage connection lifecycle
- Error Handling: Handle connection errors gracefully
- Rate Limiting: Implement rate limiting for streaming endpoints
- Authentication: Secure streaming endpoints with authentication
- Resource Cleanup: Clean up resources when connections close
- Monitoring: Monitor streaming connection metrics
- Testing: Test streaming functionality thoroughly
- Performance: Optimize for high concurrency
For more advanced streaming features, see the Streaming Extension documentation.
How is this guide?
Last updated on