Queue

Functions

Public API reference for the Queue extension

Extension Entry Points

FunctionDescription
NewExtension(opts ...ConfigOption) forge.ExtensionCreate the queue extension with functional options
NewExtensionWithConfig(config Config) forge.ExtensionCreate the queue extension with a complete config
DefaultConfig() ConfigReturns the default configuration

DI Helpers

FunctionDescription
GetQueue(c forge.Container) (Queue, error)Resolve the Queue from the DI container
MustGetQueue(c forge.Container) QueueSame but panics on error
GetQueueFromApp(app forge.App) (Queue, error)Resolve the Queue from an app instance
MustGetQueueFromApp(app forge.App) QueueSame but panics on error

Queue Interface -- Queue Management

MethodDescription
DeclareQueue(ctx, name, opts) errorCreate a queue with the given options
DeleteQueue(ctx, name) errorDelete a queue and all its messages
ListQueues(ctx) ([]string, error)List all queue names
GetQueueInfo(ctx, name) (*QueueInfo, error)Get queue metadata (messages, consumers, rates)
PurgeQueue(ctx, name) errorRemove all messages from a queue

Queue Interface -- Publishing

MethodDescription
Publish(ctx, queue, msg) errorPublish a single message to a queue
PublishBatch(ctx, queue, msgs) errorPublish multiple messages in one operation
PublishDelayed(ctx, queue, msg, delay) errorPublish a message for future delivery

Queue Interface -- Consuming

MethodDescription
Consume(ctx, queue, handler, opts) errorStart consuming messages from a queue
StopConsuming(ctx, queue) errorStop consuming from a queue

Queue Interface -- Message Operations

MethodDescription
Ack(ctx, msg) errorAcknowledge successful processing
Nack(ctx, msg) errorNegative acknowledge (requeue for retry)
Reject(ctx, msg) errorReject and discard permanently

Queue Interface -- Dead Letter

MethodDescription
GetDeadLetterQueue(ctx, queue) ([]Message, error)Get messages in the dead-letter queue
RequeueDeadLetter(ctx, queue, msgID) errorMove a dead letter back to the original queue

Queue Interface -- Stats

MethodDescription
Stats(ctx) (*QueueStats, error)Get backend statistics

Key Types

Message

FieldTypePurpose
IDstringUnique message identifier
QueuestringQueue name
Body[]byteMessage payload
Headersmap[string]stringMessage headers
PriorityintMessage priority (higher = more urgent)
Delaytime.DurationDelivery delay
Expirationtime.DurationMessage TTL
RetriesintCurrent retry count
MaxRetriesintMaximum retry attempts

QueueOptions

FieldTypeDefaultPurpose
DurablebooltrueSurvive broker restart
AutoDeleteboolfalseDelete when no consumers
ExclusiveboolfalseSingle consumer only
MaxLengthint640Max queue depth (0 = unlimited)
MessageTTLtime.Duration0Default message TTL
MaxPriorityint0Max priority level (0 = disabled)
DeadLetterQueuestring""Custom DLQ name

ConsumeOptions

FieldTypeDefaultPurpose
AutoAckboolfalseAuto-acknowledge on delivery
PrefetchCountint10Messages to prefetch
Concurrencyint1Concurrent message handlers
Timeouttime.Duration30sPer-message processing timeout
RetryStrategyRetryStrategy3 retries, 2x backoffRetry configuration

How is this guide?

On this page