MQTT

MQTT

MQTT pub/sub client for IoT and messaging workloads

Overview

github.com/xraph/forge/extensions/mqtt provides an MQTT client built on the Eclipse Paho library. It registers an MQTTService in the DI container that wraps a MQTT client with publish, subscribe, connection lifecycle, and Last Will and Testament support.

What It Registers

ServiceDI KeyType
MQTT servicemqtt*MQTTService (also satisfies MQTT)

The service is managed by Vessel. Start connects to the broker and Stop disconnects gracefully with a 250ms quiesce period.

Quick Start

package main

import (
    "context"
    "fmt"

    mqttclient "github.com/eclipse/paho.mqtt.golang"
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/mqtt"
)

func main() {
    app := forge.NewApp(forge.AppConfig{Name: "iot-gateway", Version: "1.0.0"})

    app.RegisterExtension(mqtt.NewExtension(
        mqtt.WithBroker("tcp://localhost:1883"),
        mqtt.WithClientID("gateway-01"),
        mqtt.WithCredentials("user", "password"),
        mqtt.WithQoS(1),
        mqtt.WithAutoReconnect(true),
    ))

    ctx := context.Background()
    app.Start(ctx)
    defer app.Stop(ctx)

    // Resolve the MQTT client from DI
    svc, _ := forge.InjectType[*mqtt.MQTTService](app.Container())
    client := svc // MQTTService satisfies the MQTT interface

    // Subscribe to a topic
    client.Subscribe("sensors/temperature/#", 1,
        func(c mqttclient.Client, msg mqttclient.Message) {
            fmt.Printf("Topic: %s, Payload: %s\n", msg.Topic(), msg.Payload())
        },
    )

    // Subscribe to multiple topics at once
    client.SubscribeMultiple(map[string]byte{
        "sensors/humidity/#":  1,
        "sensors/pressure/#": 1,
    }, func(c mqttclient.Client, msg mqttclient.Message) {
        fmt.Printf("Sensor data: %s = %s\n", msg.Topic(), msg.Payload())
    })

    // Publish a message
    client.Publish("commands/device-01", 1, false, []byte(`{"action":"reset"}`))

    // Publish with retained flag (new subscribers get the latest value)
    client.Publish("status/gateway-01", 1, true, []byte(`{"status":"online"}`))
}

Using MQTT in Your Services

Inject *mqtt.MQTTService for automatic DI resolution:

type SensorCollector struct {
    mqtt   mqtt.MQTT
    logger forge.Logger
}

func NewSensorCollector(m *mqtt.MQTTService, logger forge.Logger) *SensorCollector {
    return &SensorCollector{mqtt: m, logger: logger}
}

func (sc *SensorCollector) Start(ctx context.Context) error {
    return sc.mqtt.Subscribe("sensors/#", 1,
        func(c mqttclient.Client, msg mqttclient.Message) {
            sc.logger.Info("sensor reading",
                forge.F("topic", msg.Topic()),
                forge.F("payload", string(msg.Payload())),
            )
        },
    )
}

func (sc *SensorCollector) SendCommand(deviceID string, cmd []byte) error {
    return sc.mqtt.Publish("commands/"+deviceID, 1, false, cmd)
}

Register with Vessel:

forge.ProvideConstructor(app.Container(), NewSensorCollector)

Connection Lifecycle Handlers

Register callbacks for connection events:

client.SetOnConnectHandler(func(c mqttclient.Client) {
    fmt.Println("Connected to MQTT broker")
    // Re-subscribe to topics if needed
})

client.SetConnectionLostHandler(func(c mqttclient.Client, err error) {
    fmt.Printf("Connection lost: %v\n", err)
})

client.SetReconnectingHandler(func(c mqttclient.Client, opts *mqttclient.ClientOptions) {
    fmt.Println("Attempting reconnection...")
})

Key Concepts

  • QoS levels -- publish and subscribe with QoS 0 (at most once), QoS 1 (at least once), or QoS 2 (exactly once). The default is configurable.
  • Auto-reconnect -- the client automatically reconnects on connection loss with configurable max delay and attempt limits.
  • Subscriptions -- subscribe to topics with wildcard support (+ single level, # multi level). Track active subscriptions.
  • Last Will and Testament -- configure a message that the broker publishes when the client disconnects unexpectedly.
  • Retained messages -- publish messages with the retained flag so new subscribers receive the latest value immediately.
  • Clean sessions -- toggle between persistent sessions (broker remembers subscriptions) and clean sessions.

Important Runtime Notes

  • The MQTT client is created during NewMQTTService but does not connect until Vessel calls Start.
  • Connection handlers (OnConnect, OnConnectionLost, OnReconnecting) can be set for custom logic.
  • The ResumeSubs option re-subscribes to topics after reconnection.

Detailed Pages

How is this guide?

On this page