Queue
Message queue with in-memory, Redis, RabbitMQ, and NATS backends
Overview
github.com/xraph/forge/extensions/queue provides a unified message queue interface backed by
in-memory, Redis Streams, RabbitMQ, or NATS JetStream. It registers a QueueService in the DI
container that manages connection lifecycle and exposes queue declaration, message publishing,
consuming, and dead-letter queue operations.
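To illustrate what "a unified interface with interchangeable backends" means in practice, here is a minimal, self-contained sketch. It does not use the extension: `Backend`, `Message`, `memoryBackend`, and `NewBackend` are hypothetical stand-ins invented for this example, and the real `Queue` interface exposes more operations. Only the `inmemory` driver name is taken from this page.

```go
package main

import (
	"errors"
	"fmt"
)

// Message and Backend are simplified stand-ins, not the extension's types.
type Message struct {
	ID   string
	Body []byte
}

// Backend is the small surface every driver in this sketch must implement.
type Backend interface {
	Publish(queue string, msg Message) error
	Receive(queue string) (Message, bool)
}

// memoryBackend keeps each queue as a FIFO slice (not safe for concurrent use).
type memoryBackend struct {
	queues map[string][]Message
}

func newMemoryBackend() *memoryBackend {
	return &memoryBackend{queues: make(map[string][]Message)}
}

func (m *memoryBackend) Publish(queue string, msg Message) error {
	m.queues[queue] = append(m.queues[queue], msg)
	return nil
}

func (m *memoryBackend) Receive(queue string) (Message, bool) {
	pending := m.queues[queue]
	if len(pending) == 0 {
		return Message{}, false
	}
	m.queues[queue] = pending[1:]
	return pending[0], true
}

// ErrUnknownDriver is returned for driver names this sketch does not handle.
var ErrUnknownDriver = errors.New("unknown queue driver")

// NewBackend selects an implementation by driver name. Callers only see
// the Backend interface, so swapping drivers does not change their code.
func NewBackend(driver string) (Backend, error) {
	switch driver {
	case "inmemory":
		return newMemoryBackend(), nil
	default:
		// Network-backed drivers would be constructed here.
		return nil, fmt.Errorf("%w: %q", ErrUnknownDriver, driver)
	}
}

func main() {
	b, err := NewBackend("inmemory")
	if err != nil {
		fmt.Println(err)
		return
	}
	_ = b.Publish("emails", Message{ID: "1", Body: []byte("welcome")})
	msg, ok := b.Receive("emails")
	fmt.Println(msg.ID, string(msg.Body), ok) // 1 welcome true

	_, err = NewBackend("kafka")
	fmt.Println(err) // unknown queue driver: "kafka"
}
```

Because callers depend only on the interface, moving from the in-memory backend in development to a networked backend in production is a configuration change rather than a code change.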
What It Registers
| Service | DI Key | Type |
|---|---|---|
| Queue service | queue | *QueueService (also satisfies Queue) |
The service is managed by Vessel. When DatabaseRedisConnection is configured, the extension
declares a dependency on the database extension to reuse an existing Redis client.
Quick Start
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/xraph/forge"
	"github.com/xraph/forge/extensions/queue"
)

func main() {
	app := forge.NewApp(forge.AppConfig{Name: "my-app", Version: "1.0.0"})

	// In-memory queue (development)
	app.RegisterExtension(queue.NewExtension())

	// Or connect to RabbitMQ
	app.RegisterExtension(queue.NewExtension(
		queue.WithDriver("rabbitmq"),
		queue.WithURL("amqp://guest:guest@localhost:5672/"),
		queue.WithPrefetch(10),
		queue.WithConcurrency(5),
		queue.WithDeadLetter(true),
	))

	ctx := context.Background()
	app.Start(ctx)
	defer app.Stop(ctx)

	q := queue.MustGetQueue(app.Container())

	// Declare a queue
	q.DeclareQueue(ctx, "emails", queue.QueueOptions{
		Durable:         true,
		MessageTTL:      24 * time.Hour,
		MaxLength:       10000,
		DeadLetterQueue: "emails.dlq",
	})

	// Publish a message
	q.Publish(ctx, "emails", queue.Message{
		Body:    []byte(`{"to":"alice@example.com","subject":"Welcome"}`),
		Headers: map[string]string{"content-type": "application/json"},
	})

	// Publish a delayed message
	q.PublishDelayed(ctx, "emails", queue.Message{
		Body: []byte(`{"to":"bob@example.com","subject":"Reminder"}`),
	}, 5*time.Minute)

	// Consume messages
	q.Consume(ctx, "emails", func(ctx context.Context, msg queue.Message) error {
		fmt.Printf("Processing email: %s\n", msg.ID)
		// Process the message...
		return q.Ack(ctx, msg.ID)
	}, queue.ConsumeOptions{
		Concurrency:   3,
		PrefetchCount: 10,
		RetryStrategy: queue.RetryStrategy{
			MaxRetries:      3,
			InitialInterval: 1 * time.Second,
			Multiplier:      2.0,
		},
	})
}
Using Queue in Your Services
Inject *queue.QueueService for automatic DI resolution:
// Uses the context, encoding/json, and fmt packages in addition to the
// forge and queue imports from the Quick Start. Email is an
// application-defined type.
type EmailWorker struct {
	queue  queue.Queue
	logger forge.Logger
}

func NewEmailWorker(q *queue.QueueService, logger forge.Logger) *EmailWorker {
	return &EmailWorker{queue: q, logger: logger}
}

func (w *EmailWorker) Enqueue(ctx context.Context, email Email) error {
	body, err := json.Marshal(email)
	if err != nil {
		return fmt.Errorf("marshal email: %w", err)
	}
	return w.queue.Publish(ctx, "emails", queue.Message{
		Body:     body,
		Priority: 1,
		Headers:  map[string]string{"type": "transactional"},
	})
}

func (w *EmailWorker) Start(ctx context.Context) error {
	return w.queue.Consume(ctx, "emails", w.processEmail, queue.DefaultConsumeOptions())
}

func (w *EmailWorker) processEmail(ctx context.Context, msg queue.Message) error {
	var email Email
	if err := json.Unmarshal(msg.Body, &email); err != nil {
		// Return the error rather than sending a zero-value Email.
		return fmt.Errorf("decode message %s: %w", msg.ID, err)
	}
	// Send email...
	return w.queue.Ack(ctx, msg.ID)
}
Register with Vessel:
forge.ProvideConstructor(app.Container(), NewEmailWorker)
Key Concepts
- Drivers -- select `inmemory`, `redis`, `rabbitmq`, or `nats` via config. Each implements the same `Queue` interface.
- Queue declaration -- declare queues with durability, auto-delete, exclusive access, message TTL, max length, and priority support.
- Publishing -- send single messages, batches, or delayed messages with headers, priority, and expiration.
- Consuming -- subscribe to queues with configurable prefetch, concurrency, auto-ack, retry strategy, and timeout per message.
- Dead-letter queues -- failed messages are routed to a dead-letter queue (suffix-based naming). Inspect and requeue dead letters.
- Message acknowledgement -- explicit `Ack`, `Nack` (requeue), and `Reject` (discard) per message.
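The interaction between a retry strategy and a dead-letter queue can be sketched without any backend. The example below is an illustration under stated assumptions, not the extension's implementation: `RetryPolicy`, `Delays`, and `Deliver` are hypothetical names, the field names are borrowed from the `RetryStrategy` literal in the Quick Start, and it assumes that `MaxRetries` counts retries after the first attempt and that the dead-letter queue name is the source queue plus a `.dlq` suffix.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// RetryPolicy mirrors the three RetryStrategy fields used in the Quick Start.
// It is a local stand-in, not the extension's type.
type RetryPolicy struct {
	MaxRetries      int
	InitialInterval time.Duration
	Multiplier      float64
}

// Delays returns the wait before each retry: InitialInterval first, then the
// previous delay times Multiplier for every further retry.
func (p RetryPolicy) Delays() []time.Duration {
	delays := make([]time.Duration, 0, p.MaxRetries)
	d := p.InitialInterval
	for i := 0; i < p.MaxRetries; i++ {
		delays = append(delays, d)
		d = time.Duration(float64(d) * p.Multiplier)
	}
	return delays
}

// errSend is a placeholder failure used by the demo handler.
var errSend = errors.New("smtp unavailable")

// Deliver runs handler once and then up to MaxRetries more times. It returns
// the number of attempts made and, if all of them failed, the name of the
// dead-letter queue the message would be routed to (assumed ".dlq" suffix).
func Deliver(queueName string, p RetryPolicy, handler func() error) (attempts int, deadLetter string) {
	for attempts = 1; ; attempts++ {
		if handler() == nil {
			return attempts, ""
		}
		if attempts > p.MaxRetries {
			return attempts, queueName + ".dlq"
		}
		// A real consumer would wait p.Delays()[attempts-1] before retrying.
	}
}

func main() {
	p := RetryPolicy{MaxRetries: 3, InitialInterval: time.Second, Multiplier: 2.0}
	fmt.Println(p.Delays()) // [1s 2s 4s]

	attempts, dlq := Deliver("emails", p, func() error { return errSend })
	fmt.Println(attempts, dlq) // 4 emails.dlq
}
```

With the Quick Start values, a message whose handler keeps failing would be tried four times over roughly seven seconds of backoff before being dead-lettered, which is a useful number to keep in mind when sizing per-message timeouts.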
Important Runtime Notes
- The Redis backend uses Redis Streams and can optionally reuse a Redis client from the database extension via `DatabaseRedisConnection`.
- Queue stats include publish rate, delivery rate, ack rate, memory usage, and connection count.
- Health checks delegate to the backend's `Ping()` method.
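Delegating a health check to a backend ping might look like the following sketch. The names `Pinger`, `CheckHealth`, `fakeBackend`, and `report` are hypothetical, and the assumption that `Ping` accepts a `context.Context` and returns an `error` is not confirmed by this page; check the extension's source for the actual signature.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Pinger is a hypothetical one-method view of a queue backend.
// The real Ping signature may differ.
type Pinger interface {
	Ping(ctx context.Context) error
}

// fakeBackend stands in for a driver; err simulates a broken connection.
type fakeBackend struct{ err error }

func (b fakeBackend) Ping(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	return b.err
}

// errDown is a placeholder connection failure for the demo.
var errDown = errors.New("connection refused")

// CheckHealth delegates to the backend's Ping under a deadline and wraps
// any failure with the driver name so the report is actionable.
func CheckHealth(ctx context.Context, driver string, b Pinger, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	if err := b.Ping(ctx); err != nil {
		return fmt.Errorf("queue backend %q unhealthy: %w", driver, err)
	}
	return nil
}

// report turns the health result into a short status string.
func report(driver string, b Pinger) string {
	if err := CheckHealth(context.Background(), driver, b, 2*time.Second); err != nil {
		return err.Error()
	}
	return "ok"
}

func main() {
	fmt.Println(report("inmemory", fakeBackend{}))
	fmt.Println(report("redis", fakeBackend{err: errDown}))
}
```

Bounding the ping with a timeout keeps a hung broker connection from stalling the application's overall health endpoint.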