Features
MQTT extension capabilities and client details
MQTT Interface
The extension exposes a comprehensive MQTT interface:
type MQTT interface {
Connect(ctx context.Context) error
Disconnect(ctx context.Context) error
IsConnected() bool
Reconnect() error
Publish(topic string, qos byte, retained bool, payload interface{}) error
PublishAsync(topic string, qos byte, retained bool, payload interface{}) error
Subscribe(topic string, qos byte, handler MessageHandler) error
SubscribeMultiple(filters map[string]byte, handler MessageHandler) error
Unsubscribe(topics ...string) error
AddRoute(topic string, handler MessageHandler)
SetDefaultHandler(handler MessageHandler)
SetOnConnectHandler(handler ConnectHandler)
SetConnectionLostHandler(handler ConnectionLostHandler)
SetReconnectingHandler(handler ReconnectingHandler)
GetClient() mqttclient.Client
GetStats() ClientStats
GetSubscriptions() []SubscriptionInfo
Ping(ctx context.Context) error
}Publish Messages
Send messages to any topic with configurable QoS level and retained flag:
// Synchronous publish (waits for broker acknowledgement based on QoS)
client.Publish("sensors/temperature", 1, false, []byte("23.5"))
// Publish with retained flag (new subscribers get the latest value)
client.Publish("status/gateway", 1, true, []byte(`{"status":"online"}`))
// Asynchronous publish (fire-and-forget)
client.PublishAsync("metrics/cpu", 0, false, []byte("42.1"))Subscribe to Topics
Register message handlers for topics with full MQTT wildcard support:
// Single topic
client.Subscribe("sensors/temperature/room-1", 1, handler)
// Single-level wildcard: matches sensors/temperature/room-1, room-2, etc.
client.Subscribe("sensors/temperature/+", 1, handler)
// Multi-level wildcard: matches all topics under sensors/
client.Subscribe("sensors/#", 1, handler)
// Multiple topics at once with per-topic QoS
client.SubscribeMultiple(map[string]byte{
"sensors/temperature/#": 1,
"sensors/humidity/#": 1,
"alerts/#": 2, // exactly-once for alerts
}, handler)QoS Levels
Full support for all three MQTT QoS levels:
| Level | Name | Guarantee | Use Case |
|---|---|---|---|
| 0 | At most once | Fire and forget, no acknowledgement | Metrics, telemetry where loss is acceptable |
| 1 | At least once | Guaranteed delivery, possible duplicates | Sensor data, event notifications |
| 2 | Exactly once | Guaranteed single delivery | Financial transactions, critical commands |
The default QoS is configurable via DefaultQoS (default: 0).
Auto-Reconnect
The client automatically reconnects on connection loss:
AutoReconnect-- enable/disable auto-reconnect (default: true)MaxReconnectDelay-- maximum backoff delay between attemptsMaxReconnectAttempts-- limit total reconnection attempts (0 = unlimited)ResumeSubs-- re-subscribe to all topics after successful reconnection
mqtt.NewExtension(
mqtt.WithAutoReconnect(true),
// MaxReconnectDelay and ResumeSubs configured via YAML or WithConfig
)Last Will and Testament
Configure a will message that the broker publishes when the client disconnects unexpectedly:
mqtt.NewExtension(
mqtt.WithWill("status/gateway-01", []byte(`{"status":"offline"}`), 1, true),
)Parameters: topic, payload, QoS level, and retained flag.
Connection Lifecycle Handlers
Register callbacks for connection events:
client.SetOnConnectHandler(func(c mqttclient.Client) {
// Called after successful connection or reconnection
})
client.SetConnectionLostHandler(func(c mqttclient.Client, err error) {
// Called when connection is lost
})
client.SetReconnectingHandler(func(c mqttclient.Client, opts *mqttclient.ClientOptions) {
// Called before each reconnection attempt
})Topic Routing
Add per-topic message routes and a default handler for unmatched topics:
// Route specific topics to specific handlers
client.AddRoute("commands/+", commandHandler)
client.AddRoute("config/+", configHandler)
// Catch-all for unmatched messages
client.SetDefaultHandler(func(c mqttclient.Client, msg mqttclient.Message) {
log.Printf("Unhandled: %s", msg.Topic())
})Subscription Tracking
Query active subscriptions:
subs := client.GetSubscriptions()
for _, sub := range subs {
fmt.Printf("Topic: %s, QoS: %d\n", sub.Topic, sub.QoS)
}Client Statistics
Track connection health and message throughput:
stats := client.GetStats()
fmt.Printf("Connected: %v, Sent: %d, Received: %d, Reconnects: %d\n",
stats.Connected, stats.MessagesSent, stats.MessagesReceived, stats.Reconnects)ClientStats includes: Connected, ConnectTime, LastMessageTime, MessagesReceived, MessagesSent, Subscriptions, Reconnects.
Clean Sessions
Toggle between persistent and clean sessions:
- Clean session (
CleanSession: true) -- broker discards previous session state. Fresh start on every connect. - Persistent session (
CleanSession: false) -- broker preserves subscriptions and queued QoS 1/2 messages across disconnects.
Message Persistence
Choose between in-memory or file-based message store for QoS 1/2 message tracking:
MessageStore: "memory"-- in-memory store (default)MessageStore: "file"-- file-based store atStoreDirectorypath
TLS Encryption
Connect over TLS with custom certificates:
mqtt.NewExtension(
mqtt.WithTLS("cert.pem", "key.pem", "ca.pem"),
)Set TLSSkipVerify: true for development environments.
Message Ordering
When OrderMatters: true, the client ensures messages are delivered in order. This may impact throughput as the client waits for each message acknowledgement before sending the next.
Sentinel Errors
| Error | Meaning |
|---|---|
ErrNotConnected | Client is not connected to the broker |
ErrAlreadyConnected | Client is already connected |
ErrConnectionFailed | Failed to connect to the broker |
ErrPublishFailed | Message publish failed |
ErrSubscribeFailed | Topic subscription failed |
ErrUnsubscribeFailed | Topic unsubscription failed |
ErrInvalidQoS | QoS level is not 0, 1, or 2 |
ErrInvalidTopic | Topic string is empty or invalid |
ErrTimeout | Operation timed out |
How is this guide?