Queue Extension
Message queue system with support for multiple backends and advanced messaging patterns
The Queue extension provides a unified interface for message queue operations with support for multiple backends including Redis, RabbitMQ, Apache Kafka, NATS, Amazon SQS, and in-memory queues. It offers reliable message delivery, dead letter queues, batch processing, and advanced messaging patterns.
The Queue extension abstracts message queue complexity while providing access to backend-specific features for advanced use cases.
Features
Multiple Queue Backends
- Redis: Simple pub/sub and list-based queues
- RabbitMQ: Advanced AMQP messaging with routing
- Apache Kafka: High-throughput distributed streaming
- NATS: Lightweight cloud-native messaging
- Amazon SQS: Managed cloud queue service
- In-Memory: Fast local queues for development
Core Capabilities
- Reliable Delivery: At-least-once and exactly-once delivery
- Dead Letter Queues: Handle failed message processing
- Delayed Messages: Schedule messages for future delivery
- Batch Processing: Publish and consume messages in batches
- Message Acknowledgment: Manual and automatic acknowledgment
- Queue Management: Create, delete, and monitor queues
Advanced Features
- Priority Queues: Process high-priority messages first
- Message Routing: Route messages based on patterns
- Consumer Groups: Load balance across multiple consumers
- Message Filtering: Filter messages by headers or content (see the sketch after this list)
- Retry Strategies: Configurable retry with backoff
- Monitoring: Real-time queue metrics and health checks
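Routing and filtering can also be composed in application code when a backend lacks native support. The sketch below wraps a handler so it only processes messages whose headers match; withHeaderFilter is a hypothetical helper (not part of the extension's API) and assumes the func(ctx context.Context, message queue.Message) error handler signature used throughout this guide.
// withHeaderFilter returns a handler that invokes next only for messages
// whose headers contain every key/value pair in want; everything else is
// skipped by returning nil so it is acknowledged rather than retried.
// Hypothetical helper, shown for illustration only.
func withHeaderFilter(want map[string]string, next queue.MessageHandler) queue.MessageHandler {
	return func(ctx context.Context, message queue.Message) error {
		for k, v := range want {
			if message.Headers[k] != v {
				return nil // header filter did not match; skip this message
			}
		}
		return next(ctx, message)
	}
}
A consumer could then be started with, for example, withHeaderFilter(map[string]string{"priority": "high"}, processOrderMessage) as its handler.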
Installation
go get github.com/xraph/forge/extensions/queue
Configuration
extensions:
queue:
# Default queue backend
default: "redis"
# Queue backend configurations
backends:
redis:
driver: "redis"
url: "redis://localhost:6379/0"
pool_size: 10
min_idle_conns: 5
max_retries: 3
dial_timeout: "5s"
read_timeout: "3s"
write_timeout: "3s"
rabbitmq:
driver: "rabbitmq"
url: "amqp://guest:guest@localhost:5672/"
max_channels: 100
heartbeat: "60s"
connection_timeout: "30s"
prefetch_count: 10
prefetch_size: 0
kafka:
driver: "kafka"
brokers: ["localhost:9092"]
client_id: "forge-queue"
version: "2.8.0"
security:
mechanism: "PLAIN"
username: ""
password: ""
producer:
max_message_bytes: 1000000
required_acks: 1
timeout: "10s"
compression: "snappy"
flush_frequency: "100ms"
flush_messages: 100
consumer:
group_id: "forge-consumers"
session_timeout: "10s"
heartbeat_interval: "3s"
rebalance_timeout: "60s"
nats:
driver: "nats"
url: "nats://localhost:4222"
max_reconnects: 10
reconnect_wait: "2s"
timeout: "5s"
ping_interval: "2m"
max_pings_out: 2
sqs:
driver: "sqs"
region: "us-west-2"
access_key_id: "${AWS_ACCESS_KEY_ID}"
secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
endpoint: "" # Optional for LocalStack
memory:
driver: "memory"
max_queues: 100
max_messages_per_queue: 10000
cleanup_interval: "1m"
# Global queue settings
settings:
default_visibility_timeout: "30s"
default_message_retention: "14d"
default_max_receives: 3
dead_letter_queue_suffix: "_dlq"
monitoring:
enabled: true
metrics_interval: "30s"
health_check_interval: "1m"
retry:
max_attempts: 3
initial_interval: "1s"
max_interval: "30s"
multiplier: 2.0
Environment Variables
# Default Backend
export QUEUE_DEFAULT="redis"
# Redis Configuration
export QUEUE_REDIS_URL="redis://localhost:6379/0"
export QUEUE_REDIS_POOL_SIZE="10"
export QUEUE_REDIS_MAX_RETRIES="3"
# RabbitMQ Configuration
export QUEUE_RABBITMQ_URL="amqp://guest:guest@localhost:5672/"
export QUEUE_RABBITMQ_MAX_CHANNELS="100"
export QUEUE_RABBITMQ_PREFETCH_COUNT="10"
# Kafka Configuration
export QUEUE_KAFKA_BROKERS="localhost:9092"
export QUEUE_KAFKA_CLIENT_ID="forge-queue"
export QUEUE_KAFKA_CONSUMER_GROUP="forge-consumers"
# NATS Configuration
export QUEUE_NATS_URL="nats://localhost:4222"
export QUEUE_NATS_MAX_RECONNECTS="10"
# SQS Configuration
export QUEUE_SQS_REGION="us-west-2"
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"
# Global Settings
export QUEUE_DEFAULT_VISIBILITY_TIMEOUT="30s"
export QUEUE_MONITORING_ENABLED="true"
Basic Setup
package main
import (
"time"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/queue"
)
func main() {
app := forge.New()
// Configure Queue extension
queueExt := queue.New(queue.Config{
Default: "redis",
Backends: map[string]queue.BackendConfig{
"redis": {
Driver: "redis",
URL: "redis://localhost:6379/0",
PoolSize: 10,
MinIdleConns: 5,
MaxRetries: 3,
DialTimeout: time.Second * 5,
ReadTimeout: time.Second * 3,
WriteTimeout: time.Second * 3,
},
"rabbitmq": {
Driver: "rabbitmq",
URL: "amqp://guest:guest@localhost:5672/",
MaxChannels: 100,
Heartbeat: time.Second * 60,
ConnectionTimeout: time.Second * 30,
PrefetchCount: 10,
},
"kafka": {
Driver: "kafka",
Brokers: []string{"localhost:9092"},
ClientID: "forge-queue",
Version: "2.8.0",
Producer: queue.KafkaProducerConfig{
MaxMessageBytes: 1000000,
RequiredAcks: 1,
Timeout: time.Second * 10,
Compression: "snappy",
FlushFrequency: time.Millisecond * 100,
FlushMessages: 100,
},
Consumer: queue.KafkaConsumerConfig{
GroupID: "forge-consumers",
SessionTimeout: time.Second * 10,
HeartbeatInterval: time.Second * 3,
RebalanceTimeout: time.Second * 60,
},
},
},
Settings: queue.Settings{
DefaultVisibilityTimeout: time.Second * 30,
DefaultMessageRetention: time.Hour * 24 * 14,
DefaultMaxReceives: 3,
DeadLetterQueueSuffix: "_dlq",
Monitoring: queue.MonitoringConfig{
Enabled: true,
MetricsInterval: time.Second * 30,
HealthCheckInterval: time.Minute,
},
Retry: queue.RetryConfig{
MaxAttempts: 3,
InitialInterval: time.Second,
MaxInterval: time.Second * 30,
Multiplier: 2.0,
},
},
})
app.RegisterExtension(queueExt)
app.Run()
}
Usage Examples
Basic Queue Operations
type OrderEvent struct {
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
Amount float64 `json:"amount"`
Status string `json:"status"`
Timestamp time.Time `json:"timestamp"`
}
func publishOrderHandler(c forge.Context) error {
q := forge.GetQueue(c)
var order OrderEvent
if err := c.Bind(&order); err != nil {
return c.JSON(400, map[string]string{"error": "Invalid request"})
}
order.Timestamp = time.Now()
// Publish message to order processing queue
message := queue.Message{
ID: uuid.New().String(),
Body: order,
Headers: map[string]string{
"event_type": "order_created",
"customer_id": order.CustomerID,
"priority": "normal",
},
}
err := q.Publish(c.Context(), "order_processing", message)
if err != nil {
return c.JSON(500, map[string]interface{}{
"error": "Failed to publish order event",
"details": err.Error(),
})
}
return c.JSON(200, map[string]interface{}{
"message_id": message.ID,
"queue": "order_processing",
"status": "published",
})
}
func publishNotificationHandler(c forge.Context) error {
q := forge.GetQueue(c)
var notification struct {
UserID string `json:"user_id"`
Type string `json:"type"`
Title string `json:"title"`
Message string `json:"message"`
Data map[string]interface{} `json:"data,omitempty"`
}
if err := c.Bind(&notification); err != nil {
return c.JSON(400, map[string]string{"error": "Invalid request"})
}
// Publish with priority based on notification type
priority := "normal"
if notification.Type == "urgent" || notification.Type == "security" {
priority = "high"
}
message := queue.Message{
ID: uuid.New().String(),
Body: notification,
Headers: map[string]string{
"notification_type": notification.Type,
"user_id": notification.UserID,
"priority": priority,
},
Priority: getPriorityLevel(priority),
}
err := q.Publish(c.Context(), "notifications", message)
if err != nil {
return c.JSON(500, map[string]interface{}{
"error": "Failed to publish notification",
"details": err.Error(),
})
}
return c.JSON(200, map[string]interface{}{
"message_id": message.ID,
"queue": "notifications",
"priority": priority,
"status": "published",
})
}
func publishEmailHandler(c forge.Context) error {
q := forge.GetQueue(c)
var email struct {
To []string `json:"to"`
CC []string `json:"cc,omitempty"`
BCC []string `json:"bcc,omitempty"`
Subject string `json:"subject"`
Body string `json:"body"`
Template string `json:"template,omitempty"`
Data map[string]interface{} `json:"data,omitempty"`
}
if err := c.Bind(&email); err != nil {
return c.JSON(400, map[string]string{"error": "Invalid request"})
}
// Validate email
if len(email.To) == 0 {
return c.JSON(400, map[string]string{"error": "At least one recipient is required"})
}
if email.Subject == "" {
return c.JSON(400, map[string]string{"error": "Subject is required"})
}
message := queue.Message{
ID: uuid.New().String(),
Body: email,
Headers: map[string]string{
"email_type": getEmailType(email.Template),
"recipient_count": fmt.Sprintf("%d", len(email.To)),
},
}
err := q.Publish(c.Context(), "email_queue", message)
if err != nil {
return c.JSON(500, map[string]interface{}{
"error": "Failed to queue email",
"details": err.Error(),
})
}
return c.JSON(200, map[string]interface{}{
"message_id": message.ID,
"queue": "email_queue",
"recipients": len(email.To),
"status": "queued",
})
}
func getPriorityLevel(priority string) int {
switch priority {
case "high":
return 10
case "normal":
return 5
case "low":
return 1
default:
return 5
}
}
func getEmailType(template string) string {
if template == "" {
return "plain"
}
return "template"
}
Consuming Messages
func startOrderProcessor(app *forge.App) {
q := app.GetQueue()
// Configure consumer options
options := queue.ConsumeOptions{
QueueName: "order_processing",
ConsumerGroup: "order_processors",
ConcurrentWorkers: 5,
PrefetchCount: 10,
VisibilityTimeout: time.Second * 30,
MaxRetries: 3,
RetryStrategy: queue.RetryStrategy{
Type: "exponential",
InitialInterval: time.Second,
MaxInterval: time.Second * 30,
Multiplier: 2.0,
},
}
// Start consuming messages
err := q.Consume(context.Background(), options, processOrderMessage)
if err != nil {
log.Fatalf("Failed to start order processor: %v", err)
}
}
func processOrderMessage(ctx context.Context, message queue.Message) error {
var order OrderEvent
if err := json.Unmarshal(message.Body, &order); err != nil {
log.Printf("Failed to unmarshal order: %v", err)
return queue.ErrPermanentFailure // Don't retry
}
log.Printf("Processing order %s for customer %s", order.OrderID, order.CustomerID)
// Validate order
if err := validateOrder(order); err != nil {
log.Printf("Invalid order %s: %v", order.OrderID, err)
return queue.ErrPermanentFailure
}
// Process payment
if err := processPayment(ctx, order); err != nil {
if isTemporaryError(err) {
log.Printf("Temporary payment error for order %s: %v", order.OrderID, err)
return err // Will retry
}
log.Printf("Permanent payment error for order %s: %v", order.OrderID, err)
return queue.ErrPermanentFailure
}
// Update inventory
if err := updateInventory(ctx, order); err != nil {
log.Printf("Failed to update inventory for order %s: %v", order.OrderID, err)
return err // Will retry
}
// Send confirmation
if err := sendOrderConfirmation(ctx, order); err != nil {
log.Printf("Failed to send confirmation for order %s: %v", order.OrderID, err)
// Don't fail the entire process for notification errors
}
log.Printf("Successfully processed order %s", order.OrderID)
return nil
}
func startNotificationProcessor(app *forge.App) {
q := app.GetQueue()
options := queue.ConsumeOptions{
QueueName: "notifications",
ConsumerGroup: "notification_processors",
ConcurrentWorkers: 10,
PrefetchCount: 20,
VisibilityTimeout: time.Second * 15,
MaxRetries: 2,
}
err := q.Consume(context.Background(), options, processNotificationMessage)
if err != nil {
log.Fatalf("Failed to start notification processor: %v", err)
}
}
func processNotificationMessage(ctx context.Context, message queue.Message) error {
var notification struct {
UserID string `json:"user_id"`
Type string `json:"type"`
Title string `json:"title"`
Message string `json:"message"`
Data map[string]interface{} `json:"data,omitempty"`
}
if err := json.Unmarshal(message.Body, &notification); err != nil {
return queue.ErrPermanentFailure
}
log.Printf("Processing %s notification for user %s",
notification.Type, notification.UserID)
// Get user preferences
preferences, err := getUserNotificationPreferences(ctx, notification.UserID)
if err != nil {
log.Printf("Failed to get preferences for user %s: %v",
notification.UserID, err)
return err
}
// Check if user wants this type of notification
if !preferences.IsEnabled(notification.Type) {
log.Printf("User %s has disabled %s notifications",
notification.UserID, notification.Type)
return nil // Skip without error
}
// Send via preferred channels
var errors []error
if preferences.Email {
if err := sendEmailNotification(ctx, notification); err != nil {
errors = append(errors, fmt.Errorf("email: %w", err))
}
}
if preferences.Push {
if err := sendPushNotification(ctx, notification); err != nil {
errors = append(errors, fmt.Errorf("push: %w", err))
}
}
if preferences.SMS && notification.Type == "urgent" {
if err := sendSMSNotification(ctx, notification); err != nil {
errors = append(errors, fmt.Errorf("sms: %w", err))
}
}
if len(errors) > 0 {
log.Printf("Some notification channels failed for user %s: %v",
notification.UserID, errors)
// Return error to retry if any channel failed
return fmt.Errorf("notification delivery failed: %v", errors)
}
log.Printf("Successfully sent %s notification to user %s",
notification.Type, notification.UserID)
return nil
}
func startEmailProcessor(app *forge.App) {
q := app.GetQueue()
options := queue.ConsumeOptions{
QueueName: "email_queue",
ConsumerGroup: "email_processors",
ConcurrentWorkers: 3, // Limit to avoid overwhelming SMTP server
PrefetchCount: 5,
VisibilityTimeout: time.Minute * 2,
MaxRetries: 3,
RetryStrategy: queue.RetryStrategy{
Type: "exponential",
InitialInterval: time.Second * 5,
MaxInterval: time.Minute * 5,
Multiplier: 2.0,
},
}
err := q.Consume(context.Background(), options, processEmailMessage)
if err != nil {
log.Fatalf("Failed to start email processor: %v", err)
}
}
func processEmailMessage(ctx context.Context, message queue.Message) error {
var email struct {
To []string `json:"to"`
CC []string `json:"cc,omitempty"`
BCC []string `json:"bcc,omitempty"`
Subject string `json:"subject"`
Body string `json:"body"`
Template string `json:"template,omitempty"`
Data map[string]interface{} `json:"data,omitempty"`
}
if err := json.Unmarshal(message.Body, &email); err != nil {
return queue.ErrPermanentFailure
}
log.Printf("Processing email: %s to %d recipients",
email.Subject, len(email.To))
// Render template if specified
if email.Template != "" {
renderedBody, err := renderEmailTemplate(email.Template, email.Data)
if err != nil {
log.Printf("Failed to render email template %s: %v",
email.Template, err)
return queue.ErrPermanentFailure
}
email.Body = renderedBody
}
// Send email
if err := sendEmail(ctx, email); err != nil {
if isTemporaryEmailError(err) {
log.Printf("Temporary email error: %v", err)
return err // Will retry
}
log.Printf("Permanent email error: %v", err)
return queue.ErrPermanentFailure
}
log.Printf("Successfully sent email: %s", email.Subject)
return nil
}
Batch Processing
func publishBatchOrdersHandler(c forge.Context) error {
q := forge.GetQueue(c)
var orders []OrderEvent
if err := c.Bind(&orders); err != nil {
return c.JSON(400, map[string]string{"error": "Invalid request"})
}
if len(orders) == 0 {
return c.JSON(400, map[string]string{"error": "No orders provided"})
}
if len(orders) > 100 {
return c.JSON(400, map[string]string{"error": "Maximum 100 orders per batch"})
}
// Prepare batch messages, tagging each with a shared batch ID
batchID := uuid.New().String()
var messages []queue.Message
for _, order := range orders {
order.Timestamp = time.Now()
message := queue.Message{
ID: uuid.New().String(),
Body: order,
Headers: map[string]string{
"event_type": "order_created",
"customer_id": order.CustomerID,
"batch_id": c.Get("batch_id").(string),
},
}
messages = append(messages, message)
}
// Publish batch
results, err := q.PublishBatch(c.Context(), "order_processing", messages)
if err != nil {
return c.JSON(500, map[string]interface{}{
"error": "Failed to publish batch",
"details": err.Error(),
})
}
// Analyze results
var successful, failed int
var failedMessages []string
for i, result := range results {
if result.Error != nil {
failed++
failedMessages = append(failedMessages,
fmt.Sprintf("Message %d: %v", i, result.Error))
} else {
successful++
}
}
response := map[string]interface{}{
"batch_id": batchID,
"total": len(orders),
"successful": successful,
"failed": failed,
"queue": "order_processing",
}
if failed > 0 {
response["errors"] = failedMessages
}
statusCode := 200
if failed > 0 && successful == 0 {
statusCode = 500
} else if failed > 0 {
statusCode = 207 // Partial success
}
return c.JSON(statusCode, response)
}
func batchConsumeOrdersHandler(c forge.Context) error {
q := forge.GetQueue(c)
batchSize := c.QueryInt("batch_size", 10)
if batchSize > 50 {
batchSize = 50
}
timeout := c.QueryDuration("timeout", time.Second*30)
// Consume batch of messages
messages, err := q.ConsumeBatch(c.Context(), "order_processing",
queue.BatchConsumeOptions{
BatchSize: batchSize,
Timeout: timeout,
VisibilityTimeout: time.Minute,
})
if err != nil {
return c.JSON(500, map[string]interface{}{
"error": "Failed to consume batch",
"details": err.Error(),
})
}
if len(messages) == 0 {
return c.JSON(200, map[string]interface{}{
"messages": []interface{}{},
"count": 0,
"message": "No messages available",
})
}
// Process messages
var processed []map[string]interface{}
var errors []string
for _, message := range messages {
var order OrderEvent
if err := json.Unmarshal(message.Body, &order); err != nil {
errors = append(errors, fmt.Sprintf("Message %s: invalid format", message.ID))
q.Nack(c.Context(), message.ID, false) // Don't requeue
continue
}
// Process order (simplified)
if err := processOrderSync(order); err != nil {
errors = append(errors, fmt.Sprintf("Order %s: %v", order.OrderID, err))
q.Nack(c.Context(), message.ID, true) // Requeue for retry
continue
}
// Acknowledge successful processing
if err := q.Ack(c.Context(), message.ID); err != nil {
errors = append(errors, fmt.Sprintf("Failed to ack message %s: %v", message.ID, err))
}
processed = append(processed, map[string]interface{}{
"message_id": message.ID,
"order_id": order.OrderID,
"status": "processed",
})
}
response := map[string]interface{}{
"requested": batchSize,
"received": len(messages),
"processed": len(processed),
"failed": len(errors),
"messages": processed,
}
if len(errors) > 0 {
response["errors"] = errors
}
return c.JSON(200, response)
}
func bulkNotificationHandler(c forge.Context) error {
q := forge.GetQueue(c)
var req struct {
UserIDs []string `json:"user_ids"`
Type string `json:"type"`
Title string `json:"title"`
Message string `json:"message"`
Data map[string]interface{} `json:"data,omitempty"`
}
if err := c.Bind(&req); err != nil {
return c.JSON(400, map[string]string{"error": "Invalid request"})
}
if len(req.UserIDs) == 0 {
return c.JSON(400, map[string]string{"error": "No user IDs provided"})
}
if len(req.UserIDs) > 1000 {
return c.JSON(400, map[string]string{"error": "Maximum 1000 users per bulk notification"})
}
// Create messages for each user
var messages []queue.Message
batchID := uuid.New().String()
for _, userID := range req.UserIDs {
notification := map[string]interface{}{
"user_id": userID,
"type": req.Type,
"title": req.Title,
"message": req.Message,
"data": req.Data,
}
message := queue.Message{
ID: uuid.New().String(),
Body: notification,
Headers: map[string]string{
"notification_type": req.Type,
"user_id": userID,
"batch_id": batchID,
"bulk_notification": "true",
},
}
messages = append(messages, message)
}
// Publish in chunks to avoid overwhelming the queue
chunkSize := 100
var totalSuccessful, totalFailed int
var allErrors []string
for i := 0; i < len(messages); i += chunkSize {
end := i + chunkSize
if end > len(messages) {
end = len(messages)
}
chunk := messages[i:end]
results, err := q.PublishBatch(c.Context(), "notifications", chunk)
if err != nil {
totalFailed += len(chunk)
allErrors = append(allErrors, fmt.Sprintf("Chunk %d-%d: %v", i, end-1, err))
continue
}
for j, result := range results {
if result.Error != nil {
totalFailed++
allErrors = append(allErrors,
fmt.Sprintf("Message %d: %v", i+j, result.Error))
} else {
totalSuccessful++
}
}
}
response := map[string]interface{}{
"batch_id": batchID,
"total": len(req.UserIDs),
"successful": totalSuccessful,
"failed": totalFailed,
"queue": "notifications",
}
if totalFailed > 0 {
response["errors"] = allErrors
}
statusCode := 200
if totalFailed > 0 && totalSuccessful == 0 {
statusCode = 500
} else if totalFailed > 0 {
statusCode = 207
}
return c.JSON(statusCode, response)
}
Delayed Messages
func scheduleReminderHandler(c forge.Context) error {
q := forge.GetQueue(c)
var reminder struct {
UserID string `json:"user_id"`
Type string `json:"type"`
Title string `json:"title"`
Message string `json:"message"`
ScheduledAt time.Time `json:"scheduled_at"`
Data map[string]interface{} `json:"data,omitempty"`
}
if err := c.Bind(&reminder); err != nil {
return c.JSON(400, map[string]string{"error": "Invalid request"})
}
// Validate scheduled time
if reminder.ScheduledAt.Before(time.Now()) {
return c.JSON(400, map[string]string{"error": "Scheduled time must be in the future"})
}
// Calculate delay
delay := time.Until(reminder.ScheduledAt)
if delay > time.Hour*24*30 { // 30 days max
return c.JSON(400, map[string]string{"error": "Maximum delay is 30 days"})
}
message := queue.Message{
ID: uuid.New().String(),
Body: reminder,
Headers: map[string]string{
"reminder_type": reminder.Type,
"user_id": reminder.UserID,
"scheduled_at": reminder.ScheduledAt.Format(time.RFC3339),
},
DelayUntil: reminder.ScheduledAt,
}
err := q.PublishDelayed(c.Context(), "reminders", message, delay)
if err != nil {
return c.JSON(500, map[string]interface{}{
"error": "Failed to schedule reminder",
"details": err.Error(),
})
}
return c.JSON(201, map[string]interface{}{
"message_id": message.ID,
"scheduled_at": reminder.ScheduledAt,
"delay_seconds": int(delay.Seconds()),
"queue": "reminders",
"status": "scheduled",
})
}
func scheduleReportHandler(c forge.Context) error {
q := forge.GetQueue(c)
var report struct {
ReportType string `json:"report_type"`
UserID string `json:"user_id"`
Frequency string `json:"frequency"` // daily, weekly, monthly
Time string `json:"time"` // HH:MM format
Timezone string `json:"timezone"`
Config map[string]interface{} `json:"config,omitempty"`
}
if err := c.Bind(&report); err != nil {
return c.JSON(400, map[string]string{"error": "Invalid request"})
}
// Parse timezone
loc, err := time.LoadLocation(report.Timezone)
if err != nil {
return c.JSON(400, map[string]string{"error": "Invalid timezone"})
}
// Parse time
timeParts := strings.Split(report.Time, ":")
if len(timeParts) != 2 {
return c.JSON(400, map[string]string{"error": "Invalid time format (use HH:MM)"})
}
hour, err := strconv.Atoi(timeParts[0])
if err != nil || hour < 0 || hour > 23 {
return c.JSON(400, map[string]string{"error": "Invalid hour"})
}
minute, err := strconv.Atoi(timeParts[1])
if err != nil || minute < 0 || minute > 59 {
return c.JSON(400, map[string]string{"error": "Invalid minute"})
}
// Calculate next execution time
now := time.Now().In(loc)
nextExecution := time.Date(now.Year(), now.Month(), now.Day(),
hour, minute, 0, 0, loc)
// Adjust based on frequency
switch report.Frequency {
case "daily":
if nextExecution.Before(now) {
nextExecution = nextExecution.Add(24 * time.Hour)
}
case "weekly":
if nextExecution.Before(now) {
nextExecution = nextExecution.Add(7 * 24 * time.Hour)
}
// Adjust to next Monday (or specified day)
for nextExecution.Weekday() != time.Monday {
nextExecution = nextExecution.Add(24 * time.Hour)
}
case "monthly":
if nextExecution.Before(now) {
nextExecution = nextExecution.AddDate(0, 1, 0)
}
// Adjust to first day of month
nextExecution = time.Date(nextExecution.Year(), nextExecution.Month(), 1,
hour, minute, 0, 0, loc)
default:
return c.JSON(400, map[string]string{"error": "Invalid frequency"})
}
delay := time.Until(nextExecution)
message := queue.Message{
ID: uuid.New().String(),
Body: report,
Headers: map[string]string{
"report_type": report.ReportType,
"user_id": report.UserID,
"frequency": report.Frequency,
"next_execution": nextExecution.Format(time.RFC3339),
"timezone": report.Timezone,
},
DelayUntil: nextExecution,
}
err = q.PublishDelayed(c.Context(), "scheduled_reports", message, delay)
if err != nil {
return c.JSON(500, map[string]interface{}{
"error": "Failed to schedule report",
"details": err.Error(),
})
}
return c.JSON(201, map[string]interface{}{
"message_id": message.ID,
"next_execution": nextExecution,
"delay_seconds": int(delay.Seconds()),
"frequency": report.Frequency,
"queue": "scheduled_reports",
"status": "scheduled",
})
}
func processScheduledReportMessage(ctx context.Context, message queue.Message) error {
var report struct {
ReportType string `json:"report_type"`
UserID string `json:"user_id"`
Frequency string `json:"frequency"`
Time string `json:"time"`
Timezone string `json:"timezone"`
Config map[string]interface{} `json:"config,omitempty"`
}
if err := json.Unmarshal(message.Body, &report); err != nil {
return queue.ErrPermanentFailure
}
log.Printf("Generating %s report for user %s", report.ReportType, report.UserID)
// Generate report
reportData, err := generateReport(ctx, report.ReportType, report.UserID, report.Config)
if err != nil {
log.Printf("Failed to generate report: %v", err)
return err // Will retry
}
// Send report to user
if err := sendReport(ctx, report.UserID, reportData); err != nil {
log.Printf("Failed to send report: %v", err)
return err // Will retry
}
// Schedule next execution if recurring
if report.Frequency != "once" {
if err := scheduleNextReport(ctx, report); err != nil {
log.Printf("Failed to schedule next report: %v", err)
// Don't fail the current execution for scheduling errors
}
}
log.Printf("Successfully processed %s report for user %s",
report.ReportType, report.UserID)
return nil
}
func cancelScheduledMessageHandler(c forge.Context) error {
q := forge.GetQueue(c)
messageID := c.Param("message_id")
if messageID == "" {
return c.JSON(400, map[string]string{"error": "Message ID is required"})
}
// Cancel scheduled message
err := q.CancelDelayed(c.Context(), messageID)
if err != nil {
if errors.Is(err, queue.ErrMessageNotFound) {
return c.JSON(404, map[string]string{"error": "Scheduled message not found"})
}
return c.JSON(500, map[string]interface{}{
"error": "Failed to cancel scheduled message",
"details": err.Error(),
})
}
return c.JSON(200, map[string]interface{}{
"message_id": messageID,
"status": "cancelled",
})
}
Advanced Queue Features
Dead Letter Queues
func setupDeadLetterQueues(app *forge.App) {
q := app.GetQueue()
// Configure dead letter queue for order processing
dlqOptions := queue.QueueOptions{
Name: "order_processing_dlq",
Durable: true,
AutoDelete: false,
MessageTTL: time.Hour * 24 * 7, // 7 days
MaxLength: 10000,
DeadLetterExchange: "", // No further DLQ
}
err := q.DeclareQueue(context.Background(), dlqOptions)
if err != nil {
log.Fatalf("Failed to declare DLQ: %v", err)
}
// Start DLQ processor
go processDLQMessages(app)
}
func processDLQMessages(app *forge.App) {
q := app.GetQueue()
options := queue.ConsumeOptions{
QueueName: "order_processing_dlq",
ConsumerGroup: "dlq_processors",
ConcurrentWorkers: 2,
PrefetchCount: 5,
VisibilityTimeout: time.Minute * 5,
MaxRetries: 1, // Limited retries for DLQ
}
err := q.Consume(context.Background(), options, handleDLQMessage)
if err != nil {
log.Fatalf("Failed to start DLQ processor: %v", err)
}
}
func handleDLQMessage(ctx context.Context, message queue.Message) error {
log.Printf("Processing DLQ message: %s", message.ID)
// Extract original failure information
failureCount := message.Headers["failure_count"]
lastError := message.Headers["last_error"]
originalQueue := message.Headers["original_queue"]
log.Printf("Message failed %s times, last error: %s, from queue: %s",
failureCount, lastError, originalQueue)
// Analyze failure pattern
var order OrderEvent
if err := json.Unmarshal(message.Body, &order); err != nil {
log.Printf("Failed to unmarshal DLQ message: %v", err)
return sendToManualReview(ctx, message, "unmarshal_error")
}
// Attempt different processing strategies
if err := tryAlternativeProcessing(ctx, order); err != nil {
log.Printf("Alternative processing failed: %v", err)
return sendToManualReview(ctx, message, "processing_failed")
}
log.Printf("Successfully recovered DLQ message: %s", message.ID)
return nil
}
func sendToManualReview(ctx context.Context, message queue.Message, reason string) error {
// Store in database for manual review
review := ManualReview{
MessageID: message.ID,
OriginalQueue: message.Headers["original_queue"],
FailureReason: reason,
MessageBody: string(message.Body),
Headers: message.Headers,
CreatedAt: time.Now(),
Status: "pending",
}
if err := saveManualReview(ctx, review); err != nil {
log.Printf("Failed to save manual review: %v", err)
return err
}
// Notify administrators
notification := AdminNotification{
Type: "dlq_manual_review",
Subject: fmt.Sprintf("DLQ Message Requires Manual Review: %s", message.ID),
Body: fmt.Sprintf("Message from queue %s failed processing and requires manual review. Reason: %s",
message.Headers["original_queue"], reason),
Data: map[string]interface{}{
"message_id": message.ID,
"original_queue": message.Headers["original_queue"],
"failure_reason": reason,
},
}
return sendAdminNotification(ctx, notification)
}
func getDLQStatsHandler(c forge.Context) error {
q := forge.GetQueue(c)
queueName := c.Query("queue", "order_processing_dlq")
// Get DLQ statistics
info, err := q.GetQueueInfo(c.Context(), queueName)
if err != nil {
return c.JSON(500, map[string]interface{}{
"error": "Failed to get DLQ stats",
"details": err.Error(),
})
}
// Get recent DLQ messages for analysis
messages, err := q.PeekMessages(c.Context(), queueName, 10)
if err != nil {
log.Printf("Failed to peek DLQ messages: %v", err)
messages = []queue.Message{} // Continue without peek data
}
// Analyze failure patterns
failureReasons := make(map[string]int)
originalQueues := make(map[string]int)
for _, msg := range messages {
if reason := msg.Headers["last_error"]; reason != "" {
failureReasons[reason]++
}
if origQueue := msg.Headers["original_queue"]; origQueue != "" {
originalQueues[origQueue]++
}
}
return c.JSON(200, map[string]interface{}{
"queue_name": queueName,
"message_count": info.MessageCount,
"consumer_count": info.ConsumerCount,
"failure_reasons": failureReasons,
"original_queues": originalQueues,
"recent_messages": len(messages),
})
}
func reprocessDLQMessageHandler(c forge.Context) error {
q := forge.GetQueue(c)
messageID := c.Param("message_id")
var req struct {
TargetQueue string `json:"target_queue"`
ResetRetries bool `json:"reset_retries"`
}
if err := c.Bind(&req); err != nil {
return c.JSON(400, map[string]string{"error": "Invalid request"})
}
// Get message from DLQ
message, err := q.GetMessage(c.Context(), "order_processing_dlq", messageID)
if err != nil {
if errors.Is(err, queue.ErrMessageNotFound) {
return c.JSON(404, map[string]string{"error": "Message not found in DLQ"})
}
return c.JSON(500, map[string]interface{}{
"error": "Failed to get DLQ message",
"details": err.Error(),
})
}
// Prepare message for reprocessing
reprocessMessage := queue.Message{
ID: uuid.New().String(),
Body: message.Body,
Headers: make(map[string]string),
}
// Copy headers but reset retry-related ones if requested
for k, v := range message.Headers {
if req.ResetRetries && (k == "failure_count" || k == "last_error") {
continue
}
reprocessMessage.Headers[k] = v
}
// Add reprocessing metadata
reprocessMessage.Headers["reprocessed_from_dlq"] = "true"
reprocessMessage.Headers["reprocessed_at"] = time.Now().Format(time.RFC3339)
reprocessMessage.Headers["original_message_id"] = message.ID
targetQueue := req.TargetQueue
if targetQueue == "" {
targetQueue = message.Headers["original_queue"]
if targetQueue == "" {
targetQueue = "order_processing" // Default
}
}
// Publish to target queue
err = q.Publish(c.Context(), targetQueue, reprocessMessage)
if err != nil {
return c.JSON(500, map[string]interface{}{
"error": "Failed to reprocess message",
"details": err.Error(),
})
}
// Remove from DLQ
err = q.Ack(c.Context(), message.ID)
if err != nil {
log.Printf("Failed to ack DLQ message %s: %v", message.ID, err)
// Don't fail the request since message was already reprocessed
}
return c.JSON(200, map[string]interface{}{
"original_message_id": message.ID,
"new_message_id": reprocessMessage.ID,
"target_queue": targetQueue,
"status": "reprocessed",
})
}
Queue Management
func createQueueHandler(c forge.Context) error {
q := forge.GetQueue(c)
var req struct {
Name string `json:"name"`
Durable bool `json:"durable"`
AutoDelete bool `json:"auto_delete"`
MessageTTL time.Duration `json:"message_ttl"`
MaxLength int `json:"max_length"`
MaxLengthBytes int `json:"max_length_bytes"`
DeadLetterExchange string `json:"dead_letter_exchange"`
DeadLetterQueue string `json:"dead_letter_queue"`
MaxRetries int `json:"max_retries"`
}
if err := c.Bind(&req); err != nil {
return c.JSON(400, map[string]string{"error": "Invalid request"})
}
if req.Name == "" {
return c.JSON(400, map[string]string{"error": "Queue name is required"})
}
options := queue.QueueOptions{
Name: req.Name,
Durable: req.Durable,
AutoDelete: req.AutoDelete,
MessageTTL: req.MessageTTL,
MaxLength: req.MaxLength,
MaxLengthBytes: req.MaxLengthBytes,
DeadLetterExchange: req.DeadLetterExchange,
DeadLetterQueue: req.DeadLetterQueue,
MaxRetries: req.MaxRetries,
}
err := q.DeclareQueue(c.Context(), options)
if err != nil {
return c.JSON(500, map[string]interface{}{
"error": "Failed to create queue",
"details": err.Error(),
})
}
return c.JSON(201, map[string]interface{}{
"queue_name": req.Name,
"status": "created",
"options": options,
})
}
func listQueuesHandler(c forge.Context) error {
q := forge.GetQueue(c)
queues, err := q.ListQueues(c.Context())
if err != nil {
return c.JSON(500, map[string]interface{}{
"error": "Failed to list queues",
"details": err.Error(),
})
}
var queueList []map[string]interface{}
for _, q := range queues {
queueList = append(queueList, map[string]interface{}{
"name": q.Name,
"message_count": q.MessageCount,
"consumer_count": q.ConsumerCount,
"durable": q.Durable,
"auto_delete": q.AutoDelete,
"created_at": q.CreatedAt,
})
}
return c.JSON(200, map[string]interface{}{
"queues": queueList,
"count": len(queueList),
})
}
func getQueueInfoHandler(c forge.Context) error {
q := forge.GetQueue(c)
queueName := c.Param("queue_name")
if queueName == "" {
return c.JSON(400, map[string]string{"error": "Queue name is required"})
}
info, err := q.GetQueueInfo(c.Context(), queueName)
if err != nil {
if errors.Is(err, queue.ErrQueueNotFound) {
return c.JSON(404, map[string]string{"error": "Queue not found"})
}
return c.JSON(500, map[string]interface{}{
"error": "Failed to get queue info",
"details": err.Error(),
})
}
return c.JSON(200, map[string]interface{}{
"name": info.Name,
"message_count": info.MessageCount,
"consumer_count": info.ConsumerCount,
"durable": info.Durable,
"auto_delete": info.AutoDelete,
"message_ttl": info.MessageTTL,
"max_length": info.MaxLength,
"created_at": info.CreatedAt,
"updated_at": info.UpdatedAt,
})
}
func purgeQueueHandler(c forge.Context) error {
q := forge.GetQueue(c)
queueName := c.Param("queue_name")
if queueName == "" {
return c.JSON(400, map[string]string{"error": "Queue name is required"})
}
// Safety check - require confirmation for production queues
confirm := c.Query("confirm")
if confirm != "yes" {
return c.JSON(400, map[string]string{
"error": "Queue purge requires confirmation. Add ?confirm=yes to proceed",
})
}
purgedCount, err := q.PurgeQueue(c.Context(), queueName)
if err != nil {
if errors.Is(err, queue.ErrQueueNotFound) {
return c.JSON(404, map[string]string{"error": "Queue not found"})
}
return c.JSON(500, map[string]interface{}{
"error": "Failed to purge queue",
"details": err.Error(),
})
}
return c.JSON(200, map[string]interface{}{
"queue_name": queueName,
"purged_count": purgedCount,
"status": "purged",
})
}
func deleteQueueHandler(c forge.Context) error {
q := forge.GetQueue(c)
queueName := c.Param("queue_name")
if queueName == "" {
return c.JSON(400, map[string]string{"error": "Queue name is required"})
}
// Safety checks
confirm := c.Query("confirm")
if confirm != "yes" {
return c.JSON(400, map[string]string{
"error": "Queue deletion requires confirmation. Add ?confirm=yes to proceed",
})
}
force := c.Query("force") == "true"
// Check if queue has messages (unless forced)
if !force {
info, err := q.GetQueueInfo(c.Context(), queueName)
if err == nil && info.MessageCount > 0 {
return c.JSON(400, map[string]interface{}{
"error": "Queue has messages. Use ?force=true to delete anyway",
"message_count": info.MessageCount,
})
}
}
err := q.DeleteQueue(c.Context(), queueName, force)
if err != nil {
if errors.Is(err, queue.ErrQueueNotFound) {
return c.JSON(404, map[string]string{"error": "Queue not found"})
}
return c.JSON(500, map[string]interface{}{
"error": "Failed to delete queue",
"details": err.Error(),
})
}
return c.JSON(200, map[string]interface{}{
"queue_name": queueName,
"status": "deleted",
})
}
func moveMessagesHandler(c forge.Context) error {
q := forge.GetQueue(c)
var req struct {
SourceQueue string `json:"source_queue"`
DestinationQueue string `json:"destination_queue"`
MaxMessages int `json:"max_messages"`
Filter map[string]string `json:"filter,omitempty"`
}
if err := c.Bind(&req); err != nil {
return c.JSON(400, map[string]string{"error": "Invalid request"})
}
if req.SourceQueue == "" || req.DestinationQueue == "" {
return c.JSON(400, map[string]string{"error": "Source and destination queues are required"})
}
if req.MaxMessages <= 0 {
req.MaxMessages = 100
}
if req.MaxMessages > 1000 {
req.MaxMessages = 1000
}
// Move messages
movedCount, err := q.MoveMessages(c.Context(), queue.MoveOptions{
SourceQueue: req.SourceQueue,
DestinationQueue: req.DestinationQueue,
MaxMessages: req.MaxMessages,
Filter: req.Filter,
})
if err != nil {
return c.JSON(500, map[string]interface{}{
"error": "Failed to move messages",
"details": err.Error(),
})
}
return c.JSON(200, map[string]interface{}{
"source_queue": req.SourceQueue,
"destination_queue": req.DestinationQueue,
"moved_count": movedCount,
"status": "completed",
})
}
Monitoring
func getQueueStatsHandler(c forge.Context) error {
q := forge.GetQueue(c)
// Get overall queue statistics
stats, err := q.Stats(c.Context())
if err != nil {
return c.JSON(500, map[string]interface{}{
"error": "Failed to get queue stats",
"details": err.Error(),
})
}
return c.JSON(200, map[string]interface{}{
"total_queues": stats.TotalQueues,
"total_messages": stats.TotalMessages,
"total_consumers": stats.TotalConsumers,
"messages_published": stats.MessagesPublished,
"messages_consumed": stats.MessagesConsumed,
"messages_failed": stats.MessagesFailed,
"average_latency_ms": stats.AverageLatency.Milliseconds(),
"throughput_per_sec": stats.ThroughputPerSecond,
"error_rate": stats.ErrorRate,
"uptime_seconds": stats.Uptime.Seconds(),
})
}
func getQueueMetricsHandler(c forge.Context) error {
q := forge.GetQueue(c)
queueName := c.Param("queue_name")
if queueName == "" {
return c.JSON(400, map[string]string{"error": "Queue name is required"})
}
// Get time range from query parameters
since := c.QueryDuration("since", time.Hour)
interval := c.QueryDuration("interval", time.Minute*5)
metrics, err := q.GetMetrics(c.Context(), queueName, since, interval)
if err != nil {
return c.JSON(500, map[string]interface{}{
"error": "Failed to get queue metrics",
"details": err.Error(),
})
}
return c.JSON(200, map[string]interface{}{
"queue_name": queueName,
"time_range": since.String(),
"interval": interval.String(),
"metrics": metrics,
})
}
func getQueueHealthHandler(c forge.Context) error {
q := forge.GetQueue(c)
// Check overall queue health
health, err := q.Health(c.Context())
if err != nil {
return c.JSON(503, map[string]interface{}{
"status": "unhealthy",
"error": err.Error(),
})
}
status := "healthy"
if !health.Healthy {
status = "unhealthy"
}
return c.JSON(200, map[string]interface{}{
"status": status,
"backend": health.Backend,
"connection_pool": health.ConnectionPool,
"queue_count": health.QueueCount,
"total_messages": health.TotalMessages,
"active_consumers": health.ActiveConsumers,
"last_check": health.LastCheck,
"details": health.Details,
})
}
func setupQueueMonitoring(app *forge.App) {
q := app.GetQueue()
// Start metrics collection
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
collectQueueMetrics(q)
case <-app.Context().Done():
return
}
}
}()
// Start health checks
go func() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
checkQueueHealth(q)
case <-app.Context().Done():
return
}
}
}()
}
func collectQueueMetrics(q queue.Queue) {
ctx := context.Background()
// Get queue statistics
stats, err := q.Stats(ctx)
if err != nil {
log.Printf("Failed to collect queue stats: %v", err)
return
}
// Send metrics to monitoring system
metrics := map[string]interface{}{
"queue.total_queues": stats.TotalQueues,
"queue.total_messages": stats.TotalMessages,
"queue.total_consumers": stats.TotalConsumers,
"queue.messages_published": stats.MessagesPublished,
"queue.messages_consumed": stats.MessagesConsumed,
"queue.messages_failed": stats.MessagesFailed,
"queue.average_latency": stats.AverageLatency.Milliseconds(),
"queue.throughput_per_sec": stats.ThroughputPerSecond,
"queue.error_rate": stats.ErrorRate,
}
// Send to metrics backend (Prometheus, InfluxDB, etc.)
sendMetrics(metrics)
}
func checkQueueHealth(q queue.Queue) {
ctx := context.Background()
health, err := q.Health(ctx)
if err != nil {
log.Printf("Queue health check failed: %v", err)
sendAlert("queue_health_check_failed", err.Error())
return
}
if !health.Healthy {
log.Printf("Queue is unhealthy: %v", health.Details)
sendAlert("queue_unhealthy", fmt.Sprintf("Queue health issues: %v", health.Details))
}
}
Consumer Groups
func startConsumerGroup(app *forge.App, groupConfig ConsumerGroupConfig) {
q := app.GetQueue()
// Configure consumer group
options := queue.ConsumeOptions{
QueueName: groupConfig.QueueName,
ConsumerGroup: groupConfig.GroupName,
ConcurrentWorkers: groupConfig.Workers,
PrefetchCount: groupConfig.PrefetchCount,
VisibilityTimeout: groupConfig.VisibilityTimeout,
MaxRetries: groupConfig.MaxRetries,
RetryStrategy: queue.RetryStrategy{
Type: groupConfig.RetryStrategy.Type,
InitialInterval: groupConfig.RetryStrategy.InitialInterval,
MaxInterval: groupConfig.RetryStrategy.MaxInterval,
Multiplier: groupConfig.RetryStrategy.Multiplier,
},
}
// Start consuming with the configured handler
err := q.Consume(context.Background(), options, groupConfig.Handler)
if err != nil {
log.Fatalf("Failed to start consumer group %s: %v", groupConfig.GroupName, err)
}
}
type ConsumerGroupConfig struct {
QueueName string
GroupName string
Workers int
PrefetchCount int
VisibilityTimeout time.Duration
MaxRetries int
RetryStrategy RetryStrategyConfig
Handler queue.MessageHandler
}
type RetryStrategyConfig struct {
Type string
InitialInterval time.Duration
MaxInterval time.Duration
Multiplier float64
}
func setupOrderProcessingConsumers(app *forge.App) {
// High-priority order processor
highPriorityConfig := ConsumerGroupConfig{
QueueName: "high_priority_orders",
GroupName: "high_priority_processors",
Workers: 10,
PrefetchCount: 5,
VisibilityTimeout: time.Second * 30,
MaxRetries: 3,
RetryStrategy: RetryStrategyConfig{
Type: "exponential",
InitialInterval: time.Second,
MaxInterval: time.Second * 30,
Multiplier: 2.0,
},
Handler: processHighPriorityOrder,
}
go startConsumerGroup(app, highPriorityConfig)
// Normal order processor
normalConfig := ConsumerGroupConfig{
QueueName: "order_processing",
GroupName: "order_processors",
Workers: 5,
PrefetchCount: 10,
VisibilityTimeout: time.Second * 45,
MaxRetries: 3,
RetryStrategy: RetryStrategyConfig{
Type: "exponential",
InitialInterval: time.Second * 2,
MaxInterval: time.Minute,
Multiplier: 2.0,
},
Handler: processOrderMessage,
}
go startConsumerGroup(app, normalConfig)
// Bulk order processor
bulkConfig := ConsumerGroupConfig{
QueueName: "bulk_orders",
GroupName: "bulk_processors",
Workers: 2,
PrefetchCount: 20,
VisibilityTimeout: time.Minute * 5,
MaxRetries: 2,
RetryStrategy: RetryStrategyConfig{
Type: "linear",
InitialInterval: time.Second * 5,
MaxInterval: time.Second * 30,
Multiplier: 1.0,
},
Handler: processBulkOrderMessage,
}
go startConsumerGroup(app, bulkConfig)
}
func processHighPriorityOrder(ctx context.Context, message queue.Message) error {
var order OrderEvent
if err := json.Unmarshal(message.Body, &order); err != nil {
return queue.ErrPermanentFailure
}
log.Printf("Processing high-priority order %s", order.OrderID)
// Fast-track processing for high-priority orders
if err := fastTrackOrder(ctx, order); err != nil {
return err
}
// Immediate notification
if err := sendUrgentNotification(ctx, order); err != nil {
log.Printf("Failed to send urgent notification: %v", err)
// Don't fail the order processing for notification errors
}
return nil
}
func processBulkOrderMessage(ctx context.Context, message queue.Message) error {
var bulkOrder struct {
BatchID string `json:"batch_id"`
Orders []OrderEvent `json:"orders"`
}
if err := json.Unmarshal(message.Body, &bulkOrder); err != nil {
return queue.ErrPermanentFailure
}
log.Printf("Processing bulk order batch %s with %d orders",
bulkOrder.BatchID, len(bulkOrder.Orders))
// Process orders in batch
var errors []error
for _, order := range bulkOrder.Orders {
if err := processOrderSync(order); err != nil {
errors = append(errors, fmt.Errorf("order %s: %w", order.OrderID, err))
}
}
if len(errors) > 0 {
// If some orders failed, log and potentially retry
log.Printf("Bulk batch %s had %d failures: %v",
bulkOrder.BatchID, len(errors), errors)
// Retry if less than 50% failed
if len(errors) < len(bulkOrder.Orders)/2 {
return fmt.Errorf("partial batch failure: %d/%d orders failed",
len(errors), len(bulkOrder.Orders))
}
// Too many failures, mark as permanent failure
return queue.ErrPermanentFailure
}
log.Printf("Successfully processed bulk batch %s", bulkOrder.BatchID)
return nil
}
func getConsumerGroupStatsHandler(c forge.Context) error {
q := forge.GetQueue(c)
groupName := c.Param("group_name")
if groupName == "" {
return c.JSON(400, map[string]string{"error": "Consumer group name is required"})
}
// Get consumer group statistics
stats, err := q.GetConsumerGroupStats(c.Context(), groupName)
if err != nil {
if errors.Is(err, queue.ErrConsumerGroupNotFound) {
return c.JSON(404, map[string]string{"error": "Consumer group not found"})
}
return c.JSON(500, map[string]interface{}{
"error": "Failed to get consumer group stats",
"details": err.Error(),
})
}
return c.JSON(200, map[string]interface{}{
"group_name": stats.GroupName,
"queue_name": stats.QueueName,
"consumer_count": stats.ConsumerCount,
"active_consumers": stats.ActiveConsumers,
"messages_consumed": stats.MessagesConsumed,
"messages_failed": stats.MessagesFailed,
"average_latency": stats.AverageLatency.Milliseconds(),
"last_activity": stats.LastActivity,
"consumers": stats.Consumers,
})
}
func scaleConsumerGroupHandler(c forge.Context) error {
q := forge.GetQueue(c)
groupName := c.Param("group_name")
var req struct {
Workers int `json:"workers"`
}
if err := c.Bind(&req); err != nil {
return c.JSON(400, map[string]string{"error": "Invalid request"})
}
if req.Workers < 1 || req.Workers > 50 {
return c.JSON(400, map[string]string{"error": "Workers must be between 1 and 50"})
}
// Scale consumer group
err := q.ScaleConsumerGroup(c.Context(), groupName, req.Workers)
if err != nil {
if errors.Is(err, queue.ErrConsumerGroupNotFound) {
return c.JSON(404, map[string]string{"error": "Consumer group not found"})
}
return c.JSON(500, map[string]interface{}{
"error": "Failed to scale consumer group",
"details": err.Error(),
})
}
return c.JSON(200, map[string]interface{}{
"group_name": groupName,
"workers": req.Workers,
"status": "scaled",
})
}
Best Practices
Message Design
- Keep messages small: Aim for messages under 256KB
- Include metadata: Add headers for routing and filtering
- Use structured data: JSON or Protocol Buffers for message bodies
- Add correlation IDs: Track message flows across services (see the sketch after this list)
- Include timestamps: For debugging and monitoring
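As a concrete illustration of these guidelines, the sketch below stamps every outgoing message with a correlation ID and publish timestamp. newTracedMessage is a hypothetical helper, not part of the extension; it assumes the queue.Message type and the github.com/google/uuid package used in the examples above.
// newTracedMessage builds a queue.Message carrying correlation and timing
// metadata so the message can be traced across services. Hypothetical helper.
func newTracedMessage(correlationID, eventType string, body interface{}) queue.Message {
	if correlationID == "" {
		// No upstream ID was propagated, so this message starts a new trace.
		correlationID = uuid.New().String()
	}
	return queue.Message{
		ID:   uuid.New().String(),
		Body: body,
		Headers: map[string]string{
			"event_type":     eventType,
			"correlation_id": correlationID,
			"published_at":   time.Now().UTC().Format(time.RFC3339),
		},
	}
}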
Error Handling
- Distinguish error types: Permanent vs temporary failures
- Implement retry strategies: Exponential backoff with jitter (see the sketch after this list)
- Use dead letter queues: Handle messages that can't be processed
- Log failures: Include context for debugging
- Monitor error rates: Set up alerts for high failure rates
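The extension's RetryStrategy already implements exponential backoff; if you roll your own retries, adding jitter keeps many failed consumers from retrying in lockstep. A minimal sketch using only the standard library:
import (
	"math"
	"math/rand"
	"time"
)

// backoffWithJitter returns the delay before retry attempt n (0-based):
// initial * multiplier^n, capped at max, randomized into [d/2, d) so
// concurrent consumers do not all retry at the same instant.
func backoffWithJitter(n int, initial, max time.Duration, multiplier float64) time.Duration {
	d := time.Duration(float64(initial) * math.Pow(multiplier, float64(n)))
	if d > max {
		d = max
	}
	half := d / 2
	if half <= 0 {
		return d
	}
	return half + time.Duration(rand.Int63n(int64(half)))
}
With the defaults shown earlier (initial 1s, multiplier 2.0, max 30s), attempts wait roughly 1s, 2s, 4s, 8s, 16s, then 30s, each randomized downward by up to half.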
Performance Optimization
- Batch operations: Use batch publish/consume when possible
- Tune prefetch: Balance memory usage and throughput
- Connection pooling: Reuse connections efficiently
- Message compression: Compress large message bodies (see the sketch after this list)
- Partition queues: Distribute load across multiple queues
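As an example of body compression, the sketch below gzips a serialized payload before publishing. It is illustrative only, since the extension does not document automatic compression; pair it with a header such as "content_encoding": "gzip" so consumers know to decompress before unmarshaling.
import (
	"bytes"
	"compress/gzip"
)

// compressBody gzips a serialized message body; worthwhile once payloads
// exceed a few kilobytes. Illustrative sketch, not a built-in feature.
func compressBody(raw []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(raw); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil { // Close flushes the final gzip frame
		return nil, err
	}
	return buf.Bytes(), nil
}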
Security
- Encrypt sensitive data: Use encryption for sensitive message content (see the sketch after this list)
- Validate messages: Sanitize and validate all message data
- Use authentication: Secure queue access with proper credentials
- Network security: Use TLS for message transport
- Access control: Implement proper queue permissions
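For sensitive payloads, the body can be sealed before it ever reaches the broker. The sketch below uses AES-256-GCM from the standard library; encryptPayload is illustrative, and key management (obtaining the 32-byte key from a secrets manager, rotation) is left out of scope.
import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"io"
)

// encryptPayload seals plaintext with AES-256-GCM; key must be 32 bytes.
// The random nonce is prepended to the ciphertext so the consumer can
// split it off and open the message. Illustrative sketch only.
func encryptPayload(key, plaintext []byte) ([]byte, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
		return nil, err
	}
	return gcm.Seal(nonce, nonce, plaintext, nil), nil
}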
Troubleshooting
Common Issues
Connection Problems
# Check queue backend connectivity
curl -X GET /api/queue/health
# Verify configuration
curl -X GET /api/queue/stats
Message Processing Failures
# Check dead letter queue
curl -X GET /api/queue/dlq/stats
# View recent failures
curl -X GET /api/queue/metrics/failures
Performance Issues
# Monitor queue metrics
curl -X GET /api/queue/metrics?since=1h
# Check consumer group performance
curl -X GET /api/queue/consumer-groups/stats
Debug Mode
extensions:
queue:
debug: true
log_level: "debug"
trace_messages: true