Queue
Functions
Public API reference for the Queue extension
| Function | Description |
|---|---|
| `NewExtension(opts ...ConfigOption) forge.Extension` | Create the queue extension with functional options |
| `NewExtensionWithConfig(config Config) forge.Extension` | Create the queue extension with a complete config |
| `DefaultConfig() Config` | Return the default configuration |

| Function | Description |
|---|---|
| `GetQueue(c forge.Container) (Queue, error)` | Resolve the Queue from the DI container |
| `MustGetQueue(c forge.Container) Queue` | Same as `GetQueue`, but panics on error |
| `GetQueueFromApp(app forge.App) (Queue, error)` | Resolve the Queue from an app instance |
| `MustGetQueueFromApp(app forge.App) Queue` | Same as `GetQueueFromApp`, but panics on error |
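
The `Get*` and `MustGet*` helpers differ only in how they report a failed lookup. The sketch below illustrates that pattern without importing Forge: `container`, `memQueue`, `getQueue`, and `mustGetQueue` are hypothetical stand-ins written for this example, and the real `forge.Container` API may look different.

```go
package main

import (
	"errors"
	"fmt"
)

// Queue is a local stand-in for the extension's Queue interface (assumption).
type Queue interface{ Name() string }

// container is a hypothetical, minimal DI container used only in this sketch.
type container map[string]any

// memQueue is a trivial Queue implementation for the demo.
type memQueue struct{}

func (memQueue) Name() string { return "in-memory" }

// getQueue mirrors the shape of GetQueue: it returns an error when
// nothing usable is registered.
func getQueue(c container) (Queue, error) {
	v, ok := c["queue"]
	if !ok {
		return nil, errors.New("queue not registered")
	}
	q, ok := v.(Queue)
	if !ok {
		return nil, errors.New("registered value is not a Queue")
	}
	return q, nil
}

// mustGetQueue mirrors the shape of MustGetQueue: same lookup, but it
// panics instead of returning the error.
func mustGetQueue(c container) Queue {
	q, err := getQueue(c)
	if err != nil {
		panic(err)
	}
	return q
}

func main() {
	// Error-returning variant: suitable when a missing queue is recoverable.
	if _, err := getQueue(container{}); err != nil {
		fmt.Println("lookup failed:", err)
	}

	// Panicking variant: suitable during startup wiring, where a missing
	// dependency should stop the program.
	q := mustGetQueue(container{"queue": memQueue{}})
	fmt.Println("resolved:", q.Name())
}
```

The panicking form is usually reserved for application startup; request-path code is typically better served by the error-returning form.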

| Method | Description |
|---|---|
| `DeclareQueue(ctx, name, opts) error` | Create a queue with the given options |
| `DeleteQueue(ctx, name) error` | Delete a queue and all its messages |
| `ListQueues(ctx) ([]string, error)` | List all queue names |
| `GetQueueInfo(ctx, name) (*QueueInfo, error)` | Get queue metadata (messages, consumers, rates) |
| `PurgeQueue(ctx, name) error` | Remove all messages from a queue |

| Method | Description |
|---|---|
| `Publish(ctx, queue, msg) error` | Publish a single message to a queue |
| `PublishBatch(ctx, queue, msgs) error` | Publish multiple messages in one operation |
| `PublishDelayed(ctx, queue, msg, delay) error` | Publish a message for future delivery |

| Method | Description |
|---|---|
| `Consume(ctx, queue, handler, opts) error` | Start consuming messages from a queue |
| `StopConsuming(ctx, queue) error` | Stop consuming from a queue |

| Method | Description |
|---|---|
| `Ack(ctx, msg) error` | Acknowledge successful processing |
| `Nack(ctx, msg) error` | Negatively acknowledge (requeue for retry) |
| `Reject(ctx, msg) error` | Reject and discard permanently |
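
The three acknowledgement calls map naturally onto handler outcomes. The sketch below shows one possible policy for choosing between them; it is not the extension's built-in behavior. `Message` is a local stand-in holding only the fields used here, and `decide`, `errPermanent`, and `errTransient` are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Message copies only the fields this sketch needs from the Message table.
type Message struct {
	ID         string
	Retries    int
	MaxRetries int
}

// outcome names which acknowledgement call the consumer should make.
type outcome string

const (
	ack    outcome = "ack"    // call Ack: processing succeeded
	nack   outcome = "nack"   // call Nack: requeue for another attempt
	reject outcome = "reject" // call Reject: discard permanently
)

// Hypothetical sentinel errors a handler might return.
var (
	errPermanent = errors.New("permanent failure")
	errTransient = errors.New("transient failure")
)

// decide maps a handler result to an acknowledgement. Rejecting once the
// retry budget is spent is an assumption of this sketch, not documented
// behavior of the extension.
func decide(msg Message, err error) outcome {
	switch {
	case err == nil:
		return ack
	case errors.Is(err, errPermanent), msg.Retries >= msg.MaxRetries:
		return reject
	default:
		return nack
	}
}

func main() {
	msg := Message{ID: "m-1", Retries: 1, MaxRetries: 3}
	fmt.Println(decide(msg, nil))
	fmt.Println(decide(msg, errTransient))
	fmt.Println(decide(msg, fmt.Errorf("bad payload: %w", errPermanent)))
}
```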

| Method | Description |
|---|---|
| `GetDeadLetterQueue(ctx, queue) ([]Message, error)` | Get messages in the dead-letter queue |
| `RequeueDeadLetter(ctx, queue, msgID) error` | Move a dead letter back to the original queue |
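
To make the dead-letter operations concrete, here is a minimal in-memory sketch of the semantics described in the table: inspecting dead letters and moving one back to its original queue. The `broker` type and its methods are hypothetical stand-ins, the `ctx` parameter is omitted for brevity, and resetting the retry counter on requeue is an assumption rather than documented behavior.

```go
package main

import "fmt"

// Message copies only the fields this sketch needs from the Message table.
type Message struct {
	ID      string
	Queue   string
	Retries int
}

// broker is a hypothetical in-memory backend used only for illustration.
type broker struct {
	queues map[string][]Message // live messages per queue
	dead   map[string][]Message // dead letters per original queue
}

func newBroker() *broker {
	return &broker{
		queues: make(map[string][]Message),
		dead:   make(map[string][]Message),
	}
}

// getDeadLetterQueue returns a copy of the dead letters for a queue.
func (b *broker) getDeadLetterQueue(queue string) []Message {
	return append([]Message(nil), b.dead[queue]...)
}

// requeueDeadLetter moves one dead letter back to its original queue.
func (b *broker) requeueDeadLetter(queue, msgID string) error {
	for i, m := range b.dead[queue] {
		if m.ID != msgID {
			continue
		}
		// Remove the message from the dead-letter list.
		b.dead[queue] = append(b.dead[queue][:i], b.dead[queue][i+1:]...)
		m.Retries = 0 // assumption: the retry counter resets on requeue
		b.queues[queue] = append(b.queues[queue], m)
		return nil
	}
	return fmt.Errorf("dead letter %q not found in queue %q", msgID, queue)
}

func main() {
	b := newBroker()
	b.dead["orders"] = []Message{
		{ID: "a", Queue: "orders", Retries: 3},
		{ID: "b", Queue: "orders", Retries: 3},
	}

	if err := b.requeueDeadLetter("orders", "a"); err != nil {
		fmt.Println("requeue failed:", err)
	}
	fmt.Println("dead letters left:", len(b.getDeadLetterQueue("orders")))
	fmt.Println("live messages:", len(b.queues["orders"]))
}
```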

| Method | Description |
|---|---|
| `Stats(ctx) (*QueueStats, error)` | Get backend statistics |

| Field | Type | Purpose |
|---|---|---|
| `ID` | `string` | Unique message identifier |
| `Queue` | `string` | Queue name |
| `Body` | `[]byte` | Message payload |
| `Headers` | `map[string]string` | Message headers |
| `Priority` | `int` | Message priority (higher = more urgent) |
| `Delay` | `time.Duration` | Delivery delay |
| `Expiration` | `time.Duration` | Message TTL |
| `Retries` | `int` | Current retry count |
| `MaxRetries` | `int` | Maximum retry attempts |
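
As an illustration of how these fields fit together, the sketch below declares a local `Message` struct with the field names and types from the table and fills it with a JSON payload. The struct is a stand-in, not the extension's actual type, and `orderCreated`, `newOrderMessage`, the `"orders"` queue name, and the `content-type` header key are all hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Message mirrors the fields listed in the table above (local stand-in).
type Message struct {
	ID         string
	Queue      string
	Body       []byte
	Headers    map[string]string
	Priority   int
	Delay      time.Duration
	Expiration time.Duration
	Retries    int
	MaxRetries int
}

// orderCreated is a hypothetical payload type.
type orderCreated struct {
	OrderID string `json:"order_id"`
}

// newOrderMessage serializes the payload into Body and sets delivery
// metadata. ID and Retries are left at their zero values here, on the
// assumption that the backend assigns them.
func newOrderMessage(orderID string) (Message, error) {
	body, err := json.Marshal(orderCreated{OrderID: orderID})
	if err != nil {
		return Message{}, err
	}
	return Message{
		Queue:      "orders",
		Body:       body,
		Headers:    map[string]string{"content-type": "application/json"},
		Priority:   5,
		Expiration: time.Hour, // drop the message if undelivered after 1h
		MaxRetries: 3,
	}, nil
}

func main() {
	msg, err := newOrderMessage("o-1")
	if err != nil {
		fmt.Println("build failed:", err)
		return
	}
	fmt.Printf("queue=%s body=%s ttl=%s\n", msg.Queue, msg.Body, msg.Expiration)
}
```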

| Field | Type | Default | Purpose |
|---|---|---|---|
| `Durable` | `bool` | `true` | Survive broker restart |
| `AutoDelete` | `bool` | `false` | Delete when no consumers remain |
| `Exclusive` | `bool` | `false` | Single consumer only |
| `MaxLength` | `int64` | `0` | Max queue depth (0 = unlimited) |
| `MessageTTL` | `time.Duration` | `0` | Default message TTL |
| `MaxPriority` | `int` | `0` | Max priority level (0 = disabled) |
| `DeadLetterQueue` | `string` | `""` | Custom DLQ name |
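
The defaults in this table can be captured in a small constructor and then overridden per queue. The sketch below assumes the options type is a plain struct; the name `QueueOptions`, the helper `defaultQueueOptions`, and the `"orders.failed"` DLQ name are hypothetical and chosen for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// QueueOptions mirrors the fields in the table above (local stand-in;
// the type name is an assumption).
type QueueOptions struct {
	Durable         bool
	AutoDelete      bool
	Exclusive       bool
	MaxLength       int64
	MessageTTL      time.Duration
	MaxPriority     int
	DeadLetterQueue string
}

// defaultQueueOptions returns the defaults listed in the table. Only
// Durable differs from Go's zero values.
func defaultQueueOptions() QueueOptions {
	return QueueOptions{Durable: true}
}

func main() {
	opts := defaultQueueOptions()
	opts.MaxLength = 10_000               // cap queue depth
	opts.MessageTTL = 24 * time.Hour      // expire messages after a day
	opts.MaxPriority = 10                 // enable priority levels
	opts.DeadLetterQueue = "orders.failed" // hypothetical custom DLQ name

	fmt.Printf("%+v\n", opts)
}
```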

| Field | Type | Default | Purpose |
|---|---|---|---|
| `AutoAck` | `bool` | `false` | Auto-acknowledge on delivery |
| `PrefetchCount` | `int` | `10` | Messages to prefetch |
| `Concurrency` | `int` | `1` | Concurrent message handlers |
| `Timeout` | `time.Duration` | `30s` | Per-message processing timeout |
| `RetryStrategy` | `RetryStrategy` | 3 retries, 2x backoff | Retry configuration |
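
The default retry strategy of "3 retries, 2x backoff" implies an exponentially growing delay between attempts. The sketch below works through that arithmetic. The table does not specify the fields of `RetryStrategy` or the initial interval, so `MaxRetries`, `InitialInterval`, `Multiplier`, the one-second starting delay, and the type name `ConsumeOptions` are all assumptions made for this example.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// RetryStrategy is a hypothetical shape for the retry configuration.
type RetryStrategy struct {
	MaxRetries      int
	InitialInterval time.Duration // assumption: not stated in the table
	Multiplier      float64
}

// ConsumeOptions mirrors the fields in the table above (local stand-in).
type ConsumeOptions struct {
	AutoAck       bool
	PrefetchCount int
	Concurrency   int
	Timeout       time.Duration
	RetryStrategy RetryStrategy
}

// defaultConsumeOptions returns the defaults listed in the table, with an
// assumed one-second initial retry interval.
func defaultConsumeOptions() ConsumeOptions {
	return ConsumeOptions{
		PrefetchCount: 10,
		Concurrency:   1,
		Timeout:       30 * time.Second,
		RetryStrategy: RetryStrategy{
			MaxRetries:      3,
			InitialInterval: time.Second,
			Multiplier:      2,
		},
	}
}

// delay returns the wait before the given retry attempt (1-based), or
// false when the attempt exceeds the retry budget.
func (r RetryStrategy) delay(attempt int) (time.Duration, bool) {
	if attempt < 1 || attempt > r.MaxRetries {
		return 0, false
	}
	factor := math.Pow(r.Multiplier, float64(attempt-1))
	return time.Duration(float64(r.InitialInterval) * factor), true
}

func main() {
	opts := defaultConsumeOptions()
	for attempt := 1; attempt <= opts.RetryStrategy.MaxRetries+1; attempt++ {
		if d, ok := opts.RetryStrategy.delay(attempt); ok {
			fmt.Printf("retry %d after %s\n", attempt, d)
		} else {
			fmt.Printf("retry %d not allowed\n", attempt)
		}
	}
}
```

Under these assumptions the three retries wait 1s, 2s, and 4s, and a fourth attempt is refused.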