Queue

Message queue with in-memory, Redis, RabbitMQ, and NATS backends

Overview

github.com/xraph/forge/extensions/queue provides a unified message queue interface backed by in-memory, Redis Streams, RabbitMQ, or NATS JetStream. It registers a QueueService in the DI container that manages connection lifecycle and exposes queue declaration, message publishing, consuming, and dead-letter queue operations.

What It Registers

Service          DI Key    Type
Queue service    queue     *QueueService (also satisfies Queue)

The service is managed by Vessel. When DatabaseRedisConnection is configured, the extension declares a dependency on the database extension to reuse an existing Redis client.

Quick Start

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/queue"
)

func main() {
    app := forge.NewApp(forge.AppConfig{Name: "my-app", Version: "1.0.0"})

    // In-memory queue (development)
    app.RegisterExtension(queue.NewExtension())

    // Or connect to RabbitMQ instead (register only one of these two in a real app)
    app.RegisterExtension(queue.NewExtension(
        queue.WithDriver("rabbitmq"),
        queue.WithURL("amqp://guest:guest@localhost:5672/"),
        queue.WithPrefetch(10),
        queue.WithConcurrency(5),
        queue.WithDeadLetter(true),
    ))

    ctx := context.Background()
    if err := app.Start(ctx); err != nil {
        panic(err) // startup failed, e.g. the broker is unreachable
    }
    defer app.Stop(ctx)

    q := queue.MustGetQueue(app.Container())

    // Declare a queue
    q.DeclareQueue(ctx, "emails", queue.QueueOptions{
        Durable:         true,
        MessageTTL:      24 * time.Hour,
        MaxLength:       10000,
        DeadLetterQueue: "emails.dlq",
    })

    // Publish a message (Publish returns an error; it is ignored here for brevity)
    q.Publish(ctx, "emails", queue.Message{
        Body:    []byte(`{"to":"alice@example.com","subject":"Welcome"}`),
        Headers: map[string]string{"content-type": "application/json"},
    })

    // Publish a delayed message
    q.PublishDelayed(ctx, "emails", queue.Message{
        Body: []byte(`{"to":"bob@example.com","subject":"Reminder"}`),
    }, 5*time.Minute)

    // Consume messages
    q.Consume(ctx, "emails", func(ctx context.Context, msg queue.Message) error {
        fmt.Printf("Processing email: %s\n", msg.ID)
        // Process the message...
        return q.Ack(ctx, msg.ID)
    }, queue.ConsumeOptions{
        Concurrency:   3,
        PrefetchCount: 10,
        RetryStrategy: queue.RetryStrategy{
            MaxRetries:      3,
            InitialInterval: 1 * time.Second,
            Multiplier:      2.0,
        },
    })
}
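
The RetryStrategy fields above (MaxRetries, InitialInterval, Multiplier) suggest exponential backoff between redeliveries, but this page does not state the exact schedule the extension uses. The following is a minimal sketch under that assumption: the delay before retry n is InitialInterval * Multiplier^(n-1), and no retry happens after MaxRetries. The RetryStrategy type and the DelayFor method here are local stand-ins written for illustration, not the extension's own code.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// RetryStrategy mirrors the three fields used in the Quick Start.
// It is a local stand-in for illustration, not queue.RetryStrategy itself.
type RetryStrategy struct {
	MaxRetries      int
	InitialInterval time.Duration
	Multiplier      float64
}

// DelayFor returns the assumed wait before retry number attempt (1-based).
// The second result is false once the retry budget is exhausted.
func (s RetryStrategy) DelayFor(attempt int) (time.Duration, bool) {
	if attempt < 1 || attempt > s.MaxRetries {
		return 0, false
	}
	// Exponential growth: InitialInterval * Multiplier^(attempt-1).
	factor := math.Pow(s.Multiplier, float64(attempt-1))
	return time.Duration(float64(s.InitialInterval) * factor), true
}

func main() {
	s := RetryStrategy{MaxRetries: 3, InitialInterval: time.Second, Multiplier: 2.0}
	for attempt := 1; attempt <= 4; attempt++ {
		delay, ok := s.DelayFor(attempt)
		fmt.Println(attempt, delay, ok)
	}
}
```

With the Quick Start values, this model waits 1s, 2s, and 4s before giving up on the fourth attempt. A real driver may add jitter or a maximum interval, so treat the numbers as illustrative only.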

Using Queue in Your Services

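Accept *queue.QueueService as a constructor parameter so the container can resolve it automatically; the worker can store it behind the queue.Queue interface: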

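import (
    "context"
    "encoding/json"

    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/queue"
)

// Email is the application's own payload type (illustrative, not part of the extension).
type Email struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
}

type EmailWorker struct {
    queue  queue.Queue
    logger forge.Logger
}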

func NewEmailWorker(q *queue.QueueService, logger forge.Logger) *EmailWorker {
    return &EmailWorker{queue: q, logger: logger}
}

func (w *EmailWorker) Enqueue(ctx context.Context, email Email) error {
    body, err := json.Marshal(email)
    if err != nil {
        return err // do not publish a message with an empty or partial body
    }
    return w.queue.Publish(ctx, "emails", queue.Message{
        Body:     body,
        Priority: 1,
        Headers:  map[string]string{"type": "transactional"},
    })
}

func (w *EmailWorker) Start(ctx context.Context) error {
    return w.queue.Consume(ctx, "emails", w.processEmail, queue.DefaultConsumeOptions())
}

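func (w *EmailWorker) processEmail(ctx context.Context, msg queue.Message) error {
    var email Email
    if err := json.Unmarshal(msg.Body, &email); err != nil {
        // Malformed payload: return the error instead of acking the message.
        return err
    }
    // Send email...
    return w.queue.Ack(ctx, msg.ID)
}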

Register with Vessel:

forge.ProvideConstructor(app.Container(), NewEmailWorker)

Key Concepts

  • Drivers -- select inmemory, redis, rabbitmq, or nats via config. Each implements the same Queue interface.
  • Queue declaration -- declare queues with durability, auto-delete, exclusive access, message TTL, max length, and priority support.
  • Publishing -- send single messages, batches, or delayed messages with headers, priority, and expiration.
  • Consuming -- subscribe to queues with configurable prefetch, concurrency, auto-ack, retry strategy, and timeout per message.
  • Dead-letter queues -- failed messages are routed to a dead-letter queue (suffix-based naming). Inspect and requeue dead letters.
  • Message acknowledgement -- explicit Ack, Nack (requeue), and Reject (discard) per message.
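
To make the acknowledgement and dead-letter concepts above concrete, here is a toy in-memory model. It is a sketch only: the Broker type, its methods, the alwaysFail handler, and the ".dlq" suffix (borrowed from "emails.dlq" in the Quick Start) are assumptions for illustration and do not reflect how any of the extension's drivers are implemented. A handler that returns nil acks the message; a handler that returns an error causes a requeue until the retry budget is spent, after which the message moves to the dead-letter queue.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a simplified stand-in for queue.Message.
type Message struct {
	ID       string
	Body     []byte
	Attempts int // failed delivery attempts so far
}

// Broker is a toy in-memory model, not the extension's in-memory driver.
type Broker struct {
	queues     map[string][]Message
	maxRetries int
}

func NewBroker(maxRetries int) *Broker {
	return &Broker{queues: make(map[string][]Message), maxRetries: maxRetries}
}

// dlqName applies an assumed ".dlq" suffix to derive the dead-letter queue name.
func dlqName(queue string) string { return queue + ".dlq" }

func (b *Broker) Publish(queue string, m Message) {
	b.queues[queue] = append(b.queues[queue], m)
}

func (b *Broker) Len(queue string) int { return len(b.queues[queue]) }

// Deliver hands the next message to handler and reports whether one was delivered.
// A nil result acks (drops) the message; an error requeues it until maxRetries
// is exceeded, after which the message is routed to the dead-letter queue.
func (b *Broker) Deliver(queue string, handler func(Message) error) bool {
	msgs := b.queues[queue]
	if len(msgs) == 0 {
		return false
	}
	m := msgs[0]
	b.queues[queue] = msgs[1:]
	if err := handler(m); err == nil {
		return true // acked
	}
	m.Attempts++
	if m.Attempts > b.maxRetries {
		b.Publish(dlqName(queue), m)
	} else {
		b.Publish(queue, m)
	}
	return true
}

// RequeueDeadLetters moves every dead letter back to its source queue
// with the attempt counter reset, and returns how many were moved.
func (b *Broker) RequeueDeadLetters(queue string) int {
	dead := b.queues[dlqName(queue)]
	b.queues[dlqName(queue)] = nil
	for _, m := range dead {
		m.Attempts = 0
		b.Publish(queue, m)
	}
	return len(dead)
}

// alwaysFail is a handler that simulates a permanently failing consumer.
func alwaysFail(Message) error { return errors.New("handler failed") }

func main() {
	b := NewBroker(2)
	b.Publish("emails", Message{ID: "m1"})
	for b.Deliver("emails", alwaysFail) {
	}
	fmt.Println("emails:", b.Len("emails"), "emails.dlq:", b.Len("emails.dlq"))
	fmt.Println("requeued:", b.RequeueDeadLetters("emails"))
}
```

In this model a budget of two retries means three deliveries in total (the first attempt plus two retries) before the message is dead-lettered. Whether the real drivers count attempts the same way is not stated on this page.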

Important Runtime Notes

  • The Redis backend uses Redis Streams and can optionally reuse a Redis client from the database extension via DatabaseRedisConnection.
  • Queue stats include publish rate, delivery rate, ack rate, memory usage, and connection count.
  • Health checks delegate to the backend's Ping() method.
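
The delegation described in the last note can be pictured as follows. This is a hedged sketch: the page only says that health checks call the backend's Ping() method, so the Pinger interface, its context parameter, the Health function, the timeout, and the fakeBackend type are all assumptions made for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Pinger is the only capability this sketch assumes a backend exposes.
// The real Ping signature is not shown on this page.
type Pinger interface {
	Ping(ctx context.Context) error
}

// Health delegates to the backend's Ping under a timeout and wraps any failure.
func Health(ctx context.Context, backend Pinger, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	if err := backend.Ping(ctx); err != nil {
		return fmt.Errorf("queue backend unhealthy: %w", err)
	}
	return nil
}

// fakeBackend is a hypothetical backend whose Ping returns a fixed result.
type fakeBackend struct{ err error }

func (f fakeBackend) Ping(context.Context) error { return f.err }

// demo runs the check against one healthy and one failing fake backend.
func demo() (healthy, unhealthy error) {
	ctx := context.Background()
	healthy = Health(ctx, fakeBackend{}, time.Second)
	unhealthy = Health(ctx, fakeBackend{err: errors.New("connection refused")}, time.Second)
	return healthy, unhealthy
}

func main() {
	ok, bad := demo()
	fmt.Println("healthy backend:", ok)
	fmt.Println("failing backend:", bad)
}
```

Wrapping the error with %w keeps the backend's original error available to callers through errors.Is and errors.As.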
