MQTT

Features

MQTT extension capabilities and client details

MQTT Interface

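The extension exposes a single MQTT interface covering connection management, publishing, subscriptions, topic routing, lifecycle handlers, and diagnostics: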

type MQTT interface {
    Connect(ctx context.Context) error
    Disconnect(ctx context.Context) error
    IsConnected() bool
    Reconnect() error
    Publish(topic string, qos byte, retained bool, payload interface{}) error
    PublishAsync(topic string, qos byte, retained bool, payload interface{}) error
    Subscribe(topic string, qos byte, handler MessageHandler) error
    SubscribeMultiple(filters map[string]byte, handler MessageHandler) error
    Unsubscribe(topics ...string) error
    AddRoute(topic string, handler MessageHandler)
    SetDefaultHandler(handler MessageHandler)
    SetOnConnectHandler(handler ConnectHandler)
    SetConnectionLostHandler(handler ConnectionLostHandler)
    SetReconnectingHandler(handler ReconnectingHandler)
    GetClient() mqttclient.Client
    GetStats() ClientStats
    GetSubscriptions() []SubscriptionInfo
    Ping(ctx context.Context) error
}

Publish Messages

Send messages to any topic with configurable QoS level and retained flag:

// Synchronous publish (waits for broker acknowledgement based on QoS)
if err := client.Publish("sensors/temperature", 1, false, []byte("23.5")); err != nil {
    log.Printf("publish failed: %v", err)
}

// Publish with retained flag (new subscribers get the latest value)
client.Publish("status/gateway", 1, true, []byte(`{"status":"online"}`))

// Asynchronous publish (fire-and-forget)
client.PublishAsync("metrics/cpu", 0, false, []byte("42.1"))
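The examples above pass raw []byte payloads. For structured data, one option is to encode the value to JSON yourself before publishing, so the bytes on the wire do not depend on how the extension treats other payload types. The sketch below uses only the standard library; the reading type and encodeReading helper are hypothetical names, not part of the extension.

```go
package main

import "encoding/json"

// reading is a hypothetical payload type used only for illustration.
type reading struct {
	Sensor  string  `json:"sensor"`
	Celsius float64 `json:"celsius"`
}

// encodeReading serializes a reading to JSON bytes that can be passed
// as the payload argument of Publish or PublishAsync.
func encodeReading(r reading) ([]byte, error) {
	return json.Marshal(r)
}
```

The resulting bytes would then be published in the usual way, for example client.Publish("sensors/temperature", 1, false, payload).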

Subscribe to Topics

Register message handlers for topics with full MQTT wildcard support:

// Single topic
client.Subscribe("sensors/temperature/room-1", 1, handler)

// Single-level wildcard: matches sensors/temperature/room-1, room-2, etc.
client.Subscribe("sensors/temperature/+", 1, handler)

// Multi-level wildcard: matches all topics under sensors/
client.Subscribe("sensors/#", 1, handler)

// Multiple topics at once with per-topic QoS
client.SubscribeMultiple(map[string]byte{
    "sensors/temperature/#": 1,
    "sensors/humidity/#":    1,
    "alerts/#":              2,  // exactly-once for alerts
}, handler)
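To make the wildcard rules concrete, here is a minimal sketch of how a topic filter can be matched against a concrete topic name. It follows the standard MQTT semantics ("+" matches exactly one level, "#" matches the parent level and everything below it) but is not the extension's own matcher; matchTopic is a hypothetical helper, and the special handling of topics beginning with "$" is left out.

```go
package main

import "strings"

// matchTopic reports whether a concrete topic name matches an MQTT
// topic filter. "+" matches exactly one level; "#" is only valid as the
// last level and matches the parent level plus any number of sublevels.
func matchTopic(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			// Matches all remaining levels (including none), but only
			// when "#" is the final level of the filter.
			return i == len(f)-1
		}
		if i >= len(t) {
			return false // filter has more levels than the topic
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	// Without a trailing "#", both must have the same number of levels.
	return len(f) == len(t)
}
```

Under these rules "sensors/temperature/+" matches "sensors/temperature/room-1" but not "sensors/temperature/room-1/raw", while "sensors/#" matches both, as well as "sensors" itself.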

QoS Levels

Full support for all three MQTT QoS levels:

Level  Name           Guarantee                                 Use Case
0      At most once   Fire and forget, no acknowledgement       Metrics, telemetry where loss is acceptable
1      At least once  Guaranteed delivery, possible duplicates  Sensor data, event notifications
2      Exactly once   Guaranteed single delivery                Financial transactions, critical commands

The default QoS is configurable via DefaultQoS (default: 0).
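Because the API takes QoS as a plain byte, call sites can be easier to read with named constants instead of bare 0, 1, and 2. The sketch below is one way to do that in application code; the constant names and the qosName helper are hypothetical and are not exported by the extension.

```go
package main

// Named QoS levels for use at call sites (hypothetical names).
const (
	QoSAtMostOnce  byte = 0
	QoSAtLeastOnce byte = 1
	QoSExactlyOnce byte = 2
)

// qosName returns the protocol name of a QoS level, or "invalid" for
// any value outside 0-2.
func qosName(qos byte) string {
	switch qos {
	case QoSAtMostOnce:
		return "at most once"
	case QoSAtLeastOnce:
		return "at least once"
	case QoSExactlyOnce:
		return "exactly once"
	default:
		return "invalid"
	}
}
```

A call such as client.Subscribe("alerts/#", QoSExactlyOnce, handler) then states its delivery guarantee directly.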

Auto-Reconnect

The client automatically reconnects on connection loss:

  • AutoReconnect -- enable/disable auto-reconnect (default: true)
  • MaxReconnectDelay -- maximum backoff delay between attempts
  • MaxReconnectAttempts -- limit total reconnection attempts (0 = unlimited)
  • ResumeSubs -- re-subscribe to all topics after successful reconnection

mqtt.NewExtension(
    mqtt.WithAutoReconnect(true),
    // MaxReconnectDelay and ResumeSubs configured via YAML or WithConfig
)
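The options above name a maximum backoff delay and an attempt limit but do not spell out the schedule in between. As an illustration only, the sketch below assumes a delay that doubles on every attempt until it reaches the cap, and applies the "0 = unlimited" rule for attempts. The doubling policy, the initial delay, and both function names are assumptions, not the extension's documented behavior.

```go
package main

import "time"

// nextDelay returns the wait before reconnect attempt n (1-based).
// The delay starts at initial, doubles per attempt, and never exceeds
// maxDelay.
func nextDelay(attempt int, initial, maxDelay time.Duration) time.Duration {
	d := initial
	for i := 1; i < attempt; i++ {
		d *= 2
		if d >= maxDelay {
			return maxDelay // cap reached; stop before d can overflow
		}
	}
	if d > maxDelay {
		return maxDelay
	}
	return d
}

// shouldRetry reports whether another attempt is allowed.
// maxAttempts == 0 means unlimited, as with MaxReconnectAttempts.
func shouldRetry(attempt, maxAttempts int) bool {
	return maxAttempts == 0 || attempt <= maxAttempts
}
```

With an initial delay of 1s and a cap of 30s, this schedule yields 1s, 2s, 4s, 8s, 16s, and then 30s for every later attempt.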

Last Will and Testament

Configure a will message that the broker publishes when the client disconnects unexpectedly:

mqtt.NewExtension(
    mqtt.WithWill("status/gateway-01", []byte(`{"status":"offline"}`), 1, true),
)

Parameters: topic, payload, QoS level, and retained flag.

Connection Lifecycle Handlers

Register callbacks for connection events:

client.SetOnConnectHandler(func(c mqttclient.Client) {
    // Called after successful connection or reconnection
})

client.SetConnectionLostHandler(func(c mqttclient.Client, err error) {
    // Called when connection is lost
})

client.SetReconnectingHandler(func(c mqttclient.Client, opts *mqttclient.ClientOptions) {
    // Called before each reconnection attempt
})

Topic Routing

Add per-topic message routes and a default handler for unmatched topics:

// Route specific topics to specific handlers
client.AddRoute("commands/+", commandHandler)
client.AddRoute("config/+", configHandler)

// Catch-all for unmatched messages
client.SetDefaultHandler(func(c mqttclient.Client, msg mqttclient.Message) {
    log.Printf("Unhandled: %s", msg.Topic())
})
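The relationship between routes and the default handler can be sketched with a small stand-alone router. This is a simplified model, not the extension's implementation: it assumes every matching route receives the message and that the default handler runs only when no route matched. The handlerFunc type stands in for MessageHandler, and all names here are hypothetical.

```go
package main

import "strings"

// handlerFunc is a simplified stand-in for MessageHandler.
type handlerFunc func(topic string, payload []byte)

type route struct {
	filter  string
	handler handlerFunc
}

// router holds registered routes and an optional default handler.
type router struct {
	routes []route
	def    handlerFunc
}

func (r *router) addRoute(filter string, h handlerFunc) {
	r.routes = append(r.routes, route{filter: filter, handler: h})
}

// dispatch calls every route whose filter matches the topic and falls
// back to the default handler when none matched.
func (r *router) dispatch(topic string, payload []byte) {
	matched := false
	for _, rt := range r.routes {
		if routeMatches(rt.filter, topic) {
			rt.handler(topic, payload)
			matched = true
		}
	}
	if !matched && r.def != nil {
		r.def(topic, payload)
	}
}

// routeMatches applies the "+" and "#" wildcard rules level by level.
func routeMatches(filter, topic string) bool {
	f, t := strings.Split(filter, "/"), strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			return i == len(f)-1
		}
		if i >= len(t) || (level != "+" && level != t[i]) {
			return false
		}
	}
	return len(f) == len(t)
}
```

In this model a message on "commands/reboot" reaches the "commands/+" route, while a message on "metrics/cpu" falls through to the default handler.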

Subscription Tracking

Query active subscriptions:

subs := client.GetSubscriptions()
for _, sub := range subs {
    fmt.Printf("Topic: %s, QoS: %d\n", sub.Topic, sub.QoS)
}

Client Statistics

Track connection health and message throughput:

stats := client.GetStats()
fmt.Printf("Connected: %v, Sent: %d, Received: %d, Reconnects: %d\n",
    stats.Connected, stats.MessagesSent, stats.MessagesReceived, stats.Reconnects)

ClientStats includes: Connected, ConnectTime, LastMessageTime, MessagesReceived, MessagesSent, Subscriptions, Reconnects.
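One possible use of these fields is a simple health check that flags a client as unhealthy when it is disconnected or has been silent for too long. The sketch below works on a local copy of a few fields so that it compiles on its own; the statsSnapshot type, its field types, and the healthy helper are assumptions, since the text lists the ClientStats field names but not their types.

```go
package main

import "time"

// statsSnapshot mirrors a few ClientStats fields named in the text.
// The field types are assumptions made for this sketch.
type statsSnapshot struct {
	Connected       bool
	LastMessageTime time.Time
	Reconnects      int64
}

// healthy reports whether the client is connected and has seen a
// message within maxIdle of now.
func healthy(s statsSnapshot, now time.Time, maxIdle time.Duration) bool {
	if !s.Connected {
		return false
	}
	return now.Sub(s.LastMessageTime) <= maxIdle
}
```

A periodic check could copy the relevant fields from client.GetStats() into such a snapshot and raise an alert when healthy returns false.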

Clean Sessions

Toggle between persistent and clean sessions:

  • Clean session (CleanSession: true) -- broker discards previous session state. Fresh start on every connect.
  • Persistent session (CleanSession: false) -- broker preserves subscriptions and queued QoS 1/2 messages across disconnects.

Message Persistence

Choose between an in-memory and a file-based message store for QoS 1/2 message tracking:

  • MessageStore: "memory" -- in-memory store (default)
  • MessageStore: "file" -- file-based store at StoreDirectory path

TLS Encryption

Connect over TLS with custom certificates:

mqtt.NewExtension(
    mqtt.WithTLS("cert.pem", "key.pem", "ca.pem"),
)

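Set TLSSkipVerify: true to skip certificate verification in development environments. Do not enable it in production, because it leaves the connection open to man-in-the-middle attacks.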
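How WithTLS builds its TLS settings internally is not shown here. For orientation, the sketch below shows what loading a client certificate, key, and CA bundle typically looks like with the Go standard library. The buildTLSConfig helper, its skipVerify parameter, and the minimum TLS version are assumptions for illustration.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
)

// buildTLSConfig loads a client certificate/key pair and a CA bundle
// into a tls.Config. skipVerify disables server certificate checks and
// should stay false outside development.
func buildTLSConfig(certFile, keyFile, caFile string, skipVerify bool) (*tls.Config, error) {
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, fmt.Errorf("load client key pair: %w", err)
	}
	caPEM, err := os.ReadFile(caFile)
	if err != nil {
		return nil, fmt.Errorf("read CA file: %w", err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, fmt.Errorf("no certificates found in %s", caFile)
	}
	return &tls.Config{
		Certificates:       []tls.Certificate{cert},
		RootCAs:            pool,
		MinVersion:         tls.VersionTLS12, // assumed floor for this sketch
		InsecureSkipVerify: skipVerify,
	}, nil
}
```

Returning an error for a missing or unparsable file, rather than falling back to defaults, makes a misconfigured certificate path fail at startup instead of at the first connection attempt.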

Message Ordering

When OrderMatters: true, the client delivers messages in order. This can reduce throughput, because the client waits for each message's acknowledgement before sending the next.

Sentinel Errors

Error                 Meaning
ErrNotConnected       Client is not connected to the broker
ErrAlreadyConnected   Client is already connected
ErrConnectionFailed   Failed to connect to the broker
ErrPublishFailed      Message publish failed
ErrSubscribeFailed    Topic subscription failed
ErrUnsubscribeFailed  Topic unsubscription failed
ErrInvalidQoS         QoS level is not 0, 1, or 2
ErrInvalidTopic       Topic string is empty or invalid
ErrTimeout            Operation timed out
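Sentinel errors are usually checked with errors.Is, which also works when the error has been wrapped with extra context. The sketch below defines local stand-ins for two of the sentinels so that it compiles on its own; in real code you would compare against the values exported by the extension. Whether the extension wraps its sentinels is an assumption here, and checkQoS and classify are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the extension's sentinel errors.
var (
	errInvalidQoS   = errors.New("mqtt: invalid QoS")
	errNotConnected = errors.New("mqtt: not connected")
)

// checkQoS returns errInvalidQoS, wrapped with the offending value,
// when qos is not 0, 1, or 2.
func checkQoS(qos byte) error {
	if qos > 2 {
		return fmt.Errorf("qos %d: %w", qos, errInvalidQoS)
	}
	return nil
}

// classify maps an error to a coarse action. errors.Is sees through
// wrapping, so added context does not hide the sentinel.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errNotConnected):
		return "retry after reconnect"
	case errors.Is(err, errInvalidQoS):
		return "fix caller"
	default:
		return "unknown"
	}
}
```

Comparing with == would miss the wrapped error returned by checkQoS, which is why errors.Is is the safer default.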
