Server-Sent Events
Server-to-client streaming over HTTP
Server-Sent Events (SSE) provide a simple, efficient protocol for streaming data from server to client over a standard HTTP connection. SSE is built on HTTP/1.1, works through all proxies and load balancers, and includes automatic reconnection.
Basic Usage
Register an SSE route using router.SSE(). The handler receives the request context and a Stream for sending events.
r := app.Router()
r.SSE("/events/time", func(ctx forge.Context, stream forge.Stream) error {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-stream.Context().Done():
return nil // Client disconnected
case t := <-ticker.C:
err := stream.SendJSON("tick", map[string]string{
"time": t.Format(time.RFC3339),
})
if err != nil {
return err
}
}
}
})SSEHandler Signature
type SSEHandler func(ctx forge.Context, stream forge.Stream) errorThe handler runs for the lifetime of the SSE connection. When it returns, the connection closes.
EventStream
router.EventStream() is an alias for router.SSE() that reads more naturally for certain use cases.
r.EventStream("/stream/updates", func(ctx forge.Context, stream forge.Stream) error {
// Same interface as SSE
return stream.SendJSON("update", data)
})Stream Interface
The Stream interface provides methods for sending events to the client.
type Stream interface {
// Send sends a named event with raw bytes
Send(event string, data []byte) error
// SendJSON sends a named event with a JSON-encoded payload
SendJSON(event string, v any) error
// Flush flushes any buffered data to the client
Flush() error
// Close closes the stream
Close() error
// Context returns the stream context (cancelled on disconnect)
Context() context.Context
// SetRetry sets the client reconnection interval in milliseconds
SetRetry(milliseconds int) error
// SendComment sends an SSE comment (useful for keep-alive)
SendComment(comment string) error
}Sending Events
r.SSE("/events/users", func(ctx forge.Context, stream forge.Stream) error {
user := User{ID: "1", Name: "Alice"}
// Sends: event: user_created\ndata: {"id":"1","name":"Alice"}\n\n
return stream.SendJSON("user_created", user)
})r.SSE("/events/raw", func(ctx forge.Context, stream forge.Stream) error {
data := []byte(`{"status": "ok"}`)
// Sends: event: status\ndata: {"status": "ok"}\n\n
return stream.Send("status", data)
})r.SSE("/events/multi", func(ctx forge.Context, stream forge.Stream) error {
// Different event types on the same stream
stream.SendJSON("notification", Notification{
Title: "New message",
})
stream.SendJSON("metric", Metric{
Name: "cpu_usage",
Value: 42.5,
})
stream.SendJSON("alert", Alert{
Level: "warning",
Message: "Memory usage high",
})
return nil
})Keep-Alive with Comments
SSE comments (lines starting with :) are ignored by clients but keep the connection alive through proxies and load balancers.
r.SSE("/events/feed", func(ctx forge.Context, stream forge.Stream) error {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-stream.Context().Done():
return nil
case <-ticker.C:
// Send a comment to keep the connection alive
if err := stream.SendComment("keepalive"); err != nil {
return err
}
case item := <-feedChannel:
if err := stream.SendJSON("item", item); err != nil {
return err
}
}
}
})Retry Interval
Control how long the client waits before reconnecting after a disconnect.
r.SSE("/events/data", func(ctx forge.Context, stream forge.Stream) error {
// Tell the client to reconnect after 5 seconds if disconnected
if err := stream.SetRetry(5000); err != nil {
return err
}
// Stream data...
for {
select {
case <-stream.Context().Done():
return nil
case data := <-dataChannel:
stream.SendJSON("data", data)
}
}
})The default retry interval is configured in StreamConfig.RetryInterval (default: 3000ms). The SetRetry method overrides it for a specific stream.
Live Dashboard Example
A complete example streaming system metrics to a dashboard.
package main
import (
"runtime"
"time"
"github.com/xraph/forge"
)
type DashboardMetrics struct {
Goroutines int `json:"goroutines"`
HeapAlloc uint64 `json:"heap_alloc_bytes"`
HeapObjects uint64 `json:"heap_objects"`
GCPauses uint32 `json:"gc_pauses"`
Uptime string `json:"uptime"`
Timestamp string `json:"timestamp"`
}
func main() {
app := forge.New(forge.WithAppName("dashboard"))
r := app.Router()
startTime := time.Now()
r.SSE("/events/dashboard", func(ctx forge.Context, stream forge.Stream) error {
// Set reconnection interval
stream.SetRetry(2000)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
keepAlive := time.NewTicker(15 * time.Second)
defer keepAlive.Stop()
for {
select {
case <-stream.Context().Done():
return nil
case <-keepAlive.C:
stream.SendComment("ping")
case <-ticker.C:
var mem runtime.MemStats
runtime.ReadMemStats(&mem)
metrics := DashboardMetrics{
Goroutines: runtime.NumGoroutine(),
HeapAlloc: mem.HeapAlloc,
HeapObjects: mem.HeapObjects,
GCPauses: mem.NumGC,
Uptime: time.Since(startTime).String(),
Timestamp: time.Now().Format(time.RFC3339),
}
if err := stream.SendJSON("metrics", metrics); err != nil {
return err
}
}
}
},
forge.WithSummary("Dashboard metrics stream"),
forge.WithTags("monitoring"),
forge.WithSSEMessage("metrics", &DashboardMetrics{}),
)
app.Run()
}Client-Side JavaScript
const evtSource = new EventSource('/events/dashboard');
evtSource.addEventListener('metrics', (event) => {
const metrics = JSON.parse(event.data);
updateDashboard(metrics);
});
evtSource.onerror = () => {
console.log('Connection lost, reconnecting...');
// Browser automatically reconnects using the retry interval
};POST SSE with Method Override
By default, SSE routes use GET. Override to POST when clients need to send a request body.
r.SSE("/events/search", searchStreamHandler,
forge.WithMethod(http.MethodPost),
forge.WithRequestSchema(&SearchRequest{}),
forge.WithSSEMessage("result", &SearchResult{}),
forge.WithTags("search"),
)Route Options
SSE routes support standard route options.
r.SSE("/events/protected", handler,
forge.WithName("protectedSSE"),
forge.WithSummary("Protected event stream"),
forge.WithTags("events"),
forge.WithMiddleware(authMiddleware),
forge.WithAuth("jwt"),
forge.WithSSEMessages(map[string]any{
"notification": &Notification{},
"alert": &Alert{},
}),
)SSE Headers
Forge automatically sets the correct SSE headers on the response:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
X-Accel-Buffering: noThe X-Accel-Buffering: no header disables nginx buffering, ensuring events are delivered immediately.
How is this guide?