Features
Queue extension capabilities and backend details
Unified Queue Interface
All backends implement the same Queue interface, making application code backend-agnostic:
type Queue interface {
	Connect(ctx context.Context) error
	Disconnect(ctx context.Context) error
	Ping(ctx context.Context) error

	DeclareQueue(ctx context.Context, name string, opts QueueOptions) error
	DeleteQueue(ctx context.Context, name string) error
	ListQueues(ctx context.Context) ([]string, error)
	GetQueueInfo(ctx context.Context, name string) (*QueueInfo, error)
	PurgeQueue(ctx context.Context, name string) error

	Publish(ctx context.Context, queue string, message Message) error
	PublishBatch(ctx context.Context, queue string, messages []Message) error
	PublishDelayed(ctx context.Context, queue string, message Message, delay time.Duration) error

	Consume(ctx context.Context, queue string, handler MessageHandler, opts ConsumeOptions) error
	StopConsuming(ctx context.Context, queue string) error

	Ack(ctx context.Context, messageID string) error
	Nack(ctx context.Context, messageID string, requeue bool) error
	Reject(ctx context.Context, messageID string) error

	GetDeadLetterQueue(ctx context.Context, queue string) ([]Message, error)
	RequeueDeadLetter(ctx context.Context, queue string, messageID string) error

	Stats(ctx context.Context) (*QueueStats, error)
}
Multiple Backends
| Driver | Implementation | Transport | Status |
|---|---|---|---|
| inmemory | InMemoryQueue | In-process channels | Fully implemented |
| redis | RedisQueue | Redis Streams | Fully implemented |
| rabbitmq | RabbitMQQueue | AMQP 0-9-1 | Fully implemented |
| nats | NATSQueue | NATS JetStream | Fully implemented |
Queue Declaration
Create queues with rich options controlling durability, retention, and priority:
q.DeclareQueue(ctx, "tasks", queue.QueueOptions{
	Durable:         true,               // survive broker restart
	AutoDelete:      false,              // persist when unused
	Exclusive:       false,              // allow multiple consumers
	MaxLength:       100000,             // max messages in queue
	MaxLengthBytes:  1073741824,         // 1 GB max size
	MessageTTL:      48 * time.Hour,     // message expiration
	MaxPriority:     10,                 // enable priority levels
	DeadLetterQueue: "tasks.dlq",        // dead-letter destination
	Arguments:       map[string]any{},   // backend-specific arguments
})
Publishing
Send single messages, batches, or delayed messages with headers, priority, and expiration:
// Single message
q.Publish(ctx, "tasks", queue.Message{
	Body:       []byte(`{"task":"process-report"}`),
	Headers:    map[string]string{"type": "report", "priority": "high"},
	Priority:   5,
	Expiration: 1 * time.Hour,
	MaxRetries: 3,
})
// Batch publish
q.PublishBatch(ctx, "tasks", []queue.Message{msg1, msg2, msg3})
// Delayed delivery
q.PublishDelayed(ctx, "tasks", msg, 30*time.Minute)
Concurrent Consumers
Consume messages with configurable concurrency, prefetch, and processing timeout. Each consumer runs in its own goroutine pool:
q.Consume(ctx, "tasks", handleTask, queue.ConsumeOptions{
	ConsumerTag:   "worker-1",
	AutoAck:       false, // manual acknowledgement
	PrefetchCount: 10,    // buffer 10 messages per worker
	Concurrency:   5,     // 5 concurrent goroutines
	Timeout:       30 * time.Second,
	RetryStrategy: queue.RetryStrategy{
		MaxRetries:      3,
		InitialInterval: 500 * time.Millisecond,
		MaxInterval:     30 * time.Second,
		Multiplier:      2.0,
	},
})
Message Acknowledgement
Explicit control over message lifecycle:
func handleTask(ctx context.Context, msg queue.Message) error {
	err := processTask(msg)
	if err == nil {
		return q.Ack(ctx, msg.ID) // success: remove from queue
	}
	if isRetryable(err) {
		return q.Nack(ctx, msg.ID, true) // requeue for retry
	}
	return q.Reject(ctx, msg.ID) // discard permanently
}
Dead-Letter Queues
When enabled (EnableDeadLetter), messages that exceed max retries are routed to a dead-letter queue (named with DeadLetterSuffix, default .dlq):
// Inspect dead letters
deadLetters, _ := q.GetDeadLetterQueue(ctx, "tasks")
for _, msg := range deadLetters {
	fmt.Printf("Dead letter: %s, retries: %d\n", msg.ID, msg.Retries)
}
// Requeue a specific dead letter (guard against an empty result)
if len(deadLetters) > 0 {
	q.RequeueDeadLetter(ctx, "tasks", deadLetters[0].ID)
}
Queue Management
// List all queues
queues, _ := q.ListQueues(ctx)
// Get detailed queue info
info, _ := q.GetQueueInfo(ctx, "tasks")
fmt.Printf("Messages: %d, Consumers: %d, Publish rate: %.1f/s\n",
	info.Messages, info.Consumers, info.PublishRate)
// Purge all messages
q.PurgeQueue(ctx, "tasks")
// Delete queue
q.DeleteQueue(ctx, "tasks")
// Global stats
stats, _ := q.Stats(ctx)
fmt.Printf("Total queues: %d, Total messages: %d\n",
	stats.QueueCount, stats.TotalMessages)
Redis Client Reuse
The Redis backend can reuse an existing Redis connection from the database extension to avoid duplicate connections:
queue.NewExtension(
	queue.WithDriver("redis"),
	queue.WithDatabaseRedisConnection(true),
)
When DatabaseRedisConnection is true, the queue extension declares a dependency on the database extension and uses its Redis client.
Backend Comparison
| Feature | In-Memory | Redis | RabbitMQ | NATS |
|---|---|---|---|---|
| Persistence | No | Yes (Streams) | Yes | Yes (JetStream) |
| Delayed messages | Yes | Yes | Yes (plugin) | Yes |
| Priority queues | Yes | Limited | Yes | No |
| Dead-letter | Yes | Yes | Yes (native) | Yes |
| Clustering | No | Yes | Yes | Yes |
| Concurrency | Goroutines | Goroutines | Channels | Subscriptions |
Security and Resilience
- TLS -- connect to backends over TLS with custom certificates (EnableTLS, TLSCertFile, TLSKeyFile, TLSCAFile).
- Connection pooling -- MaxConnections and MaxIdleConnections for backend connections.
- Retry with backoff -- MaxRetries with RetryBackoff and exponential RetryMultiplier.
- Observability -- optional EnableMetrics and EnableTracing integration.
Sentinel Errors
| Error | Meaning |
|---|---|
| ErrNotConnected | Backend is not connected |
| ErrAlreadyConnected | Attempted to connect when already connected |
| ErrConnectionFailed | Failed to establish connection |
| ErrQueueNotFound | Named queue does not exist |
| ErrQueueAlreadyExists | Queue name collision |
| ErrQueueFull | Queue has reached MaxLength |
| ErrMessageNotFound | Message ID not found |
| ErrInvalidMessage | Message failed validation |
| ErrMessageTooLarge | Message exceeds MaxMessageSize |
| ErrPublishFailed | Publish operation failed |
| ErrConsumeFailed | Consumer setup failed |
| ErrConsumerNotFound | Consumer not found for queue |
| ErrAckFailed | Acknowledgement failed |
| ErrNackFailed | Negative acknowledgement failed |
| ErrTimeout | Operation timed out |
| ErrUnsupportedDriver | Configured driver not recognized |
| ErrInvalidConfig | Invalid configuration values |