Extensions / Brokers

MQTT Extension

IoT messaging and pub/sub communication with MQTT protocol support

MQTT Extension

The MQTT extension provides comprehensive support for IoT messaging and publish/subscribe communication patterns using the MQTT protocol. It enables real-time device communication, sensor data collection, and event-driven architectures.

Features

Core Messaging

  • Publish/Subscribe: Full pub/sub messaging patterns
  • QoS Levels: Support for QoS 0, 1, and 2
  • Topic Wildcards: Single-level (+) and multi-level (#) wildcards
  • Retained Messages: Message persistence for late subscribers
  • Message Routing: Flexible message handling and routing

Device Communication

  • IoT Integration: Seamless device connectivity
  • Sensor Data: Real-time sensor data collection
  • Device Control: Remote device command and control
  • Fleet Management: Multi-device coordination
  • Edge Computing: Local processing capabilities

Reliability & Performance

  • Auto-Reconnect: Automatic connection recovery
  • Connection Pooling: Efficient connection management
  • Message Persistence: File and memory-based storage
  • Clean Sessions: Configurable session handling
  • Keep-Alive: Connection health monitoring

Security

  • TLS/SSL: Encrypted communication
  • mTLS: Mutual authentication
  • Authentication: Username/password authentication
  • Certificate Management: X.509 certificate support
  • Secure Connections: Encrypted client-to-broker transport (TLS)

Advanced Features

  • Last Will and Testament: Automatic disconnect notifications
  • Message Ordering: Ordered message delivery
  • Batch Operations: Bulk message handling
  • Custom Handlers: Extensible event handling
  • Health Monitoring: Connection and client health checks

Observability

  • Metrics: Built-in Prometheus metrics
  • Tracing: Distributed tracing support
  • Logging: Comprehensive logging
  • Statistics: Client and connection statistics
  • Health Checks: Endpoint health monitoring

Installation

go get github.com/forge-build/forge/extensions/mqtt

Quick Start

Basic MQTT Client

package main

import (
    "log"
    "time"

    "github.com/forge-build/forge"
    "github.com/forge-build/forge/extensions/mqtt"
)

// main registers the MQTT extension on a Forge app, connects to a local
// broker, publishes one temperature reading, subscribes to every
// single-level topic under sensors/, and then runs the app.
func main() {
    // Create Forge app
    app := forge.New()

    // Add MQTT extension
    app.Use(mqtt.New(
        mqtt.WithBroker("tcp://localhost:1883"),
        mqtt.WithClientID("my-client"),
        mqtt.WithQoS(1),
    ))

    // Get MQTT client
    var mqttClient mqtt.MQTT
    app.Resolve(&mqttClient)

    // Connect to broker; nothing below can work without a connection,
    // so a failure here is fatal.
    if err := mqttClient.Connect(); err != nil {
        log.Fatal(err)
    }

    // Publish message
    err := mqttClient.Publish("sensors/temperature", map[string]interface{}{
        "value": 23.5,
        "unit":  "celsius",
        "time":  time.Now(),
    })
    if err != nil {
        log.Printf("Publish failed: %v", err)
    }

    // Subscribe to topic ("+" matches exactly one topic level)
    err = mqttClient.Subscribe("sensors/+", func(topic string, payload []byte) {
        log.Printf("Received on %s: %s", topic, string(payload))
    })
    if err != nil {
        log.Printf("Subscribe failed: %v", err)
    }

    // Start the app
    app.Run()
}

IoT Sensor Data Collection

package main

import (
    "encoding/json"
    "log"
    "time"

    "github.com/forge-build/forge"
    "github.com/forge-build/forge/extensions/mqtt"
)

// SensorData is the JSON payload this collector expects on sensor topics.
type SensorData struct {
    DeviceID    string    `json:"device_id"`
    SensorType  string    `json:"sensor_type"`
    Value       float64   `json:"value"`
    Unit        string    `json:"unit"`
    Timestamp   time.Time `json:"timestamp"`
    Location    string    `json:"location"`
}

// main connects to a TLS-protected broker with credentials and a client
// certificate, subscribes to two-level topics under sensors/, decodes each
// payload as SensorData, and hands it to processSensorData.
func main() {
    app := forge.New()

    // Configure MQTT for IoT
    app.Use(mqtt.New(
        mqtt.WithBroker("ssl://iot.example.com:8883"),
        mqtt.WithClientID("sensor-collector"),
        mqtt.WithCredentials("iot-user", "secure-password"),
        mqtt.WithTLS("client.crt", "client.key", "ca.crt", false),
        mqtt.WithQoS(1),
        mqtt.WithAutoReconnect(true),
    ))

    var mqttClient mqtt.MQTT
    app.Resolve(&mqttClient)

    // Connect with error handling. Fatalf is used instead of
    // log.Fatal("...:", err) because Print-style functions do not insert a
    // space between a string operand and the error.
    if err := mqttClient.Connect(); err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }

    // Subscribe to all sensor data
    err := mqttClient.Subscribe("sensors/+/+", func(topic string, payload []byte) {
        var data SensorData
        if err := json.Unmarshal(payload, &data); err != nil {
            // A malformed payload is logged and dropped; the subscription
            // keeps running.
            log.Printf("Failed to parse sensor data: %v", err)
            return
        }

        log.Printf("Sensor data from %s: %s = %.2f %s",
            data.DeviceID, data.SensorType, data.Value, data.Unit)

        // Process sensor data
        processSensorData(data)
    })
    if err != nil {
        log.Fatalf("Failed to subscribe: %v", err)
    }

    app.Run()
}

// processSensorData is the application hook for a decoded reading.
// It is intentionally left empty in this example.
func processSensorData(data SensorData) {
    // Store in database, trigger alerts, etc.
}

Device Control System

package main

import (
    "encoding/json"
    "fmt"
    "log"

    "github.com/forge-build/forge"
    "github.com/forge-build/forge/extensions/mqtt"
)

// DeviceCommand is the JSON payload published to a device's command topic.
type DeviceCommand struct {
    DeviceID string                 `json:"device_id"`
    Command  string                 `json:"command"`
    Params   map[string]interface{} `json:"params"`
}

// DeviceStatus is the JSON payload expected on a device's status topic.
type DeviceStatus struct {
    DeviceID string `json:"device_id"`
    Status   string `json:"status"`
    Message  string `json:"message"`
}

// main connects to a local broker, logs status updates from every device,
// and sends one example command to device-001.
func main() {
    app := forge.New()

    app.Use(mqtt.New(
        mqtt.WithBroker("tcp://localhost:1883"),
        mqtt.WithClientID("device-controller"),
        mqtt.WithQoS(2), // Exactly once delivery for commands
    ))

    var mqttClient mqtt.MQTT
    app.Resolve(&mqttClient)

    if err := mqttClient.Connect(); err != nil {
        log.Fatal(err)
    }

    // Subscribe to device status updates. The error is checked so that a
    // rejected subscription does not go unnoticed.
    err := mqttClient.Subscribe("devices/+/status", func(topic string, payload []byte) {
        var status DeviceStatus
        if err := json.Unmarshal(payload, &status); err != nil {
            log.Printf("Failed to parse status: %v", err)
            return
        }

        log.Printf("Device %s status: %s - %s",
            status.DeviceID, status.Status, status.Message)
    })
    if err != nil {
        log.Printf("Failed to subscribe to device status: %v", err)
    }

    // Send command to device
    sendDeviceCommand := func(deviceID, command string, params map[string]interface{}) error {
        cmd := DeviceCommand{
            DeviceID: deviceID,
            Command:  command,
            Params:   params,
        }

        topic := fmt.Sprintf("devices/%s/commands", deviceID)
        return mqttClient.Publish(topic, cmd)
    }

    // Example: Turn on LED
    err = sendDeviceCommand("device-001", "led_control", map[string]interface{}{
        "state":      "on",
        "color":      "blue",
        "brightness": 80,
    })
    if err != nil {
        log.Printf("Failed to send command: %v", err)
    }

    app.Run()
}

Configuration

Programmatic Configuration

app.Use(mqtt.New(
    // Connection settings
    mqtt.WithBroker("tcp://localhost:1883"),
    mqtt.WithClientID("my-client"),
    mqtt.WithCredentials("username", "password"),
    
    // TLS/SSL settings
    mqtt.WithTLS("client.crt", "client.key", "ca.crt", false),
    
    // QoS and reliability
    mqtt.WithQoS(1),
    mqtt.WithCleanSession(true),
    mqtt.WithAutoReconnect(true),
    mqtt.WithKeepAlive(30 * time.Second),
    
    // Last Will and Testament
    mqtt.WithWill("devices/my-client/status", "offline", 1, true),
    
    // Observability
    mqtt.WithMetrics(true),
    mqtt.WithTracing(true),
))

YAML Configuration

extensions:
  mqtt:
    # Connection settings
    broker: "tcp://localhost:1883"
    client_id: "my-client"
    username: "mqtt-user"
    password: "mqtt-password"
    clean_session: true
    connect_timeout: "30s"
    keep_alive: "60s"
    ping_timeout: "10s"
    max_reconnect_delay: "10m"
    
    # TLS/SSL settings
    enable_tls: true
    tls_cert_file: "/path/to/client.crt"
    tls_key_file: "/path/to/client.key"
    tls_ca_file: "/path/to/ca.crt"
    tls_skip_verify: false
    
    # QoS and reliability
    default_qos: 1
    auto_reconnect: true
    resume_subs: true
    max_reconnect_attempts: 10
    write_timeout: "30s"
    
    # Message handling
    message_channel_depth: 100
    order_matters: true
    message_store: "file"
    store_directory: "/var/lib/mqtt"
    
    # Last Will and Testament
    will_enabled: true
    will_topic: "devices/my-client/status"
    will_payload: "offline"
    will_qos: 1
    will_retained: true
    
    # Observability
    enable_metrics: true
    enable_tracing: true
    enable_logging: true

Environment Variables

# Connection
MQTT_BROKER=tcp://localhost:1883
MQTT_CLIENT_ID=my-client
MQTT_USERNAME=mqtt-user
MQTT_PASSWORD=mqtt-password

# TLS
MQTT_ENABLE_TLS=true
MQTT_TLS_CERT_FILE=/path/to/client.crt
MQTT_TLS_KEY_FILE=/path/to/client.key
MQTT_TLS_CA_FILE=/path/to/ca.crt

# QoS and reliability
MQTT_DEFAULT_QOS=1
MQTT_AUTO_RECONNECT=true
MQTT_KEEP_ALIVE=60s

# Last Will and Testament
MQTT_WILL_ENABLED=true
MQTT_WILL_TOPIC=devices/my-client/status
MQTT_WILL_PAYLOAD=offline

Usage Patterns

Publishing Messages

Synchronous Publishing

// Simple string message
err := mqttClient.Publish("topic/test", "Hello, MQTT!")

// JSON message
data := map[string]interface{}{
    "temperature": 23.5,
    "humidity":    65.2,
    "timestamp":   time.Now(),
}
err := mqttClient.Publish("sensors/room1", data)

// Binary data
err := mqttClient.PublishBytes("data/binary", []byte{0x01, 0x02, 0x03})

Asynchronous Publishing

// Publish with callback
token := mqttClient.PublishAsync("topic/async", "message", func(token mqtt.Token) {
    if token.Error() != nil {
        log.Printf("Publish failed: %v", token.Error())
    } else {
        log.Println("Message published successfully")
    }
})

// Wait for completion
token.Wait()

QoS-Specific Publishing

// QoS 0 - At most once
err := mqttClient.PublishWithQoS("topic/qos0", "message", 0)

// QoS 1 - At least once
err := mqttClient.PublishWithQoS("topic/qos1", "message", 1)

// QoS 2 - Exactly once
err := mqttClient.PublishWithQoS("topic/qos2", "message", 2)

Retained Messages

// Publish retained message
err := mqttClient.PublishRetained("status/device1", "online")

// Clear retained message
err := mqttClient.PublishRetained("status/device1", "")

Subscribing to Topics

Basic Subscription

err := mqttClient.Subscribe("sensors/temperature", func(topic string, payload []byte) {
    log.Printf("Temperature: %s", string(payload))
})

Multiple Topic Subscription

topics := map[string]mqtt.MessageHandler{
    "sensors/temperature": func(topic string, payload []byte) {
        // Handle temperature data
    },
    "sensors/humidity": func(topic string, payload []byte) {
        // Handle humidity data
    },
    "alerts/+": func(topic string, payload []byte) {
        // Handle all alerts
    },
}

err := mqttClient.SubscribeMultiple(topics)

Wildcard Subscriptions

// Single-level wildcard
err := mqttClient.Subscribe("sensors/+/temperature", func(topic string, payload []byte) {
    // Matches: sensors/room1/temperature, sensors/room2/temperature
})

// Multi-level wildcard
err := mqttClient.Subscribe("devices/#", func(topic string, payload []byte) {
    // Matches: devices/sensor1, devices/sensor1/status, devices/sensor1/data/temp
})

QoS-Specific Subscription

err := mqttClient.SubscribeWithQoS("critical/alerts", 2, func(topic string, payload []byte) {
    // Handle critical alerts with exactly-once delivery
})

Message Handling

Structured Message Handling

type SensorMessage struct {
    DeviceID  string    `json:"device_id"`
    Value     float64   `json:"value"`
    Timestamp time.Time `json:"timestamp"`
}

err := mqttClient.Subscribe("sensors/+", func(topic string, payload []byte) {
    var msg SensorMessage
    if err := json.Unmarshal(payload, &msg); err != nil {
        log.Printf("Failed to parse message: %v", err)
        return
    }
    
    // Process structured message
    processSensorData(msg)
})

Message Routing

// Set up message router
mqttClient.AddRoute("sensors/+/temperature", handleTemperature)
mqttClient.AddRoute("sensors/+/humidity", handleHumidity)
mqttClient.AddRoute("alerts/#", handleAlerts)

// Default handler for unmatched topics
mqttClient.SetDefaultHandler(func(topic string, payload []byte) {
    log.Printf("Unhandled message on %s: %s", topic, string(payload))
})

Connection Management

Connection Handlers

// Connection established
mqttClient.SetConnectHandler(func() {
    log.Println("Connected to MQTT broker")
    
    // Re-subscribe to topics
    mqttClient.Subscribe("sensors/+", handleSensorData)
})

// Connection lost
mqttClient.SetConnectionLostHandler(func(err error) {
    log.Printf("Connection lost: %v", err)
    
    // Implement custom reconnection logic if needed
})

// Reconnecting
mqttClient.SetReconnectingHandler(func(opts *mqtt.ClientOptions, retryCount int) {
    log.Printf("Reconnecting... attempt %d", retryCount)
})

Manual Connection Control

// Connect
if err := mqttClient.Connect(); err != nil {
    log.Fatal("Failed to connect:", err)
}

// Check connection status
if mqttClient.IsConnected() {
    log.Println("Client is connected")
}

// Disconnect gracefully
mqttClient.Disconnect(250) // 250ms timeout

Advanced Features

Last Will and Testament

// Configure LWT during client creation
app.Use(mqtt.New(
    mqtt.WithBroker("tcp://localhost:1883"),
    mqtt.WithClientID("device-001"),
    mqtt.WithWill("devices/device-001/status", "offline", 1, true),
))

// Or set LWT programmatically
mqttClient.SetWill("devices/device-001/status", "offline", 1, true)

Message Persistence

// File-based message store
app.Use(mqtt.New(
    mqtt.WithBroker("tcp://localhost:1883"),
    mqtt.WithConfig(mqtt.Config{
        MessageStore:   "file",
        StoreDirectory: "/var/lib/mqtt/store",
    }),
))

// Memory-based store (default)
app.Use(mqtt.New(
    mqtt.WithBroker("tcp://localhost:1883"),
    mqtt.WithConfig(mqtt.Config{
        MessageStore: "memory",
    }),
))

Batch Operations

// Batch publish
messages := []mqtt.Message{
    {Topic: "sensors/temp1", Payload: "23.5"},
    {Topic: "sensors/temp2", Payload: "24.1"},
    {Topic: "sensors/temp3", Payload: "22.8"},
}

for _, msg := range messages {
    mqttClient.Publish(msg.Topic, msg.Payload)
}

// Batch subscribe
topics := map[string]mqtt.MessageHandler{
    "sensors/temperature": handleTemperature,
    "sensors/humidity":    handleHumidity,
    "sensors/pressure":    handlePressure,
}

err := mqttClient.SubscribeMultiple(topics)

Security

TLS/SSL Configuration

Basic TLS

app.Use(mqtt.New(
    mqtt.WithBroker("ssl://broker.example.com:8883"),
    // WARNING: the last argument is skipVerify. Passing true disables
    // verification of the broker's certificate, so the connection is
    // encrypted but the broker's identity is not checked. Use this for
    // local testing only; prefer the certificate-based form otherwise.
    mqtt.WithTLS("", "", "", true), // Skip certificate verification
))

Certificate-Based TLS

app.Use(mqtt.New(
    mqtt.WithBroker("ssl://broker.example.com:8883"),
    mqtt.WithTLS("client.crt", "client.key", "ca.crt", false),
))

Mutual TLS (mTLS)

extensions:
  mqtt:
    broker: "ssl://secure-broker.example.com:8883"
    enable_tls: true
    tls_cert_file: "/etc/ssl/certs/client.crt"
    tls_key_file: "/etc/ssl/private/client.key"
    tls_ca_file: "/etc/ssl/certs/ca.crt"
    tls_skip_verify: false

Authentication

Username/Password

app.Use(mqtt.New(
    mqtt.WithBroker("tcp://broker.example.com:1883"),
    mqtt.WithCredentials("mqtt-user", "secure-password"),
))

Certificate-Based Authentication

app.Use(mqtt.New(
    mqtt.WithBroker("ssl://broker.example.com:8883"),
    mqtt.WithTLS("client.crt", "client.key", "ca.crt", false),
    // Client certificate serves as authentication
))

Security Best Practices

// Secure configuration example
app.Use(mqtt.New(
    // Use encrypted connection
    mqtt.WithBroker("ssl://broker.example.com:8883"),
    
    // Strong authentication
    mqtt.WithCredentials("secure-user", "complex-password"),
    
    // Certificate validation
    mqtt.WithTLS("client.crt", "client.key", "ca.crt", false),
    
    // Unique client ID
    mqtt.WithClientID(fmt.Sprintf("client-%s", uuid.New().String())),
    
    // Clean sessions for security
    mqtt.WithCleanSession(true),
    
    // Appropriate QoS
    mqtt.WithQoS(1),
))

Monitoring and Observability

Built-in Metrics

The MQTT extension provides comprehensive metrics:

// Enable metrics
app.Use(mqtt.New(
    mqtt.WithMetrics(true),
))

// Metrics available:
// - mqtt_messages_published_total
// - mqtt_messages_received_total
// - mqtt_connection_status
// - mqtt_subscription_count
// - mqtt_publish_duration_seconds
// - mqtt_connection_attempts_total
// - mqtt_reconnection_count

Client Statistics

// Get client statistics
stats := mqttClient.GetStats()
fmt.Printf("Messages sent: %d\n", stats.MessagesSent)
fmt.Printf("Messages received: %d\n", stats.MessagesReceived)
// ConnectedSince is the time the connection was established, so the uptime
// is the time elapsed since then, not the timestamp itself.
fmt.Printf("Connection uptime: %v\n", time.Since(stats.ConnectedSince))
fmt.Printf("Reconnection count: %d\n", stats.ReconnectCount)

Health Checks

// Check client health
health := mqttClient.Health()
if health.Status == "healthy" {
    log.Println("MQTT client is healthy")
} else {
    log.Printf("MQTT client unhealthy: %s", health.Message)
}

// Get subscription info
subscriptions := mqttClient.GetSubscriptions()
for topic, qos := range subscriptions {
    log.Printf("Subscribed to %s with QoS %d", topic, qos)
}

Custom Metrics

import "github.com/prometheus/client_golang/prometheus"

var (
    sensorDataReceived = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "sensor_data_received_total",
            Help: "Total number of sensor data messages received",
        },
        []string{"device_id", "sensor_type"},
    )
)

func init() {
    prometheus.MustRegister(sensorDataReceived)
}

// In message handler
mqttClient.Subscribe("sensors/+/+", func(topic string, payload []byte) {
    // Parse topic to extract device_id and sensor_type
    parts := strings.Split(topic, "/")
    if len(parts) >= 3 {
        deviceID := parts[1]
        sensorType := parts[2]
        
        sensorDataReceived.WithLabelValues(deviceID, sensorType).Inc()
    }
    
    // Process message...
})

Performance Optimization

Connection Optimization

app.Use(mqtt.New(
    mqtt.WithBroker("tcp://broker.example.com:1883"),
    
    // Optimize connection settings
    mqtt.WithKeepAlive(30 * time.Second),
    mqtt.WithConfig(mqtt.Config{
        ConnectTimeout:       10 * time.Second,
        PingTimeout:          5 * time.Second,
        WriteTimeout:         10 * time.Second,
        MaxReconnectDelay:    5 * time.Minute,
        MessageChannelDepth:  1000,
        OrderMatters:         false, // Better performance if order doesn't matter
    }),
))

Message Optimization

// Use appropriate QoS levels
// QoS 0 for non-critical data (best performance)
mqttClient.PublishWithQoS("sensors/temperature", data, 0)

// QoS 1 for important data
mqttClient.PublishWithQoS("alerts/critical", alert, 1)

// QoS 2 only for critical commands
mqttClient.PublishWithQoS("commands/shutdown", cmd, 2)

// Batch similar operations
var wg sync.WaitGroup
for _, sensor := range sensors {
    wg.Add(1)
    go func(s Sensor) {
        defer wg.Done()
        mqttClient.Publish(s.Topic, s.Data)
    }(sensor)
}
wg.Wait()

Memory Optimization

app.Use(mqtt.New(
    mqtt.WithConfig(mqtt.Config{
        // Use memory store for better performance
        MessageStore: "memory",
        
        // Limit channel depth
        MessageChannelDepth: 100,
        
        // Clean sessions to avoid state buildup
        CleanSession: true,
        
        // Disable order if not needed
        OrderMatters: false,
    }),
))

Error Handling

Common Error Types

import "github.com/forge-build/forge/extensions/mqtt"

// Handle specific errors
err := mqttClient.Publish("topic", "message")
switch {
case err == nil:
    // Published successfully. Without this case a nil error would fall
    // through to default and be logged as "Unexpected error: <nil>".

case errors.Is(err, mqtt.ErrNotConnected):
    log.Println("Client not connected, attempting reconnection...")
    if connErr := mqttClient.Connect(); connErr != nil {
        log.Printf("Reconnection failed: %v", connErr)
    }

case errors.Is(err, mqtt.ErrPublishFailed):
    log.Println("Publish failed, retrying...")
    // Implement retry logic

case errors.Is(err, mqtt.ErrTimeout):
    log.Println("Operation timed out")

case errors.Is(err, mqtt.ErrInvalidTopic):
    log.Println("Invalid topic format")

default:
    log.Printf("Unexpected error: %v", err)
}

Retry Logic

// publishWithRetry publishes payload to topic, trying up to maxRetries times.
//
// Between attempts it waits with exponential backoff starting at one second
// and capped at one minute. When an attempt fails with mqtt.ErrNotConnected
// it tries to reconnect before the next attempt. It returns nil as soon as a
// publish succeeds; otherwise it returns an error wrapping the last publish
// error. A non-positive maxRetries is rejected without attempting a publish.
func publishWithRetry(client mqtt.MQTT, topic string, payload interface{}, maxRetries int) error {
    if maxRetries <= 0 {
        // Previously this wrapped a nil error, producing "%!w(<nil>)".
        return fmt.Errorf("failed after %d retries: no attempts were made", maxRetries)
    }

    // The cap keeps the doubling below from overflowing time.Duration and
    // from growing into unreasonably long waits.
    const maxBackoff = time.Minute

    var err error
    delay := time.Second
    for attempt := 1; attempt <= maxRetries; attempt++ {
        if err = client.Publish(topic, payload); err == nil {
            return nil
        }

        if errors.Is(err, mqtt.ErrNotConnected) {
            // Try to reconnect
            if connErr := client.Connect(); connErr != nil {
                log.Printf("Reconnection failed: %v", connErr)
            }
        }

        if attempt == maxRetries {
            // No further attempt follows, so waiting would only delay the
            // error returned to the caller.
            break
        }

        // Exponential backoff
        time.Sleep(delay)
        if delay *= 2; delay > maxBackoff {
            delay = maxBackoff
        }
    }

    return fmt.Errorf("failed after %d retries: %w", maxRetries, err)
}

Circuit Breaker Pattern

// CircuitBreaker stops calling a failing operation once it has failed
// maxFailures times, and lets a trial call through again after timeout has
// elapsed since the most recent failure.
type CircuitBreaker struct {
    failures    int           // failures since the last success
    maxFailures int           // failure count at which the breaker opens
    timeout     time.Duration // how long the breaker stays open after a failure
    lastFailure time.Time     // time of the most recent failure
    state       string        // "closed", "open", "half-open"
    mutex       sync.RWMutex  // guards all fields above
}

// Call runs fn unless the breaker is open.
//
// While the breaker is open and timeout has not elapsed since the last
// failure, Call returns an error without invoking fn. Otherwise it invokes
// fn and returns fn's error unchanged; a failure is counted and opens the
// breaker once maxFailures is reached, while a success resets the failure
// count and closes the breaker.
//
// The lock is held only while the breaker's state is read or updated, never
// while fn runs. A slow fn therefore does not block other callers (which can
// still fail fast when the breaker is open), and fn may itself use the
// breaker without deadlocking. As a consequence, several callers may run fn
// concurrently, including while the breaker is half-open.
func (cb *CircuitBreaker) Call(fn func() error) error {
    if err := cb.admit(); err != nil {
        return err
    }

    err := fn()
    cb.record(err)
    return err
}

// admit decides whether a call may proceed, moving an open breaker to
// half-open once timeout has elapsed since the last failure.
func (cb *CircuitBreaker) admit() error {
    cb.mutex.Lock()
    defer cb.mutex.Unlock()

    if cb.state != "open" {
        return nil
    }
    if time.Since(cb.lastFailure) <= cb.timeout {
        return fmt.Errorf("circuit breaker is open")
    }

    cb.state = "half-open"
    return nil
}

// record updates the breaker with the outcome of one call to fn.
func (cb *CircuitBreaker) record(err error) {
    cb.mutex.Lock()
    defer cb.mutex.Unlock()

    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()

        if cb.failures >= cb.maxFailures {
            cb.state = "open"
        }
        return
    }

    cb.failures = 0
    cb.state = "closed"
}

// Usage
cb := &CircuitBreaker{
    maxFailures: 5,
    timeout:     30 * time.Second,
    state:       "closed",
}

err := cb.Call(func() error {
    return mqttClient.Publish("topic", "message")
})

Best Practices

Message Design

// Use structured messages

// DeviceMessage is a common JSON envelope for messages exchanged with a
// device. Metadata is omitted from the encoded JSON when it is empty.
type DeviceMessage struct {
    DeviceID    string                 `json:"device_id"`
    MessageType string                 `json:"message_type"`
    Timestamp   time.Time              `json:"timestamp"`
    Data        map[string]interface{} `json:"data"`
    Metadata    map[string]string      `json:"metadata,omitempty"`
}

// Keep messages small and focused

// publishSensorReading wraps a single sensor value in a DeviceMessage of
// type "sensor_reading", stamped with the current time, and publishes it to
// devices/<deviceID>/sensors/<sensorType>. It returns the error from
// client.Publish.
func publishSensorReading(client mqtt.MQTT, deviceID string, sensorType string, value float64) error {
    reading := map[string]interface{}{
        "sensor_type": sensorType,
        "value":       value,
    }

    envelope := DeviceMessage{
        DeviceID:    deviceID,
        MessageType: "sensor_reading",
        Timestamp:   time.Now(),
        Data:        reading,
    }

    return client.Publish(fmt.Sprintf("devices/%s/sensors/%s", deviceID, sensorType), envelope)
}

Topic Design

// Good topic hierarchy
// devices/{device_id}/sensors/{sensor_type}
// devices/{device_id}/commands/{command_type}
// devices/{device_id}/status
// alerts/{severity}/{device_id}
// system/{component}/health

// Use consistent naming

// Topic format strings. Each %s is a single topic level that the caller
// fills in with fmt.Sprintf.
const (
    // TopicDeviceSensors takes a device ID and a sensor type.
    TopicDeviceSensors = "devices/%s/sensors/%s"
    // TopicDeviceCommands takes a device ID and a command type.
    TopicDeviceCommands = "devices/%s/commands/%s"
    // TopicDeviceStatus takes a device ID.
    TopicDeviceStatus = "devices/%s/status"
    // TopicAlerts takes a severity and a device ID.
    TopicAlerts = "alerts/%s/%s"
    // TopicSystemHealth takes a component name.
    TopicSystemHealth = "system/%s/health"
)

// getDeviceSensorTopic builds the sensor topic for one sensor on one device
// from the TopicDeviceSensors format.
func getDeviceSensorTopic(deviceID, sensorType string) string {
    topic := fmt.Sprintf(TopicDeviceSensors, deviceID, sensorType)
    return topic
}

Connection Management

// Implement proper connection lifecycle

// MQTTManager ties an MQTT client to a cancellable context so that the
// client's subscriptions and shutdown are managed in one place.
type MQTTManager struct {
    client mqtt.MQTT
    ctx    context.Context
    cancel context.CancelFunc
}

// NewMQTTManager wraps client in a manager with a fresh cancellable context
// and registers the manager's connect and connection-lost callbacks on the
// client. It does not connect the client.
func NewMQTTManager(client mqtt.MQTT) *MQTTManager {
    ctx, cancel := context.WithCancel(context.Background())
    
    manager := &MQTTManager{
        client: client,
        ctx:    ctx,
        cancel: cancel,
    }
    
    // Set up connection handlers
    client.SetConnectHandler(manager.onConnect)
    client.SetConnectionLostHandler(manager.onConnectionLost)
    
    return manager
}

// onConnect is the connect callback: it logs the connection and subscribes
// to the manager's topics again.
func (m *MQTTManager) onConnect() {
    log.Println("MQTT connected")
    
    // Re-establish subscriptions
    m.setupSubscriptions()
}

// onConnectionLost is the connection-lost callback: it logs the error and
// starts handleReconnection in a new goroutine.
// NOTE(review): handleReconnection is not shown in this example — confirm it
// observes m.ctx so the goroutine ends after Shutdown.
func (m *MQTTManager) onConnectionLost(err error) {
    log.Printf("MQTT connection lost: %v", err)
    
    // Implement custom recovery logic
    go m.handleReconnection()
}

// setupSubscriptions subscribes to each topic filter the manager handles.
// A failed subscription is logged and the remaining ones are still
// attempted. The handler methods are not shown in this example.
func (m *MQTTManager) setupSubscriptions() {
    subscriptions := map[string]mqtt.MessageHandler{
        "devices/+/sensors/+": m.handleSensorData,
        "devices/+/status":    m.handleDeviceStatus,
        "alerts/#":            m.handleAlerts,
    }
    
    for topic, handler := range subscriptions {
        if err := m.client.Subscribe(topic, handler); err != nil {
            log.Printf("Failed to subscribe to %s: %v", topic, err)
        }
    }
}

// Shutdown cancels the manager's context and then disconnects the client,
// passing 1000 as the quiesce argument.
func (m *MQTTManager) Shutdown() {
    m.cancel()
    m.client.Disconnect(1000)
}

Security Best Practices

// Secure client configuration

// createSecureMQTTClient builds an MQTT client for a TLS broker with
// certificate verification enabled. Credentials and certificate paths are
// read from the environment rather than hard-coded; the variable names match
// the extension's documented environment variables (MQTT_USERNAME,
// MQTT_PASSWORD, MQTT_TLS_CERT_FILE, MQTT_TLS_KEY_FILE, MQTT_TLS_CA_FILE).
func createSecureMQTTClient() mqtt.MQTT {
    return mqtt.New(
        // Use TLS
        mqtt.WithBroker("ssl://broker.example.com:8883"),

        // Strong authentication
        mqtt.WithCredentials(
            os.Getenv("MQTT_USERNAME"),
            os.Getenv("MQTT_PASSWORD"),
        ),

        // Certificate validation
        mqtt.WithTLS(
            os.Getenv("MQTT_TLS_CERT_FILE"),
            os.Getenv("MQTT_TLS_KEY_FILE"),
            os.Getenv("MQTT_TLS_CA_FILE"),
            false, // Don't skip verification
        ),

        // Unique client ID: host name plus the start time in Unix seconds,
        // so the ID changes on every restart.
        mqtt.WithClientID(fmt.Sprintf("client-%s-%d",
            os.Getenv("HOSTNAME"),
            time.Now().Unix(),
        )),

        // Clean sessions
        mqtt.WithCleanSession(true),
    )
}

// Topic access control

// validateTopicAccess reports whether topic is covered by at least one of
// the topic filters returned by getClientTopics for clientID.
//
// Filters are matched with MQTT wildcard rules. path.Match is not suitable
// here: it implements shell-style globbing, where "+" and "#" are ordinary
// characters, so a grant such as "sensors/+" would never match.
func validateTopicAccess(clientID, topic string) bool {
    // Implement topic-based access control
    for _, allowed := range getClientTopics(clientID) {
        if topicMatchesFilter(allowed, topic) {
            return true
        }
    }

    return false
}

// topicMatchesFilter reports whether the concrete topic name matches the
// MQTT topic filter. "+" matches exactly one level; "#" is only valid as the
// last level and matches the parent level plus any number of levels below
// it. A topic starting with "$" is not matched by a filter whose first level
// is a wildcard.
func topicMatchesFilter(filter, topic string) bool {
    if filter == "" || topic == "" {
        return false
    }

    filterLevels := strings.Split(filter, "/")
    topicLevels := strings.Split(topic, "/")

    if strings.HasPrefix(topic, "$") && (filterLevels[0] == "+" || filterLevels[0] == "#") {
        return false
    }

    for i, level := range filterLevels {
        if level == "#" {
            // A "#" anywhere but the end makes the filter invalid.
            return i == len(filterLevels)-1
        }
        if i >= len(topicLevels) {
            return false
        }
        if level != "+" && level != topicLevels[i] {
            return false
        }
    }

    // Without a trailing "#", both must have the same number of levels.
    return len(filterLevels) == len(topicLevels)
}

Troubleshooting

Common Issues

Connection Problems

// Debug connection issues
if err := mqttClient.Connect(); err != nil {
    log.Printf("Connection failed: %v", err)
    
    // Check common issues:
    // 1. Broker address and port
    // 2. Network connectivity
    // 3. Authentication credentials
    // 4. TLS configuration
    // 5. Firewall settings
}

// Test connectivity

// testMQTTConnectivity checks that a TCP connection to the broker can be
// opened within five seconds. It does not speak MQTT or TLS.
//
// broker may be a plain "host:port" address or a broker URL such as
// "tcp://host:1883" or "ssl://host:8883"; for a URL only the host:port part
// is dialled, because net.DialTimeout does not accept a scheme.
func testMQTTConnectivity(broker string) error {
    addr := broker
    if strings.Contains(broker, "://") {
        u, err := url.Parse(broker)
        if err != nil {
            return fmt.Errorf("invalid broker address %q: %w", broker, err)
        }
        addr = u.Host
    }

    conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
    if err != nil {
        return fmt.Errorf("cannot connect to broker: %w", err)
    }

    // Reachability is already established; a failure to close the probe
    // connection does not change the answer.
    _ = conn.Close()
    return nil
}

Subscription Issues

// Debug subscription problems
err := mqttClient.Subscribe("sensors/+", func(topic string, payload []byte) {
    log.Printf("Received: %s", topic)
})

if err != nil {
    switch {
    case errors.Is(err, mqtt.ErrNotConnected):
        log.Println("Not connected - ensure client is connected first")
        
    case errors.Is(err, mqtt.ErrSubscribeFailed):
        log.Println("Subscription failed - check topic format and permissions")
        
    case errors.Is(err, mqtt.ErrInvalidTopic):
        log.Println("Invalid topic - check topic syntax")
    }
}

// Verify subscriptions
subscriptions := mqttClient.GetSubscriptions()
for topic, qos := range subscriptions {
    log.Printf("Active subscription: %s (QoS %d)", topic, qos)
}

Message Delivery Issues

// Debug message delivery
token := mqttClient.PublishAsync("test/topic", "test message", func(token mqtt.Token) {
    if token.Error() != nil {
        log.Printf("Publish failed: %v", token.Error())
        
        // Common causes:
        // 1. Client not connected
        // 2. Invalid topic
        // 3. Message too large
        // 4. QoS issues
        // 5. Broker overload
    } else {
        log.Println("Message delivered successfully")
    }
})

// Wait with timeout
if !token.WaitTimeout(5 * time.Second) {
    log.Println("Publish timeout")
}

Performance Issues

// Monitor performance
stats := mqttClient.GetStats()
log.Printf("Messages/sec: %.2f", 
    float64(stats.MessagesSent) / time.Since(stats.ConnectedSince).Seconds())

// Check for bottlenecks
if stats.MessagesSent > stats.MessagesAcknowledged {
    log.Printf("Pending acknowledgments: %d", 
        stats.MessagesSent - stats.MessagesAcknowledged)
}

// Monitor memory usage
var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Printf("Memory usage: %d KB", m.Alloc / 1024)

Debugging Tools

// Enable debug logging
app.Use(mqtt.New(
    mqtt.WithConfig(mqtt.Config{
        EnableLogging: true,
    }),
))

// Custom debug handler
mqttClient.SetDefaultHandler(func(topic string, payload []byte) {
    log.Printf("DEBUG: Unhandled message on %s: %s", topic, string(payload))
})

// Connection state monitoring
go func() {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    
    for range ticker.C {
        if mqttClient.IsConnected() {
            log.Println("MQTT: Connected")
        } else {
            log.Println("MQTT: Disconnected")
        }
    }
}()

API Reference

Core Interface

type MQTT interface {
    // Connection management
    Connect() error
    Disconnect(quiesce uint)
    IsConnected() bool
    
    // Publishing
    Publish(topic string, payload interface{}) error
    PublishBytes(topic string, payload []byte) error
    PublishWithQoS(topic string, payload interface{}, qos byte) error
    PublishRetained(topic string, payload interface{}) error
    PublishAsync(topic string, payload interface{}, callback func(Token)) Token
    
    // Subscribing
    Subscribe(topic string, handler MessageHandler) error
    SubscribeWithQoS(topic string, qos byte, handler MessageHandler) error
    SubscribeMultiple(topics map[string]MessageHandler) error
    Unsubscribe(topics ...string) error
    
    // Message handling
    AddRoute(topic string, handler MessageHandler)
    SetDefaultHandler(handler MessageHandler)
    
    // Connection handlers
    SetConnectHandler(handler ConnectHandler)
    SetConnectionLostHandler(handler ConnectionLostHandler)
    SetReconnectingHandler(handler ReconnectingHandler)
    
    // Client information
    GetClient() Client
    GetStats() ClientStats
    GetSubscriptions() map[string]byte
    
    // Health and monitoring
    Health() HealthStatus
}

Configuration Options

type Config struct {
    // Connection settings
    Broker            string        `json:"broker"`
    ClientID          string        `json:"client_id"`
    Username          string        `json:"username,omitempty"`
    Password          string        `json:"password,omitempty"`
    CleanSession      bool          `json:"clean_session"`
    ConnectTimeout    time.Duration `json:"connect_timeout"`
    KeepAlive         time.Duration `json:"keep_alive"`
    PingTimeout       time.Duration `json:"ping_timeout"`
    MaxReconnectDelay time.Duration `json:"max_reconnect_delay"`
    
    // TLS/SSL settings
    EnableTLS     bool   `json:"enable_tls"`
    TLSCertFile   string `json:"tls_cert_file,omitempty"`
    TLSKeyFile    string `json:"tls_key_file,omitempty"`
    TLSCAFile     string `json:"tls_ca_file,omitempty"`
    TLSSkipVerify bool   `json:"tls_skip_verify"`
    
    // QoS and reliability
    DefaultQoS           byte          `json:"default_qos"`
    AutoReconnect        bool          `json:"auto_reconnect"`
    ResumeSubs           bool          `json:"resume_subs"`
    MaxReconnectAttempts int           `json:"max_reconnect_attempts"`
    WriteTimeout         time.Duration `json:"write_timeout"`
    
    // Message handling
    MessageChannelDepth uint   `json:"message_channel_depth"`
    OrderMatters        bool   `json:"order_matters"`
    MessageStore        string `json:"message_store"`
    StoreDirectory      string `json:"store_directory,omitempty"`
    
    // Last Will and Testament
    WillEnabled  bool   `json:"will_enabled"`
    WillTopic    string `json:"will_topic,omitempty"`
    WillPayload  string `json:"will_payload,omitempty"`
    WillQoS      byte   `json:"will_qos"`
    WillRetained bool   `json:"will_retained"`
    
    // Observability
    EnableMetrics bool `json:"enable_metrics"`
    EnableTracing bool `json:"enable_tracing"`
    EnableLogging bool `json:"enable_logging"`
}

Functional Options

// Connection options
func WithBroker(broker string) ConfigOption
func WithClientID(clientID string) ConfigOption
func WithCredentials(username, password string) ConfigOption
func WithCleanSession(clean bool) ConfigOption
func WithKeepAlive(duration time.Duration) ConfigOption
func WithAutoReconnect(enable bool) ConfigOption

// Security options
func WithTLS(certFile, keyFile, caFile string, skipVerify bool) ConfigOption

// QoS options
func WithQoS(qos byte) ConfigOption

// Last Will and Testament
func WithWill(topic, payload string, qos byte, retained bool) ConfigOption

// Observability options
func WithMetrics(enable bool) ConfigOption
func WithTracing(enable bool) ConfigOption

// Configuration
func WithConfig(config Config) ConfigOption
func WithRequireConfig(require bool) ConfigOption

How is this guide?

Last updated on