ExtensionsBrokers
MQTT Extension
IoT messaging and pub/sub communication with MQTT protocol support
MQTT Extension
The MQTT extension provides comprehensive support for IoT messaging and publish/subscribe communication patterns using the MQTT protocol. It enables real-time device communication, sensor data collection, and event-driven architectures.
Features
Core Messaging
- Publish/Subscribe: Full pub/sub messaging patterns
- QoS Levels: Support for QoS 0, 1, and 2
- Topic Wildcards: Single-level (+) and multi-level (#) wildcards
- Retained Messages: Message persistence for late subscribers
- Message Routing: Flexible message handling and routing
Device Communication
- IoT Integration: Seamless device connectivity
- Sensor Data: Real-time sensor data collection
- Device Control: Remote device command and control
- Fleet Management: Multi-device coordination
- Edge Computing: Local processing capabilities
Reliability & Performance
- Auto-Reconnect: Automatic connection recovery
- Connection Pooling: Efficient connection management
- Message Persistence: File and memory-based storage
- Clean Sessions: Configurable session handling
- Keep-Alive: Connection health monitoring
Security
- TLS/SSL: Encrypted communication
- mTLS: Mutual authentication
- Authentication: Username/password authentication
- Certificate Management: X.509 certificate support
- Secure Connections: End-to-end encryption
Advanced Features
- Last Will and Testament: Automatic disconnect notifications
- Message Ordering: Ordered message delivery
- Batch Operations: Bulk message handling
- Custom Handlers: Extensible event handling
- Health Monitoring: Connection and client health checks
Observability
- Metrics: Built-in Prometheus metrics
- Tracing: Distributed tracing support
- Logging: Comprehensive logging
- Statistics: Client and connection statistics
- Health Checks: Endpoint health monitoring
Installation
go get github.com/forge-build/forge/extensions/mqttQuick Start
Basic MQTT Client
package main
import (
"context"
"log"
"time"
"github.com/forge-build/forge"
"github.com/forge-build/forge/extensions/mqtt"
)
func main() {
// Create Forge app
app := forge.New()
// Add MQTT extension
app.Use(mqtt.New(
mqtt.WithBroker("tcp://localhost:1883"),
mqtt.WithClientID("my-client"),
mqtt.WithQoS(1),
))
// Get MQTT client
var mqttClient mqtt.MQTT
app.Resolve(&mqttClient)
// Connect to broker
if err := mqttClient.Connect(); err != nil {
log.Fatal(err)
}
// Publish message
err := mqttClient.Publish("sensors/temperature", map[string]interface{}{
"value": 23.5,
"unit": "celsius",
"time": time.Now(),
})
if err != nil {
log.Printf("Publish failed: %v", err)
}
// Subscribe to topic
err = mqttClient.Subscribe("sensors/+", func(topic string, payload []byte) {
log.Printf("Received on %s: %s", topic, string(payload))
})
if err != nil {
log.Printf("Subscribe failed: %v", err)
}
// Start the app
app.Run()
}IoT Sensor Data Collection
package main
import (
"context"
"encoding/json"
"log"
"time"
"github.com/forge-build/forge"
"github.com/forge-build/forge/extensions/mqtt"
)
type SensorData struct {
DeviceID string `json:"device_id"`
SensorType string `json:"sensor_type"`
Value float64 `json:"value"`
Unit string `json:"unit"`
Timestamp time.Time `json:"timestamp"`
Location string `json:"location"`
}
func main() {
app := forge.New()
// Configure MQTT for IoT
app.Use(mqtt.New(
mqtt.WithBroker("ssl://iot.example.com:8883"),
mqtt.WithClientID("sensor-collector"),
mqtt.WithCredentials("iot-user", "secure-password"),
mqtt.WithTLS("client.crt", "client.key", "ca.crt", false),
mqtt.WithQoS(1),
mqtt.WithAutoReconnect(true),
))
var mqttClient mqtt.MQTT
app.Resolve(&mqttClient)
// Connect with error handling
if err := mqttClient.Connect(); err != nil {
log.Fatal("Failed to connect:", err)
}
// Subscribe to all sensor data
err := mqttClient.Subscribe("sensors/+/+", func(topic string, payload []byte) {
var data SensorData
if err := json.Unmarshal(payload, &data); err != nil {
log.Printf("Failed to parse sensor data: %v", err)
return
}
log.Printf("Sensor data from %s: %s = %.2f %s",
data.DeviceID, data.SensorType, data.Value, data.Unit)
// Process sensor data
processSensorData(data)
})
if err != nil {
log.Fatal("Failed to subscribe:", err)
}
app.Run()
}
func processSensorData(data SensorData) {
// Store in database, trigger alerts, etc.
}Device Control System
package main
import (
"encoding/json"
"log"
"github.com/forge-build/forge"
"github.com/forge-build/forge/extensions/mqtt"
)
type DeviceCommand struct {
DeviceID string `json:"device_id"`
Command string `json:"command"`
Params map[string]interface{} `json:"params"`
}
type DeviceStatus struct {
DeviceID string `json:"device_id"`
Status string `json:"status"`
Message string `json:"message"`
}
func main() {
app := forge.New()
app.Use(mqtt.New(
mqtt.WithBroker("tcp://localhost:1883"),
mqtt.WithClientID("device-controller"),
mqtt.WithQoS(2), // Exactly once delivery for commands
))
var mqttClient mqtt.MQTT
app.Resolve(&mqttClient)
if err := mqttClient.Connect(); err != nil {
log.Fatal(err)
}
// Subscribe to device status updates
mqttClient.Subscribe("devices/+/status", func(topic string, payload []byte) {
var status DeviceStatus
if err := json.Unmarshal(payload, &status); err != nil {
log.Printf("Failed to parse status: %v", err)
return
}
log.Printf("Device %s status: %s - %s",
status.DeviceID, status.Status, status.Message)
})
// Send command to device
sendDeviceCommand := func(deviceID, command string, params map[string]interface{}) error {
cmd := DeviceCommand{
DeviceID: deviceID,
Command: command,
Params: params,
}
topic := fmt.Sprintf("devices/%s/commands", deviceID)
return mqttClient.Publish(topic, cmd)
}
// Example: Turn on LED
err := sendDeviceCommand("device-001", "led_control", map[string]interface{}{
"state": "on",
"color": "blue",
"brightness": 80,
})
if err != nil {
log.Printf("Failed to send command: %v", err)
}
app.Run()
}Configuration
Programmatic Configuration
app.Use(mqtt.New(
// Connection settings
mqtt.WithBroker("tcp://localhost:1883"),
mqtt.WithClientID("my-client"),
mqtt.WithCredentials("username", "password"),
// TLS/SSL settings
mqtt.WithTLS("client.crt", "client.key", "ca.crt", false),
// QoS and reliability
mqtt.WithQoS(1),
mqtt.WithCleanSession(true),
mqtt.WithAutoReconnect(true),
mqtt.WithKeepAlive(30 * time.Second),
// Last Will and Testament
mqtt.WithWill("devices/my-client/status", "offline", 1, true),
// Observability
mqtt.WithMetrics(true),
mqtt.WithTracing(true),
))YAML Configuration
extensions:
mqtt:
# Connection settings
broker: "tcp://localhost:1883"
client_id: "my-client"
username: "mqtt-user"
password: "mqtt-password"
clean_session: true
connect_timeout: "30s"
keep_alive: "60s"
ping_timeout: "10s"
max_reconnect_delay: "10m"
# TLS/SSL settings
enable_tls: true
tls_cert_file: "/path/to/client.crt"
tls_key_file: "/path/to/client.key"
tls_ca_file: "/path/to/ca.crt"
tls_skip_verify: false
# QoS and reliability
default_qos: 1
auto_reconnect: true
resume_subs: true
max_reconnect_attempts: 10
write_timeout: "30s"
# Message handling
message_channel_depth: 100
order_matters: true
message_store: "file"
store_directory: "/var/lib/mqtt"
# Last Will and Testament
will_enabled: true
will_topic: "devices/my-client/status"
will_payload: "offline"
will_qos: 1
will_retained: true
# Observability
enable_metrics: true
enable_tracing: true
enable_logging: trueEnvironment Variables
# Connection
MQTT_BROKER=tcp://localhost:1883
MQTT_CLIENT_ID=my-client
MQTT_USERNAME=mqtt-user
MQTT_PASSWORD=mqtt-password
# TLS
MQTT_ENABLE_TLS=true
MQTT_TLS_CERT_FILE=/path/to/client.crt
MQTT_TLS_KEY_FILE=/path/to/client.key
MQTT_TLS_CA_FILE=/path/to/ca.crt
# QoS and reliability
MQTT_DEFAULT_QOS=1
MQTT_AUTO_RECONNECT=true
MQTT_KEEP_ALIVE=60s
# Last Will and Testament
MQTT_WILL_ENABLED=true
MQTT_WILL_TOPIC=devices/my-client/status
MQTT_WILL_PAYLOAD=offlineUsage Patterns
Publishing Messages
Synchronous Publishing
// Simple string message
err := mqttClient.Publish("topic/test", "Hello, MQTT!")
// JSON message
data := map[string]interface{}{
"temperature": 23.5,
"humidity": 65.2,
"timestamp": time.Now(),
}
err := mqttClient.Publish("sensors/room1", data)
// Binary data
err := mqttClient.PublishBytes("data/binary", []byte{0x01, 0x02, 0x03})Asynchronous Publishing
// Publish with callback
token := mqttClient.PublishAsync("topic/async", "message", func(token mqtt.Token) {
if token.Error() != nil {
log.Printf("Publish failed: %v", token.Error())
} else {
log.Println("Message published successfully")
}
})
// Wait for completion
token.Wait()QoS-Specific Publishing
// QoS 0 - At most once
err := mqttClient.PublishWithQoS("topic/qos0", "message", 0)
// QoS 1 - At least once
err := mqttClient.PublishWithQoS("topic/qos1", "message", 1)
// QoS 2 - Exactly once
err := mqttClient.PublishWithQoS("topic/qos2", "message", 2)Retained Messages
// Publish retained message
err := mqttClient.PublishRetained("status/device1", "online")
// Clear retained message
err := mqttClient.PublishRetained("status/device1", "")Subscribing to Topics
Basic Subscription
err := mqttClient.Subscribe("sensors/temperature", func(topic string, payload []byte) {
log.Printf("Temperature: %s", string(payload))
})Multiple Topic Subscription
topics := map[string]mqtt.MessageHandler{
"sensors/temperature": func(topic string, payload []byte) {
// Handle temperature data
},
"sensors/humidity": func(topic string, payload []byte) {
// Handle humidity data
},
"alerts/+": func(topic string, payload []byte) {
// Handle all alerts
},
}
err := mqttClient.SubscribeMultiple(topics)Wildcard Subscriptions
// Single-level wildcard
err := mqttClient.Subscribe("sensors/+/temperature", func(topic string, payload []byte) {
// Matches: sensors/room1/temperature, sensors/room2/temperature
})
// Multi-level wildcard
err := mqttClient.Subscribe("devices/#", func(topic string, payload []byte) {
// Matches: devices/sensor1, devices/sensor1/status, devices/sensor1/data/temp
})QoS-Specific Subscription
err := mqttClient.SubscribeWithQoS("critical/alerts", 2, func(topic string, payload []byte) {
// Handle critical alerts with exactly-once delivery
})Message Handling
Structured Message Handling
type SensorMessage struct {
DeviceID string `json:"device_id"`
Value float64 `json:"value"`
Timestamp time.Time `json:"timestamp"`
}
err := mqttClient.Subscribe("sensors/+", func(topic string, payload []byte) {
var msg SensorMessage
if err := json.Unmarshal(payload, &msg); err != nil {
log.Printf("Failed to parse message: %v", err)
return
}
// Process structured message
processSensorData(msg)
})Message Routing
// Set up message router
mqttClient.AddRoute("sensors/+/temperature", handleTemperature)
mqttClient.AddRoute("sensors/+/humidity", handleHumidity)
mqttClient.AddRoute("alerts/#", handleAlerts)
// Default handler for unmatched topics
mqttClient.SetDefaultHandler(func(topic string, payload []byte) {
log.Printf("Unhandled message on %s: %s", topic, string(payload))
})Connection Management
Connection Handlers
// Connection established
mqttClient.SetConnectHandler(func() {
log.Println("Connected to MQTT broker")
// Re-subscribe to topics
mqttClient.Subscribe("sensors/+", handleSensorData)
})
// Connection lost
mqttClient.SetConnectionLostHandler(func(err error) {
log.Printf("Connection lost: %v", err)
// Implement custom reconnection logic if needed
})
// Reconnecting
mqttClient.SetReconnectingHandler(func(opts *mqtt.ClientOptions, retryCount int) {
log.Printf("Reconnecting... attempt %d", retryCount)
})Manual Connection Control
// Connect
if err := mqttClient.Connect(); err != nil {
log.Fatal("Failed to connect:", err)
}
// Check connection status
if mqttClient.IsConnected() {
log.Println("Client is connected")
}
// Disconnect gracefully
mqttClient.Disconnect(250) // 250ms timeoutAdvanced Features
Last Will and Testament
// Configure LWT during client creation
app.Use(mqtt.New(
mqtt.WithBroker("tcp://localhost:1883"),
mqtt.WithClientID("device-001"),
mqtt.WithWill("devices/device-001/status", "offline", 1, true),
))
// Or set LWT programmatically
mqttClient.SetWill("devices/device-001/status", "offline", 1, true)Message Persistence
// File-based message store
app.Use(mqtt.New(
mqtt.WithBroker("tcp://localhost:1883"),
mqtt.WithConfig(mqtt.Config{
MessageStore: "file",
StoreDirectory: "/var/lib/mqtt/store",
}),
))
// Memory-based store (default)
app.Use(mqtt.New(
mqtt.WithBroker("tcp://localhost:1883"),
mqtt.WithConfig(mqtt.Config{
MessageStore: "memory",
}),
))Batch Operations
// Batch publish
messages := []mqtt.Message{
{Topic: "sensors/temp1", Payload: "23.5"},
{Topic: "sensors/temp2", Payload: "24.1"},
{Topic: "sensors/temp3", Payload: "22.8"},
}
for _, msg := range messages {
mqttClient.Publish(msg.Topic, msg.Payload)
}
// Batch subscribe
topics := map[string]mqtt.MessageHandler{
"sensors/temperature": handleTemperature,
"sensors/humidity": handleHumidity,
"sensors/pressure": handlePressure,
}
err := mqttClient.SubscribeMultiple(topics)Security
TLS/SSL Configuration
Basic TLS
app.Use(mqtt.New(
mqtt.WithBroker("ssl://broker.example.com:8883"),
mqtt.WithTLS("", "", "", true), // Skip certificate verification
))Certificate-Based TLS
app.Use(mqtt.New(
mqtt.WithBroker("ssl://broker.example.com:8883"),
mqtt.WithTLS("client.crt", "client.key", "ca.crt", false),
))Mutual TLS (mTLS)
extensions:
mqtt:
broker: "ssl://secure-broker.example.com:8883"
enable_tls: true
tls_cert_file: "/etc/ssl/certs/client.crt"
tls_key_file: "/etc/ssl/private/client.key"
tls_ca_file: "/etc/ssl/certs/ca.crt"
tls_skip_verify: falseAuthentication
Username/Password
app.Use(mqtt.New(
mqtt.WithBroker("tcp://broker.example.com:1883"),
mqtt.WithCredentials("mqtt-user", "secure-password"),
))Certificate-Based Authentication
app.Use(mqtt.New(
mqtt.WithBroker("ssl://broker.example.com:8883"),
mqtt.WithTLS("client.crt", "client.key", "ca.crt", false),
// Client certificate serves as authentication
))Security Best Practices
// Secure configuration example
app.Use(mqtt.New(
// Use encrypted connection
mqtt.WithBroker("ssl://broker.example.com:8883"),
// Strong authentication
mqtt.WithCredentials("secure-user", "complex-password"),
// Certificate validation
mqtt.WithTLS("client.crt", "client.key", "ca.crt", false),
// Unique client ID
mqtt.WithClientID(fmt.Sprintf("client-%s", uuid.New().String())),
// Clean sessions for security
mqtt.WithCleanSession(true),
// Appropriate QoS
mqtt.WithQoS(1),
))Monitoring and Observability
Built-in Metrics
The MQTT extension provides comprehensive metrics:
// Enable metrics
app.Use(mqtt.New(
mqtt.WithMetrics(true),
))
// Metrics available:
// - mqtt_messages_published_total
// - mqtt_messages_received_total
// - mqtt_connection_status
// - mqtt_subscription_count
// - mqtt_publish_duration_seconds
// - mqtt_connection_attempts_total
// - mqtt_reconnection_countClient Statistics
// Get client statistics
stats := mqttClient.GetStats()
fmt.Printf("Messages sent: %d\n", stats.MessagesSent)
fmt.Printf("Messages received: %d\n", stats.MessagesReceived)
fmt.Printf("Connection uptime: %v\n", stats.ConnectedSince)
fmt.Printf("Reconnection count: %d\n", stats.ReconnectCount)Health Checks
// Check client health
health := mqttClient.Health()
if health.Status == "healthy" {
log.Println("MQTT client is healthy")
} else {
log.Printf("MQTT client unhealthy: %s", health.Message)
}
// Get subscription info
subscriptions := mqttClient.GetSubscriptions()
for topic, qos := range subscriptions {
log.Printf("Subscribed to %s with QoS %d", topic, qos)
}Custom Metrics
import "github.com/prometheus/client_golang/prometheus"
var (
sensorDataReceived = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "sensor_data_received_total",
Help: "Total number of sensor data messages received",
},
[]string{"device_id", "sensor_type"},
)
)
func init() {
prometheus.MustRegister(sensorDataReceived)
}
// In message handler
mqttClient.Subscribe("sensors/+/+", func(topic string, payload []byte) {
// Parse topic to extract device_id and sensor_type
parts := strings.Split(topic, "/")
if len(parts) >= 3 {
deviceID := parts[1]
sensorType := parts[2]
sensorDataReceived.WithLabelValues(deviceID, sensorType).Inc()
}
// Process message...
})Performance Optimization
Connection Optimization
app.Use(mqtt.New(
mqtt.WithBroker("tcp://broker.example.com:1883"),
// Optimize connection settings
mqtt.WithKeepAlive(30 * time.Second),
mqtt.WithConfig(mqtt.Config{
ConnectTimeout: 10 * time.Second,
PingTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
MaxReconnectDelay: 5 * time.Minute,
MessageChannelDepth: 1000,
OrderMatters: false, // Better performance if order doesn't matter
}),
))Message Optimization
// Use appropriate QoS levels
// QoS 0 for non-critical data (best performance)
mqttClient.PublishWithQoS("sensors/temperature", data, 0)
// QoS 1 for important data
mqttClient.PublishWithQoS("alerts/critical", alert, 1)
// QoS 2 only for critical commands
mqttClient.PublishWithQoS("commands/shutdown", cmd, 2)
// Batch similar operations
var wg sync.WaitGroup
for _, sensor := range sensors {
wg.Add(1)
go func(s Sensor) {
defer wg.Done()
mqttClient.Publish(s.Topic, s.Data)
}(sensor)
}
wg.Wait()Memory Optimization
app.Use(mqtt.New(
mqtt.WithConfig(mqtt.Config{
// Use memory store for better performance
MessageStore: "memory",
// Limit channel depth
MessageChannelDepth: 100,
// Clean sessions to avoid state buildup
CleanSession: true,
// Disable order if not needed
OrderMatters: false,
}),
))Error Handling
Common Error Types
import "github.com/forge-build/forge/extensions/mqtt"
// Handle specific errors
err := mqttClient.Publish("topic", "message")
switch {
case errors.Is(err, mqtt.ErrNotConnected):
log.Println("Client not connected, attempting reconnection...")
mqttClient.Connect()
case errors.Is(err, mqtt.ErrPublishFailed):
log.Println("Publish failed, retrying...")
// Implement retry logic
case errors.Is(err, mqtt.ErrTimeout):
log.Println("Operation timed out")
case errors.Is(err, mqtt.ErrInvalidTopic):
log.Println("Invalid topic format")
default:
log.Printf("Unexpected error: %v", err)
}Retry Logic
func publishWithRetry(client mqtt.MQTT, topic string, payload interface{}, maxRetries int) error {
var err error
for i := 0; i < maxRetries; i++ {
err = client.Publish(topic, payload)
if err == nil {
return nil
}
if errors.Is(err, mqtt.ErrNotConnected) {
// Try to reconnect
if connErr := client.Connect(); connErr != nil {
log.Printf("Reconnection failed: %v", connErr)
}
}
// Exponential backoff
time.Sleep(time.Duration(1<<i) * time.Second)
}
return fmt.Errorf("failed after %d retries: %w", maxRetries, err)
}Circuit Breaker Pattern
type CircuitBreaker struct {
failures int
maxFailures int
timeout time.Duration
lastFailure time.Time
state string // "closed", "open", "half-open"
mutex sync.RWMutex
}
func (cb *CircuitBreaker) Call(fn func() error) error {
cb.mutex.Lock()
defer cb.mutex.Unlock()
if cb.state == "open" {
if time.Since(cb.lastFailure) > cb.timeout {
cb.state = "half-open"
} else {
return fmt.Errorf("circuit breaker is open")
}
}
err := fn()
if err != nil {
cb.failures++
cb.lastFailure = time.Now()
if cb.failures >= cb.maxFailures {
cb.state = "open"
}
return err
}
cb.failures = 0
cb.state = "closed"
return nil
}
// Usage
cb := &CircuitBreaker{
maxFailures: 5,
timeout: 30 * time.Second,
state: "closed",
}
err := cb.Call(func() error {
return mqttClient.Publish("topic", "message")
})Best Practices
Message Design
// Use structured messages
type DeviceMessage struct {
DeviceID string `json:"device_id"`
MessageType string `json:"message_type"`
Timestamp time.Time `json:"timestamp"`
Data map[string]interface{} `json:"data"`
Metadata map[string]string `json:"metadata,omitempty"`
}
// Keep messages small and focused
func publishSensorReading(client mqtt.MQTT, deviceID string, sensorType string, value float64) error {
message := DeviceMessage{
DeviceID: deviceID,
MessageType: "sensor_reading",
Timestamp: time.Now(),
Data: map[string]interface{}{
"sensor_type": sensorType,
"value": value,
},
}
topic := fmt.Sprintf("devices/%s/sensors/%s", deviceID, sensorType)
return client.Publish(topic, message)
}Topic Design
// Good topic hierarchy
// devices/{device_id}/sensors/{sensor_type}
// devices/{device_id}/commands/{command_type}
// devices/{device_id}/status
// alerts/{severity}/{device_id}
// system/{component}/health
// Use consistent naming
const (
TopicDeviceSensors = "devices/%s/sensors/%s"
TopicDeviceCommands = "devices/%s/commands/%s"
TopicDeviceStatus = "devices/%s/status"
TopicAlerts = "alerts/%s/%s"
TopicSystemHealth = "system/%s/health"
)
func getDeviceSensorTopic(deviceID, sensorType string) string {
return fmt.Sprintf(TopicDeviceSensors, deviceID, sensorType)
}Connection Management
// Implement proper connection lifecycle
type MQTTManager struct {
client mqtt.MQTT
ctx context.Context
cancel context.CancelFunc
}
func NewMQTTManager(client mqtt.MQTT) *MQTTManager {
ctx, cancel := context.WithCancel(context.Background())
manager := &MQTTManager{
client: client,
ctx: ctx,
cancel: cancel,
}
// Set up connection handlers
client.SetConnectHandler(manager.onConnect)
client.SetConnectionLostHandler(manager.onConnectionLost)
return manager
}
func (m *MQTTManager) onConnect() {
log.Println("MQTT connected")
// Re-establish subscriptions
m.setupSubscriptions()
}
func (m *MQTTManager) onConnectionLost(err error) {
log.Printf("MQTT connection lost: %v", err)
// Implement custom recovery logic
go m.handleReconnection()
}
func (m *MQTTManager) setupSubscriptions() {
subscriptions := map[string]mqtt.MessageHandler{
"devices/+/sensors/+": m.handleSensorData,
"devices/+/status": m.handleDeviceStatus,
"alerts/#": m.handleAlerts,
}
for topic, handler := range subscriptions {
if err := m.client.Subscribe(topic, handler); err != nil {
log.Printf("Failed to subscribe to %s: %v", topic, err)
}
}
}
func (m *MQTTManager) Shutdown() {
m.cancel()
m.client.Disconnect(1000)
}Security Best Practices
// Secure client configuration
func createSecureMQTTClient() mqtt.MQTT {
return mqtt.New(
// Use TLS
mqtt.WithBroker("ssl://broker.example.com:8883"),
// Strong authentication
mqtt.WithCredentials(
os.Getenv("MQTT_USERNAME"),
os.Getenv("MQTT_PASSWORD"),
),
// Certificate validation
mqtt.WithTLS(
os.Getenv("MQTT_CERT_FILE"),
os.Getenv("MQTT_KEY_FILE"),
os.Getenv("MQTT_CA_FILE"),
false, // Don't skip verification
),
// Unique client ID
mqtt.WithClientID(fmt.Sprintf("client-%s-%d",
os.Getenv("HOSTNAME"),
time.Now().Unix(),
)),
// Clean sessions
mqtt.WithCleanSession(true),
)
}
// Topic access control
func validateTopicAccess(clientID, topic string) bool {
// Implement topic-based access control
allowedTopics := getClientTopics(clientID)
for _, allowed := range allowedTopics {
if matched, _ := path.Match(allowed, topic); matched {
return true
}
}
return false
}Troubleshooting
Common Issues
Connection Problems
// Debug connection issues
if err := mqttClient.Connect(); err != nil {
log.Printf("Connection failed: %v", err)
// Check common issues:
// 1. Broker address and port
// 2. Network connectivity
// 3. Authentication credentials
// 4. TLS configuration
// 5. Firewall settings
}
// Test connectivity
func testMQTTConnectivity(broker string) error {
conn, err := net.DialTimeout("tcp", broker, 5*time.Second)
if err != nil {
return fmt.Errorf("cannot connect to broker: %w", err)
}
conn.Close()
return nil
}Subscription Issues
// Debug subscription problems
err := mqttClient.Subscribe("sensors/+", func(topic string, payload []byte) {
log.Printf("Received: %s", topic)
})
if err != nil {
switch {
case errors.Is(err, mqtt.ErrNotConnected):
log.Println("Not connected - ensure client is connected first")
case errors.Is(err, mqtt.ErrSubscribeFailed):
log.Println("Subscription failed - check topic format and permissions")
case errors.Is(err, mqtt.ErrInvalidTopic):
log.Println("Invalid topic - check topic syntax")
}
}
// Verify subscriptions
subscriptions := mqttClient.GetSubscriptions()
for topic, qos := range subscriptions {
log.Printf("Active subscription: %s (QoS %d)", topic, qos)
}Message Delivery Issues
// Debug message delivery
token := mqttClient.PublishAsync("test/topic", "test message", func(token mqtt.Token) {
if token.Error() != nil {
log.Printf("Publish failed: %v", token.Error())
// Common causes:
// 1. Client not connected
// 2. Invalid topic
// 3. Message too large
// 4. QoS issues
// 5. Broker overload
} else {
log.Println("Message delivered successfully")
}
})
// Wait with timeout
if !token.WaitTimeout(5 * time.Second) {
log.Println("Publish timeout")
}Performance Issues
// Monitor performance
stats := mqttClient.GetStats()
log.Printf("Messages/sec: %.2f",
float64(stats.MessagesSent) / time.Since(stats.ConnectedSince).Seconds())
// Check for bottlenecks
if stats.MessagesSent > stats.MessagesAcknowledged {
log.Printf("Pending acknowledgments: %d",
stats.MessagesSent - stats.MessagesAcknowledged)
}
// Monitor memory usage
var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Printf("Memory usage: %d KB", m.Alloc / 1024)Debugging Tools
// Enable debug logging
app.Use(mqtt.New(
mqtt.WithConfig(mqtt.Config{
EnableLogging: true,
}),
))
// Custom debug handler
mqttClient.SetDefaultHandler(func(topic string, payload []byte) {
log.Printf("DEBUG: Unhandled message on %s: %s", topic, string(payload))
})
// Connection state monitoring
go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for range ticker.C {
if mqttClient.IsConnected() {
log.Println("MQTT: Connected")
} else {
log.Println("MQTT: Disconnected")
}
}
}()API Reference
Core Interface
type MQTT interface {
// Connection management
Connect() error
Disconnect(quiesce uint)
IsConnected() bool
// Publishing
Publish(topic string, payload interface{}) error
PublishBytes(topic string, payload []byte) error
PublishWithQoS(topic string, payload interface{}, qos byte) error
PublishRetained(topic string, payload interface{}) error
PublishAsync(topic string, payload interface{}, callback func(Token)) Token
// Subscribing
Subscribe(topic string, handler MessageHandler) error
SubscribeWithQoS(topic string, qos byte, handler MessageHandler) error
SubscribeMultiple(topics map[string]MessageHandler) error
Unsubscribe(topics ...string) error
// Message handling
AddRoute(topic string, handler MessageHandler)
SetDefaultHandler(handler MessageHandler)
// Connection handlers
SetConnectHandler(handler ConnectHandler)
SetConnectionLostHandler(handler ConnectionLostHandler)
SetReconnectingHandler(handler ReconnectingHandler)
// Client information
GetClient() Client
GetStats() ClientStats
GetSubscriptions() map[string]byte
// Health and monitoring
Health() HealthStatus
}Configuration Options
type Config struct {
// Connection settings
Broker string `json:"broker"`
ClientID string `json:"client_id"`
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
CleanSession bool `json:"clean_session"`
ConnectTimeout time.Duration `json:"connect_timeout"`
KeepAlive time.Duration `json:"keep_alive"`
PingTimeout time.Duration `json:"ping_timeout"`
MaxReconnectDelay time.Duration `json:"max_reconnect_delay"`
// TLS/SSL settings
EnableTLS bool `json:"enable_tls"`
TLSCertFile string `json:"tls_cert_file,omitempty"`
TLSKeyFile string `json:"tls_key_file,omitempty"`
TLSCAFile string `json:"tls_ca_file,omitempty"`
TLSSkipVerify bool `json:"tls_skip_verify"`
// QoS and reliability
DefaultQoS byte `json:"default_qos"`
AutoReconnect bool `json:"auto_reconnect"`
ResumeSubs bool `json:"resume_subs"`
MaxReconnectAttempts int `json:"max_reconnect_attempts"`
WriteTimeout time.Duration `json:"write_timeout"`
// Message handling
MessageChannelDepth uint `json:"message_channel_depth"`
OrderMatters bool `json:"order_matters"`
MessageStore string `json:"message_store"`
StoreDirectory string `json:"store_directory,omitempty"`
// Last Will and Testament
WillEnabled bool `json:"will_enabled"`
WillTopic string `json:"will_topic,omitempty"`
WillPayload string `json:"will_payload,omitempty"`
WillQoS byte `json:"will_qos"`
WillRetained bool `json:"will_retained"`
// Observability
EnableMetrics bool `json:"enable_metrics"`
EnableTracing bool `json:"enable_tracing"`
EnableLogging bool `json:"enable_logging"`
}Functional Options
// Connection options
func WithBroker(broker string) ConfigOption
func WithClientID(clientID string) ConfigOption
func WithCredentials(username, password string) ConfigOption
func WithCleanSession(clean bool) ConfigOption
func WithKeepAlive(duration time.Duration) ConfigOption
func WithAutoReconnect(enable bool) ConfigOption
// Security options
func WithTLS(certFile, keyFile, caFile string, skipVerify bool) ConfigOption
// QoS options
func WithQoS(qos byte) ConfigOption
// Last Will and Testament
func WithWill(topic, payload string, qos byte, retained bool) ConfigOption
// Observability options
func WithMetrics(enable bool) ConfigOption
func WithTracing(enable bool) ConfigOption
// Configuration
func WithConfig(config Config) ConfigOption
func WithRequireConfig(require bool) ConfigOptionHow is this guide?
Last updated on