Streaming Overview

Real-time communication with WebSocket, SSE, and WebTransport

Forge provides first-class support for three real-time streaming protocols: WebSocket, Server-Sent Events (SSE), and WebTransport. All three are registered as routes on the Forge router and benefit from the same middleware, error handling, and schema generation as regular HTTP endpoints.

Protocol Comparison

FeatureWebSocketSSEWebTransport
DirectionFull-duplex (bidirectional)Server-to-client onlyFull-duplex (bidirectional)
TransportTCP over HTTP/1.1 upgradeHTTP/1.1 or HTTP/2QUIC (HTTP/3)
LatencyLowMediumVery low
ReconnectionManualBuilt-in (automatic)Manual
Binary dataYesNo (text only)Yes
MultiplexingNo (one connection = one stream)NoYes (multiple streams per connection)
Browser supportAll modern browsersAll modern browsersChrome, Edge (expanding)
Proxy-friendlySometimes problematicExcellentRequires HTTP/3

When to Use Which

StreamConfig

All streaming protocols share a common StreamConfig that controls buffer sizes, compression, and protocol-specific settings.

type StreamConfig struct {
    // WebSocket
    ReadBufferSize    int   // Default: 4096
    WriteBufferSize   int   // Default: 4096
    EnableCompression bool  // Default: false

    // SSE
    RetryInterval int  // Milliseconds, default: 3000
    KeepAlive     bool // Default: true

    // WebTransport
    EnableWebTransport      bool    // Default: false
    MaxBidiStreams           int64   // Default: 100
    MaxUniStreams            int64   // Default: 100
    MaxDatagramFrameSize    int64   // Default: 65536 (64KB)
    EnableDatagrams         bool    // Default: true
    StreamReceiveWindow     uint64  // Default: 6MB
    ConnectionReceiveWindow uint64  // Default: 15MB
    WebTransportKeepAlive   int     // Milliseconds, default: 30000
    WebTransportMaxIdle     int     // Milliseconds, default: 60000
}

Default Configuration

config := forge.DefaultStreamConfig()
// Returns sensible defaults for all protocols

Quick Start

r := app.Router()

r.WebSocket("/ws/echo", func(ctx forge.Context, conn forge.Connection) error {
    for {
        msg, err := conn.Read()
        if err != nil {
            return err
        }
        if err := conn.Write(msg); err != nil {
            return err
        }
    }
})
r := app.Router()

r.SSE("/events/time", func(ctx forge.Context, stream forge.Stream) error {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-stream.Context().Done():
            return nil
        case t := <-ticker.C:
            stream.SendJSON("tick", map[string]string{
                "time": t.Format(time.RFC3339),
            })
        }
    }
})
r := app.Router()

r.WebTransport("/wt/data", func(ctx forge.Context, session forge.WebTransportSession) error {
    stream, err := session.AcceptStream(ctx.Request().Context())
    if err != nil {
        return err
    }
    defer stream.Close()

    buf := make([]byte, 1024)
    n, err := stream.Read(buf)
    if err != nil {
        return err
    }

    _, err = stream.Write(buf[:n])
    return err
})

AsyncAPI Integration

All streaming endpoints are automatically included in the AsyncAPI 3.0 specification served at /_/asyncapi. Add message schemas to improve the generated documentation:

r.WebSocket("/ws/chat", chatHandler,
    forge.WithWebSocketMessages(&ChatMessage{}, &ServerEvent{}),
    forge.WithSummary("Real-time chat"),
    forge.WithTags("chat"),
)

r.SSE("/events/feed", feedHandler,
    forge.WithSSEMessage("update", &FeedItem{}),
    forge.WithSummary("Live activity feed"),
    forge.WithTags("feed"),
)

See the AsyncAPI documentation for full details.

Next Steps

How is this guide?

On this page