MQTT
MQTT
MQTT pub/sub client for IoT and messaging workloads
Overview
github.com/xraph/forge/extensions/mqtt provides an MQTT client built on the Eclipse Paho library.
It registers an MQTTService in the DI container that wraps a MQTT client with publish, subscribe,
connection lifecycle, and Last Will and Testament support.
What It Registers
| Service | DI Key | Type |
|---|---|---|
| MQTT service | mqtt | *MQTTService (also satisfies MQTT) |
The service is managed by Vessel. Start connects to the broker and Stop disconnects gracefully
with a 250ms quiesce period.
Quick Start
package main
import (
"context"
"fmt"
mqttclient "github.com/eclipse/paho.mqtt.golang"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/mqtt"
)
func main() {
app := forge.NewApp(forge.AppConfig{Name: "iot-gateway", Version: "1.0.0"})
app.RegisterExtension(mqtt.NewExtension(
mqtt.WithBroker("tcp://localhost:1883"),
mqtt.WithClientID("gateway-01"),
mqtt.WithCredentials("user", "password"),
mqtt.WithQoS(1),
mqtt.WithAutoReconnect(true),
))
ctx := context.Background()
app.Start(ctx)
defer app.Stop(ctx)
// Resolve the MQTT client from DI
svc, _ := forge.InjectType[*mqtt.MQTTService](app.Container())
client := svc // MQTTService satisfies the MQTT interface
// Subscribe to a topic
client.Subscribe("sensors/temperature/#", 1,
func(c mqttclient.Client, msg mqttclient.Message) {
fmt.Printf("Topic: %s, Payload: %s\n", msg.Topic(), msg.Payload())
},
)
// Subscribe to multiple topics at once
client.SubscribeMultiple(map[string]byte{
"sensors/humidity/#": 1,
"sensors/pressure/#": 1,
}, func(c mqttclient.Client, msg mqttclient.Message) {
fmt.Printf("Sensor data: %s = %s\n", msg.Topic(), msg.Payload())
})
// Publish a message
client.Publish("commands/device-01", 1, false, []byte(`{"action":"reset"}`))
// Publish with retained flag (new subscribers get the latest value)
client.Publish("status/gateway-01", 1, true, []byte(`{"status":"online"}`))
}Using MQTT in Your Services
Inject *mqtt.MQTTService for automatic DI resolution:
type SensorCollector struct {
mqtt mqtt.MQTT
logger forge.Logger
}
func NewSensorCollector(m *mqtt.MQTTService, logger forge.Logger) *SensorCollector {
return &SensorCollector{mqtt: m, logger: logger}
}
func (sc *SensorCollector) Start(ctx context.Context) error {
return sc.mqtt.Subscribe("sensors/#", 1,
func(c mqttclient.Client, msg mqttclient.Message) {
sc.logger.Info("sensor reading",
forge.F("topic", msg.Topic()),
forge.F("payload", string(msg.Payload())),
)
},
)
}
func (sc *SensorCollector) SendCommand(deviceID string, cmd []byte) error {
return sc.mqtt.Publish("commands/"+deviceID, 1, false, cmd)
}Register with Vessel:
forge.ProvideConstructor(app.Container(), NewSensorCollector)Connection Lifecycle Handlers
Register callbacks for connection events:
client.SetOnConnectHandler(func(c mqttclient.Client) {
fmt.Println("Connected to MQTT broker")
// Re-subscribe to topics if needed
})
client.SetConnectionLostHandler(func(c mqttclient.Client, err error) {
fmt.Printf("Connection lost: %v\n", err)
})
client.SetReconnectingHandler(func(c mqttclient.Client, opts *mqttclient.ClientOptions) {
fmt.Println("Attempting reconnection...")
})Key Concepts
- QoS levels -- publish and subscribe with QoS 0 (at most once), QoS 1 (at least once), or QoS 2 (exactly once). The default is configurable.
- Auto-reconnect -- the client automatically reconnects on connection loss with configurable max delay and attempt limits.
- Subscriptions -- subscribe to topics with wildcard support (
+single level,#multi level). Track active subscriptions. - Last Will and Testament -- configure a message that the broker publishes when the client disconnects unexpectedly.
- Retained messages -- publish messages with the retained flag so new subscribers receive the latest value immediately.
- Clean sessions -- toggle between persistent sessions (broker remembers subscriptions) and clean sessions.
Important Runtime Notes
- The MQTT client is created during
NewMQTTServicebut does not connect until Vessel callsStart. - Connection handlers (
OnConnect,OnConnectionLost,OnReconnecting) can be set for custom logic. - The
ResumeSubsoption re-subscribes to topics after reconnection.
Detailed Pages
How is this guide?