Queue

Features

Queue extension capabilities and backend details

Unified Queue Interface

All backends implement the same Queue interface, making application code backend-agnostic:

type Queue interface {
    Connect(ctx context.Context) error
    Disconnect(ctx context.Context) error
    Ping(ctx context.Context) error
    DeclareQueue(ctx context.Context, name string, opts QueueOptions) error
    DeleteQueue(ctx context.Context, name string) error
    ListQueues(ctx context.Context) ([]string, error)
    GetQueueInfo(ctx context.Context, name string) (*QueueInfo, error)
    PurgeQueue(ctx context.Context, name string) error
    Publish(ctx context.Context, queue string, message Message) error
    PublishBatch(ctx context.Context, queue string, messages []Message) error
    PublishDelayed(ctx context.Context, queue string, message Message, delay time.Duration) error
    Consume(ctx context.Context, queue string, handler MessageHandler, opts ConsumeOptions) error
    StopConsuming(ctx context.Context, queue string) error
    Ack(ctx context.Context, messageID string) error
    Nack(ctx context.Context, messageID string, requeue bool) error
    Reject(ctx context.Context, messageID string) error
    GetDeadLetterQueue(ctx context.Context, queue string) ([]Message, error)
    RequeueDeadLetter(ctx context.Context, queue string, messageID string) error
    Stats(ctx context.Context) (*QueueStats, error)
}
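
Because application code depends only on the interface, the same function can run against any backend, including a test double. The sketch below illustrates this under stated assumptions: `Message` and `Publisher` are trimmed-down local stand-ins for the real types, and `fakeQueue`, `enqueueReport`, and `run` are hypothetical names introduced only for this example.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Message is a simplified stand-in for queue.Message (assumption).
type Message struct {
	Body []byte
}

// Publisher is the subset of the Queue interface this example needs.
type Publisher interface {
	Publish(ctx context.Context, queue string, message Message) error
}

// fakeQueue is a hypothetical in-process implementation for illustration.
type fakeQueue struct {
	published map[string][]Message
}

func newFakeQueue() *fakeQueue {
	return &fakeQueue{published: make(map[string][]Message)}
}

// Publish records the message unless the context is already cancelled.
func (f *fakeQueue) Publish(ctx context.Context, queue string, message Message) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	f.published[queue] = append(f.published[queue], message)
	return nil
}

// enqueueReport depends only on the interface, so any backend can be passed in.
func enqueueReport(ctx context.Context, p Publisher, reportID string) error {
	body, err := json.Marshal(map[string]string{"task": "process-report", "id": reportID})
	if err != nil {
		return err
	}
	return p.Publish(ctx, "tasks", Message{Body: body})
}

// run wires the fake into enqueueReport and reports how many messages were queued.
func run() (int, error) {
	q := newFakeQueue()
	if err := enqueueReport(context.Background(), q, "r-1"); err != nil {
		return 0, err
	}
	return len(q.published["tasks"]), nil
}

func main() {
	n, err := run()
	if err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Println("queued messages:", n)
}
```

Swapping `fakeQueue` for a real backend should require no change to `enqueueReport`, provided that backend satisfies the same method signature.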

Multiple Backends

| Driver | Implementation | Transport | Status |
| --- | --- | --- | --- |
| inmemory | InMemoryQueue | In-process channels | Fully implemented |
| redis | RedisQueue | Redis Streams | Fully implemented |
| rabbitmq | RabbitMQQueue | AMQP 0-9-1 | Fully implemented |
| nats | NATSQueue | NATS JetStream | Fully implemented |

Queue Declaration

Create queues with rich options controlling durability, retention, and priority:

q.DeclareQueue(ctx, "tasks", queue.QueueOptions{
    Durable:         true,              // survive broker restart
    AutoDelete:      false,             // persist when unused
    Exclusive:       false,             // allow multiple consumers
    MaxLength:       100000,            // max messages in queue
    MaxLengthBytes:  1073741824,        // 1 GB max size
    MessageTTL:      48 * time.Hour,    // message expiration
    MaxPriority:     10,                // enable priority levels
    DeadLetterQueue: "tasks.dlq",       // dead-letter destination
    Arguments:       map[string]any{},  // backend-specific arguments
})

Publishing

Send single messages, batches, or delayed messages with headers, priority, and expiration:

// Single message
q.Publish(ctx, "tasks", queue.Message{
    Body:       []byte(`{"task":"process-report"}`),
    Headers:    map[string]string{"type": "report", "priority": "high"},
    Priority:   5,
    Expiration: 1 * time.Hour,
    MaxRetries: 3,
})

// Batch publish
q.PublishBatch(ctx, "tasks", []queue.Message{msg1, msg2, msg3})

// Delayed delivery
q.PublishDelayed(ctx, "tasks", msg, 30*time.Minute)

Concurrent Consumers

Consume messages with configurable concurrency, prefetch, and processing timeout. Each consumer runs in its own goroutine pool:

q.Consume(ctx, "tasks", handleTask, queue.ConsumeOptions{
    ConsumerTag:   "worker-1",
    AutoAck:       false,          // manual acknowledgement
    PrefetchCount: 10,             // buffer 10 messages per worker
    Concurrency:   5,              // 5 concurrent goroutines
    Timeout:       30 * time.Second,
    RetryStrategy: queue.RetryStrategy{
        MaxRetries:      3,
        InitialInterval: 500 * time.Millisecond,
        MaxInterval:     30 * time.Second,
        Multiplier:      2.0,
    },
})
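
The RetryStrategy fields suggest an exponential backoff schedule. The sketch below shows one common way such delays are computed, assuming the delay is InitialInterval multiplied by Multiplier once per prior attempt and capped at MaxInterval. Whether the library uses exactly this formula, or adds jitter, is not stated here. `RetryStrategy` is a local stand-in type, and `delayFor` and `exampleStrategy` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// RetryStrategy is a local stand-in mirroring the fields shown above (assumption).
type RetryStrategy struct {
	MaxRetries      int
	InitialInterval time.Duration
	MaxInterval     time.Duration
	Multiplier      float64
}

// delayFor returns the wait before retry number attempt (0-based):
// InitialInterval * Multiplier^attempt, capped at MaxInterval.
func delayFor(s RetryStrategy, attempt int) time.Duration {
	d := float64(s.InitialInterval) * math.Pow(s.Multiplier, float64(attempt))
	if d > float64(s.MaxInterval) {
		return s.MaxInterval
	}
	return time.Duration(d)
}

// exampleStrategy returns the values used in the Consume call above.
func exampleStrategy() RetryStrategy {
	return RetryStrategy{
		MaxRetries:      3,
		InitialInterval: 500 * time.Millisecond,
		MaxInterval:     30 * time.Second,
		Multiplier:      2.0,
	}
}

func main() {
	s := exampleStrategy()
	for attempt := 0; attempt < s.MaxRetries; attempt++ {
		fmt.Printf("retry %d after %v\n", attempt+1, delayFor(s, attempt))
	}
	// Prints:
	// retry 1 after 500ms
	// retry 2 after 1s
	// retry 3 after 2s
}
```

Under these assumptions, MaxInterval only takes effect with a larger retry budget: the seventh retry would already be capped at 30 seconds.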

Message Acknowledgement

With AutoAck disabled, the handler controls each message's lifecycle explicitly:

func handleTask(ctx context.Context, msg queue.Message) error {
    err := processTask(msg)
    if err == nil {
        return q.Ack(ctx, msg.ID)        // success: remove from queue
    }
    if isRetryable(err) {
        return q.Nack(ctx, msg.ID, true)  // requeue for retry
    }
    return q.Reject(ctx, msg.ID)          // discard permanently
}

Dead-Letter Queues

When EnableDeadLetter is set, messages that exceed their maximum retry count are routed to a dead-letter queue. Its name is built with DeadLetterSuffix (default .dlq), for example tasks.dlq:

// Inspect dead letters
deadLetters, _ := q.GetDeadLetterQueue(ctx, "tasks")
for _, msg := range deadLetters {
    fmt.Printf("Dead letter: %s, retries: %d\n", msg.ID, msg.Retries)
}

// Requeue a specific dead letter (guard against an empty result)
if len(deadLetters) > 0 {
    q.RequeueDeadLetter(ctx, "tasks", deadLetters[0].ID)
}
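
The routing rule described above can be sketched as a small decision function. This is an illustration, not the extension's actual code. It assumes the dead-letter name is the source queue name with the suffix appended, and that a message is dead-lettered once its retry count has reached the maximum. `deadLetterName` and `nextQueue` are hypothetical helpers.

```go
package main

import "fmt"

// defaultDeadLetterSuffix is the default suffix named in the text above.
const defaultDeadLetterSuffix = ".dlq"

// deadLetterName appends the suffix to the queue name (assumed naming rule).
// An empty suffix falls back to the default.
func deadLetterName(queue, suffix string) string {
	if suffix == "" {
		suffix = defaultDeadLetterSuffix
	}
	return queue + suffix
}

// nextQueue returns where a failed message should go next: back to its own
// queue while retries remain, otherwise to the dead-letter queue.
// Assumption: retries counts the retries already performed.
func nextQueue(queue string, retries, maxRetries int, suffix string) string {
	if retries < maxRetries {
		return queue
	}
	return deadLetterName(queue, suffix)
}

func main() {
	for retries := 0; retries <= 3; retries++ {
		fmt.Printf("retries=%d -> %s\n", retries, nextQueue("tasks", retries, 3, ""))
	}
}
```

With a maximum of 3, the first three failures return the message to tasks and the next one sends it to tasks.dlq.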

Queue Management

// List all queues
queues, _ := q.ListQueues(ctx)

// Get detailed queue info
info, _ := q.GetQueueInfo(ctx, "tasks")
fmt.Printf("Messages: %d, Consumers: %d, Publish rate: %.1f/s\n",
    info.Messages, info.Consumers, info.PublishRate)

// Purge all messages
q.PurgeQueue(ctx, "tasks")

// Delete queue
q.DeleteQueue(ctx, "tasks")

// Global stats
stats, _ := q.Stats(ctx)
fmt.Printf("Total queues: %d, Total messages: %d\n",
    stats.QueueCount, stats.TotalMessages)

Redis Client Reuse

The Redis backend can reuse an existing Redis connection from the database extension to avoid duplicate connections:

queue.NewExtension(
    queue.WithDriver("redis"),
    queue.WithDatabaseRedisConnection(true),
)

When DatabaseRedisConnection is true, the queue extension declares a dependency on the database extension and uses its Redis client.

Backend Comparison

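| Feature | In-Memory | Redis | RabbitMQ | NATS |
| --- | --- | --- | --- | --- |
| Persistence | No | Yes (Streams) | Yes | Yes (JetStream) |
| Delayed messages | Yes | Yes | Yes (plugin) | Yes |
| Priority queues | Yes | Limited | Yes | No |
| Dead-letter | Yes | Yes | Yes (native) | Yes |
| Clustering | No | Yes | Yes | Yes |
| Concurrency | Goroutines | Goroutines | Channels | Subscriptions |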

Security and Resilience

  • TLS -- connect to backends over TLS with custom certificates (EnableTLS, TLSCertFile, TLSKeyFile, TLSCAFile).
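  • Connection pooling -- MaxConnections and MaxIdleConnections bound the number of open and idle backend connections.
  • Retry with backoff -- failed operations are retried up to MaxRetries times, starting from a RetryBackoff delay that grows exponentially by RetryMultiplier.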
  • Observability -- optional EnableMetrics and EnableTracing integration.

Sentinel Errors

| Error | Meaning |
| --- | --- |
| ErrNotConnected | Backend is not connected |
| ErrAlreadyConnected | Attempted to connect when already connected |
| ErrConnectionFailed | Failed to establish connection |
| ErrQueueNotFound | Named queue does not exist |
| ErrQueueAlreadyExists | Queue name collision |
| ErrQueueFull | Queue has reached MaxLength |
| ErrMessageNotFound | Message ID not found |
| ErrInvalidMessage | Message failed validation |
| ErrMessageTooLarge | Message exceeds MaxMessageSize |
| ErrPublishFailed | Publish operation failed |
| ErrConsumeFailed | Consumer setup failed |
| ErrConsumerNotFound | Consumer not found for queue |
| ErrAckFailed | Acknowledgement failed |
| ErrNackFailed | Negative acknowledgement failed |
| ErrTimeout | Operation timed out |
| ErrUnsupportedDriver | Configured driver not recognized |
| ErrInvalidConfig | Invalid configuration values |
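
Sentinel errors are normally matched with the standard library's errors.Is, which also sees through wrapping added with %w. The sketch below shows that pattern. The two error variables are local stand-ins, on the assumption that the real ones are exported package-level values such as queue.ErrQueueNotFound. `lookup` and `classify` are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for two of the sentinel errors listed above (assumption).
var (
	ErrQueueNotFound = errors.New("queue not found")
	ErrTimeout       = errors.New("operation timed out")
)

// lookup simulates a backend call that wraps a sentinel with extra context.
func lookup(name string) error {
	return fmt.Errorf("get info for %q: %w", name, ErrQueueNotFound)
}

// classify maps an error to a follow-up action by matching sentinels.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrQueueNotFound):
		return "declare-queue"
	case errors.Is(err, ErrTimeout):
		return "retry"
	default:
		return "fail"
	}
}

func main() {
	fmt.Println(classify(lookup("tasks"))) // prints "declare-queue"
}
```

Comparing with == would miss the wrapped error returned by `lookup`, which is why errors.Is is the safer default.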
