Features
Streaming extension capabilities
Manager Interface
The core Manager interface covers connections, rooms, channels, broadcasting, presence, typing, message history, and lifecycle:
type Manager interface {
// Connections
Register(ctx context.Context, conn Connection) error
Unregister(ctx context.Context, conn Connection) error
GetConnection(connID string) (Connection, error)
GetUserConnections(userID string) []Connection
ConnectionCount() int
KickConnection(ctx context.Context, connID string, reason string) error
// Rooms
CreateRoom(ctx context.Context, opts RoomOptions) (Room, error)
GetRoom(ctx context.Context, roomID string) (Room, error)
DeleteRoom(ctx context.Context, roomID string) error
JoinRoom(ctx context.Context, roomID, userID string, opts MemberOptions) error
LeaveRoom(ctx context.Context, roomID, userID string) error
GetRoomMembers(ctx context.Context, roomID string) ([]Member, error)
ListRooms(ctx context.Context) ([]Room, error)
// Channels
CreateChannel(ctx context.Context, name string) error
Subscribe(ctx context.Context, channelID, userID string) error
Unsubscribe(ctx context.Context, channelID, userID string) error
// Broadcasting
Broadcast(ctx context.Context, data []byte) error
BroadcastToRoom(ctx context.Context, roomID string, data []byte) error
BroadcastToChannel(ctx context.Context, channelID string, data []byte) error
SendToUser(ctx context.Context, userID string, data []byte) error
SendToConnection(ctx context.Context, connID string, data []byte) error
// Presence and Typing
SetPresence(ctx context.Context, userID string, opts PresenceOptions) error
GetPresence(ctx context.Context, userID string) (*PresenceEvent, error)
GetOnlineUsers(ctx context.Context) ([]string, error)
StartTyping(ctx context.Context, roomID, userID string) error
StopTyping(ctx context.Context, roomID, userID string) error
GetTypingUsers(ctx context.Context, roomID string) ([]string, error)
// History
SaveMessage(ctx context.Context, msg *Message) error
GetHistory(ctx context.Context, query HistoryQuery) ([]*Message, error)
// Lifecycle
Start(ctx context.Context) error
Stop(ctx context.Context) error
Health(ctx context.Context) error
}
WebSocket and SSE Transport
Dual transport support:
- WebSocket -- bidirectional communication. Clients send and receive JSON messages with type, room_id, channel_id, user_id, and data fields.
- SSE (Server-Sent Events) -- server-to-client push for environments where WebSocket is not available.
Message types: MessageTypeMessage, MessageTypeJoin, MessageTypeLeave, MessageTypeTyping, MessageTypePresence.
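To make the WebSocket message shape concrete, here is a minimal sketch of decoding one client frame. Only the JSON field names (type, room_id, channel_id, user_id, data) come from the description above. The Envelope struct, the decodeEnvelope helper, and the "message" wire value are assumptions for illustration, not the extension's own types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Envelope is a hypothetical client message shape. Only the JSON field
// names are taken from the documentation; the Go type is an assumption.
type Envelope struct {
	Type      string          `json:"type"`
	RoomID    string          `json:"room_id,omitempty"`
	ChannelID string          `json:"channel_id,omitempty"`
	UserID    string          `json:"user_id,omitempty"`
	Data      json.RawMessage `json:"data,omitempty"` // payload left undecoded
}

// decodeEnvelope parses one raw frame and rejects frames without a type.
func decodeEnvelope(raw []byte) (Envelope, error) {
	var env Envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return Envelope{}, fmt.Errorf("decode envelope: %w", err)
	}
	if env.Type == "" {
		return Envelope{}, fmt.Errorf("decode envelope: missing type")
	}
	return env, nil
}

func main() {
	// "message" is an assumed wire value for MessageTypeMessage.
	raw := []byte(`{"type":"message","room_id":"r1","user_id":"u1","data":{"text":"Hello!"}}`)
	env, err := decodeEnvelope(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(env.Type, env.RoomID, env.UserID, string(env.Data))
}
```

Keeping data as json.RawMessage lets a handler route on type first and decode the payload only once it knows which shape to expect.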
Room Management
Create and manage chat rooms with rich options:
room, _ := mgr.CreateRoom(ctx, streaming.RoomOptions{
Name: "project-alpha",
MaxMembers: 100,
})
// Join with role
mgr.JoinRoom(ctx, room.ID(), userID, streaming.MemberOptions{Role: "moderator"})
// Room lifecycle
mgr.ArchiveRoom(ctx, roomID)
mgr.RestoreRoom(ctx, roomID)
mgr.TransferRoomOwnership(ctx, roomID, newOwnerID)
mgr.BulkJoinRoom(ctx, roomID, []string{user1, user2, user3})
Room roles: owner, admin, moderator, member. Each role has different permissions for moderation, invites, and room management.
Rooms also support: invites with expiration, bans, moderation events, and search by name or metadata.
Channel Pub/Sub
Subscribe to named channels for topic-based broadcasting:
mgr.CreateChannel(ctx, "notifications")
mgr.Subscribe(ctx, "notifications", userID)
mgr.BroadcastToChannel(ctx, "notifications", []byte(`{"type":"alert","message":"Deploy complete"}`))
mgr.Unsubscribe(ctx, "notifications", userID)
Presence Tracking
Track user online/offline status across connections with configurable timeout and cleanup:
mgr.SetPresence(ctx, userID, streaming.PresenceOptions{
Status: "online",
Device: &streaming.DeviceInfo{Type: "desktop", Name: "Chrome"},
Activity: &streaming.ActivityInfo{Type: "browsing", Detail: "Dashboard"},
Availability: "available",
})
presence, _ := mgr.GetPresence(ctx, userID)
online, _ := mgr.GetOnlineUsers(ctx)
mgr.TrackActivity(ctx, userID) // refresh last-seen
Presence timeout defaults to 5 minutes. Cleanup runs periodically to remove stale entries.
Typing Indicators
Real-time typing status per user per room with automatic expiration:
mgr.StartTyping(ctx, roomID, userID) // auto-expires after 3 seconds
mgr.StopTyping(ctx, roomID, userID) // explicit stop
typing, _ := mgr.GetTypingUsers(ctx, roomID)
Configurable: TypingTimeout (default 3s), MaxTypingUsersPerRoom.
Message History
Persist messages with configurable retention and query support:
// Save
mgr.SaveMessage(ctx, &streaming.Message{
RoomID: roomID,
UserID: userID,
Data: []byte(`{"text":"Hello!"}`),
})
// Query
messages, _ := mgr.GetHistory(ctx, streaming.HistoryQuery{
RoomID: roomID,
Limit: 50,
Before: time.Now(),
})
Messages support reactions, edits, and metadata attachments. Retention defaults to 30 days with configurable cleanup.
Connection Management
- Per-user connection limits (default: 5)
- Session ID and metadata tracking
- Automatic activity tracking
- Idle connection cleanup
conns := mgr.GetUserConnections(userID)
mgr.KickConnection(ctx, connID, "session expired")
idle := mgr.GetIdleConnections(ctx, 30*time.Minute)
mgr.CleanupIdleConnections(ctx, 30*time.Minute)
Rate Limiting
- Per-user message rate limits (default: 100 messages/second)
- Max message size (default: 64 KB)
- Returns ErrMessageTooLarge when exceeded
Distributed Backends
Scale across nodes with Redis or NATS backends:
streaming.NewExtension(
streaming.WithRedisBackend("redis://localhost:6379"),
streaming.WithFeatures(true, true, true, true, true),
)
| Backend | Transport | Best For |
|---|---|---|
| local | In-process maps | Single instance, development |
| redis | Redis Pub/Sub | Production, multi-node |
| nats | NATS messaging | High-throughput, multi-node |
Distributed mode provides: cross-node message routing, presence synchronization, distributed locking, node heartbeats, and failover.
Sentinel Errors
| Error | Meaning |
|---|---|
| ErrRoomNotFound | Room does not exist |
| ErrRoomAlreadyExists | Room name collision |
| ErrRoomFull | Room has reached max members |
| ErrNotRoomMember | User is not a member of the room |
| ErrAlreadyRoomMember | User is already in the room |
| ErrRoomLimitReached | User has joined too many rooms |
| ErrConnectionNotFound | Connection ID not found |
| ErrConnectionClosed | Connection is closed |
| ErrConnectionLimitReached | User has too many connections |
| ErrMessageTooLarge | Message exceeds MaxMessageSize |
| ErrMessageNotFound | Message not found |
| ErrChannelNotFound | Channel does not exist |
| ErrChannelAlreadyExists | Channel name collision |
| ErrNotSubscribed | Not subscribed to channel |
| ErrAlreadySubscribed | Already subscribed to channel |
| ErrPermissionDenied | Insufficient permissions |
| ErrInviteNotFound | Invite not found |
| ErrInviteExpired | Room invite has expired |
| ErrPresenceNotFound | No presence data for user |
| ErrInvalidConfig | Invalid configuration |