Streaming

Features

Streaming extension capabilities

Manager Interface

The core Manager interface handles connections, rooms, channels, broadcasting, presence, and typing:

type Manager interface {
    // Connections
    Register(ctx context.Context, conn Connection) error
    Unregister(ctx context.Context, conn Connection) error
    GetConnection(connID string) (Connection, error)
    GetUserConnections(userID string) []Connection
    ConnectionCount() int
    KickConnection(ctx context.Context, connID string, reason string) error

    // Rooms
    CreateRoom(ctx context.Context, opts RoomOptions) (Room, error)
    GetRoom(ctx context.Context, roomID string) (Room, error)
    DeleteRoom(ctx context.Context, roomID string) error
    JoinRoom(ctx context.Context, roomID, userID string, opts MemberOptions) error
    LeaveRoom(ctx context.Context, roomID, userID string) error
    GetRoomMembers(ctx context.Context, roomID string) ([]Member, error)
    ListRooms(ctx context.Context) ([]Room, error)

    // Channels
    CreateChannel(ctx context.Context, name string) error
    Subscribe(ctx context.Context, channelID, userID string) error
    Unsubscribe(ctx context.Context, channelID, userID string) error

    // Broadcasting
    Broadcast(ctx context.Context, data []byte) error
    BroadcastToRoom(ctx context.Context, roomID string, data []byte) error
    BroadcastToChannel(ctx context.Context, channelID string, data []byte) error
    SendToUser(ctx context.Context, userID string, data []byte) error
    SendToConnection(ctx context.Context, connID string, data []byte) error

    // Presence and Typing
    SetPresence(ctx context.Context, userID string, opts PresenceOptions) error
    GetPresence(ctx context.Context, userID string) (*PresenceEvent, error)
    GetOnlineUsers(ctx context.Context) ([]string, error)
    StartTyping(ctx context.Context, roomID, userID string) error
    StopTyping(ctx context.Context, roomID, userID string) error
    GetTypingUsers(ctx context.Context, roomID string) ([]string, error)

    // History
    SaveMessage(ctx context.Context, msg *Message) error
    GetHistory(ctx context.Context, query HistoryQuery) ([]*Message, error)

    // Lifecycle
    Start(ctx context.Context) error
    Stop(ctx context.Context) error
    Health(ctx context.Context) error
}

WebSocket and SSE Transport

Dual transport support:

  • WebSocket -- bidirectional communication. Clients send and receive JSON messages with type, room_id, channel_id, user_id, and data fields.
  • SSE (Server-Sent Events) -- server-to-client push for environments where WebSocket is not available.

Message types: MessageTypeMessage, MessageTypeJoin, MessageTypeLeave, MessageTypeTyping, MessageTypePresence.

Room Management

Create and manage chat rooms with rich options:

room, _ := mgr.CreateRoom(ctx, streaming.RoomOptions{
    Name:       "project-alpha",
    MaxMembers: 100,
})

// Join with role
mgr.JoinRoom(ctx, room.ID(), userID, streaming.MemberOptions{Role: "moderator"})

// Room lifecycle
mgr.ArchiveRoom(ctx, roomID)
mgr.RestoreRoom(ctx, roomID)
mgr.TransferRoomOwnership(ctx, roomID, newOwnerID)
mgr.BulkJoinRoom(ctx, roomID, []string{user1, user2, user3})

Room roles: owner, admin, moderator, member. Each role has different permissions for moderation, invites, and room management.

Rooms also support: invites with expiration, bans, moderation events, and search by name or metadata.

Channel Pub/Sub

Subscribe to named channels for topic-based broadcasting:

mgr.CreateChannel(ctx, "notifications")
mgr.Subscribe(ctx, "notifications", userID)
mgr.BroadcastToChannel(ctx, "notifications", []byte(`{"type":"alert","message":"Deploy complete"}`))
mgr.Unsubscribe(ctx, "notifications", userID)

Presence Tracking

Track user online/offline status across connections with configurable timeout and cleanup:

mgr.SetPresence(ctx, userID, streaming.PresenceOptions{
    Status:       "online",
    Device:       &streaming.DeviceInfo{Type: "desktop", Name: "Chrome"},
    Activity:     &streaming.ActivityInfo{Type: "browsing", Detail: "Dashboard"},
    Availability: "available",
})

presence, _ := mgr.GetPresence(ctx, userID)
online, _ := mgr.GetOnlineUsers(ctx)
mgr.TrackActivity(ctx, userID) // refresh last-seen

Presence timeout defaults to 5 minutes. Cleanup runs periodically to remove stale entries.

Typing Indicators

Real-time typing status per user per room with automatic expiration:

mgr.StartTyping(ctx, roomID, userID)  // auto-expires after 3 seconds
mgr.StopTyping(ctx, roomID, userID)   // explicit stop
typing, _ := mgr.GetTypingUsers(ctx, roomID)

Configurable: TypingTimeout (default 3s), MaxTypingUsersPerRoom.

Message History

Persist messages with configurable retention and query support:

// Save
mgr.SaveMessage(ctx, &streaming.Message{
    RoomID: roomID,
    UserID: userID,
    Data:   []byte(`{"text":"Hello!"}`),
})

// Query
messages, _ := mgr.GetHistory(ctx, streaming.HistoryQuery{
    RoomID: roomID,
    Limit:  50,
    Before: time.Now(),
})

Messages support reactions, edits, and metadata attachments. Retention defaults to 30 days with configurable cleanup.

Connection Management

  • Per-user connection limits (default: 5)
  • Session ID and metadata tracking
  • Automatic activity tracking
  • Idle connection cleanup
conns := mgr.GetUserConnections(userID)
mgr.KickConnection(ctx, connID, "session expired")
idle := mgr.GetIdleConnections(ctx, 30*time.Minute)
mgr.CleanupIdleConnections(ctx, 30*time.Minute)

Rate Limiting

  • Per-user message rate limits (default: 100 messages/second)
  • Max message size (default: 64 KB)
  • Returns ErrMessageTooLarge when exceeded

Distributed Backends

Scale across nodes with Redis or NATS backends:

streaming.NewExtension(
    streaming.WithRedisBackend("redis://localhost:6379"),
    streaming.WithFeatures(true, true, true, true, true),
)
BackendTransportBest For
localIn-process mapsSingle instance, development
redisRedis Pub/SubProduction, multi-node
natsNATS messagingHigh-throughput, multi-node

Distributed mode provides: cross-node message routing, presence synchronization, distributed locking, node heartbeats, and failover.

Sentinel Errors

ErrorMeaning
ErrRoomNotFoundRoom does not exist
ErrRoomAlreadyExistsRoom name collision
ErrRoomFullRoom has reached max members
ErrNotRoomMemberUser is not a member of the room
ErrAlreadyRoomMemberUser is already in the room
ErrRoomLimitReachedUser has joined too many rooms
ErrConnectionNotFoundConnection ID not found
ErrConnectionClosedConnection is closed
ErrConnectionLimitReachedUser has too many connections
ErrMessageTooLargeMessage exceeds MaxMessageSize
ErrMessageNotFoundMessage not found
ErrChannelNotFoundChannel does not exist
ErrChannelAlreadyExistsChannel name collision
ErrNotSubscribedNot subscribed to channel
ErrAlreadySubscribedAlready subscribed to channel
ErrPermissionDeniedInsufficient permissions
ErrInviteNotFoundInvite not found
ErrInviteExpiredRoom invite has expired
ErrPresenceNotFoundNo presence data for user
ErrInvalidConfigInvalid configuration

How is this guide?

On this page