Server-Sent Events

Server-to-client streaming over HTTP

Server-Sent Events (SSE) provide a simple, efficient protocol for streaming data from server to client over a standard HTTP connection. SSE is built on HTTP/1.1, works through all proxies and load balancers, and includes automatic reconnection.

Basic Usage

Register an SSE route using router.SSE(). The handler receives the request context and a Stream for sending events.

r := app.Router()

r.SSE("/events/time", func(ctx forge.Context, stream forge.Stream) error {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-stream.Context().Done():
            return nil // Client disconnected
        case t := <-ticker.C:
            err := stream.SendJSON("tick", map[string]string{
                "time": t.Format(time.RFC3339),
            })
            if err != nil {
                return err
            }
        }
    }
})

SSEHandler Signature

type SSEHandler func(ctx forge.Context, stream forge.Stream) error

The handler runs for the lifetime of the SSE connection. When it returns, the connection closes.

EventStream

router.EventStream() is an alias for router.SSE() that reads more naturally for certain use cases.

r.EventStream("/stream/updates", func(ctx forge.Context, stream forge.Stream) error {
    // Same interface as SSE
    return stream.SendJSON("update", data)
})

Stream Interface

The Stream interface provides methods for sending events to the client.

type Stream interface {
    // Send sends a named event with raw bytes
    Send(event string, data []byte) error

    // SendJSON sends a named event with a JSON-encoded payload
    SendJSON(event string, v any) error

    // Flush flushes any buffered data to the client
    Flush() error

    // Close closes the stream
    Close() error

    // Context returns the stream context (cancelled on disconnect)
    Context() context.Context

    // SetRetry sets the client reconnection interval in milliseconds
    SetRetry(milliseconds int) error

    // SendComment sends an SSE comment (useful for keep-alive)
    SendComment(comment string) error
}

Sending Events

r.SSE("/events/users", func(ctx forge.Context, stream forge.Stream) error {
    user := User{ID: "1", Name: "Alice"}

    // Sends: event: user_created\ndata: {"id":"1","name":"Alice"}\n\n
    return stream.SendJSON("user_created", user)
})
r.SSE("/events/raw", func(ctx forge.Context, stream forge.Stream) error {
    data := []byte(`{"status": "ok"}`)

    // Sends: event: status\ndata: {"status": "ok"}\n\n
    return stream.Send("status", data)
})
r.SSE("/events/multi", func(ctx forge.Context, stream forge.Stream) error {
    // Different event types on the same stream
    stream.SendJSON("notification", Notification{
        Title: "New message",
    })

    stream.SendJSON("metric", Metric{
        Name:  "cpu_usage",
        Value: 42.5,
    })

    stream.SendJSON("alert", Alert{
        Level:   "warning",
        Message: "Memory usage high",
    })

    return nil
})

Keep-Alive with Comments

SSE comments (lines starting with :) are ignored by clients but keep the connection alive through proxies and load balancers.

r.SSE("/events/feed", func(ctx forge.Context, stream forge.Stream) error {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-stream.Context().Done():
            return nil
        case <-ticker.C:
            // Send a comment to keep the connection alive
            if err := stream.SendComment("keepalive"); err != nil {
                return err
            }
        case item := <-feedChannel:
            if err := stream.SendJSON("item", item); err != nil {
                return err
            }
        }
    }
})

Retry Interval

Control how long the client waits before reconnecting after a disconnect.

r.SSE("/events/data", func(ctx forge.Context, stream forge.Stream) error {
    // Tell the client to reconnect after 5 seconds if disconnected
    if err := stream.SetRetry(5000); err != nil {
        return err
    }

    // Stream data...
    for {
        select {
        case <-stream.Context().Done():
            return nil
        case data := <-dataChannel:
            stream.SendJSON("data", data)
        }
    }
})

The default retry interval is configured in StreamConfig.RetryInterval (default: 3000ms). The SetRetry method overrides it for a specific stream.

Live Dashboard Example

A complete example streaming system metrics to a dashboard.

package main

import (
    "runtime"
    "time"

    "github.com/xraph/forge"
)

type DashboardMetrics struct {
    Goroutines  int     `json:"goroutines"`
    HeapAlloc   uint64  `json:"heap_alloc_bytes"`
    HeapObjects uint64  `json:"heap_objects"`
    GCPauses    uint32  `json:"gc_pauses"`
    Uptime      string  `json:"uptime"`
    Timestamp   string  `json:"timestamp"`
}

func main() {
    app := forge.New(forge.WithAppName("dashboard"))
    r := app.Router()

    startTime := time.Now()

    r.SSE("/events/dashboard", func(ctx forge.Context, stream forge.Stream) error {
        // Set reconnection interval
        stream.SetRetry(2000)

        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()

        keepAlive := time.NewTicker(15 * time.Second)
        defer keepAlive.Stop()

        for {
            select {
            case <-stream.Context().Done():
                return nil

            case <-keepAlive.C:
                stream.SendComment("ping")

            case <-ticker.C:
                var mem runtime.MemStats
                runtime.ReadMemStats(&mem)

                metrics := DashboardMetrics{
                    Goroutines:  runtime.NumGoroutine(),
                    HeapAlloc:   mem.HeapAlloc,
                    HeapObjects: mem.HeapObjects,
                    GCPauses:    mem.NumGC,
                    Uptime:      time.Since(startTime).String(),
                    Timestamp:   time.Now().Format(time.RFC3339),
                }

                if err := stream.SendJSON("metrics", metrics); err != nil {
                    return err
                }
            }
        }
    },
        forge.WithSummary("Dashboard metrics stream"),
        forge.WithTags("monitoring"),
        forge.WithSSEMessage("metrics", &DashboardMetrics{}),
    )

    app.Run()
}

Client-Side JavaScript

const evtSource = new EventSource('/events/dashboard');

evtSource.addEventListener('metrics', (event) => {
    const metrics = JSON.parse(event.data);
    updateDashboard(metrics);
});

evtSource.onerror = () => {
    console.log('Connection lost, reconnecting...');
    // Browser automatically reconnects using the retry interval
};

POST SSE with Method Override

By default, SSE routes use GET. Override to POST when clients need to send a request body.

r.SSE("/events/search", searchStreamHandler,
    forge.WithMethod(http.MethodPost),
    forge.WithRequestSchema(&SearchRequest{}),
    forge.WithSSEMessage("result", &SearchResult{}),
    forge.WithTags("search"),
)

Route Options

SSE routes support standard route options.

r.SSE("/events/protected", handler,
    forge.WithName("protectedSSE"),
    forge.WithSummary("Protected event stream"),
    forge.WithTags("events"),
    forge.WithMiddleware(authMiddleware),
    forge.WithAuth("jwt"),
    forge.WithSSEMessages(map[string]any{
        "notification": &Notification{},
        "alert":        &Alert{},
    }),
)

SSE Headers

Forge automatically sets the correct SSE headers on the response:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
X-Accel-Buffering: no

The X-Accel-Buffering: no header disables nginx buffering, ensuring events are delivered immediately.

How is this guide?

On this page