Queue Extension

Message queue system with support for multiple backends and advanced messaging patterns

The Queue extension provides a unified interface for message queue operations across multiple backends: Redis, RabbitMQ, Apache Kafka, NATS, Amazon SQS, and in-memory queues. It offers reliable message delivery, dead letter queues, batch processing, and advanced messaging patterns.

The Queue extension abstracts message queue complexity while providing access to backend-specific features for advanced use cases.

Features

Multiple Queue Backends

  • Redis: Simple pub/sub and list-based queues
  • RabbitMQ: Advanced AMQP messaging with routing
  • Apache Kafka: High-throughput distributed streaming
  • NATS: Lightweight cloud-native messaging
  • Amazon SQS: Managed cloud queue service
  • In-Memory: Fast local queues for development

Core Capabilities

  • Reliable Delivery: At-least-once and exactly-once delivery
  • Dead Letter Queues: Handle failed message processing
  • Delayed Messages: Schedule messages for future delivery
  • Batch Processing: Publish and consume messages in batches
  • Message Acknowledgment: Manual and automatic acknowledgment
  • Queue Management: Create, delete, and monitor queues

Advanced Features

  • Priority Queues: Process high-priority messages first
  • Message Routing: Route messages based on patterns
  • Consumer Groups: Load balance across multiple consumers
  • Message Filtering: Filter messages by headers or content
  • Retry Strategies: Configurable retry with backoff
  • Monitoring: Real-time queue metrics and health checks

Installation

go get github.com/xraph/forge/extensions/queue

Configuration

extensions:
  queue:
    # Default queue backend
    default: "redis"
    
    # Queue backend configurations
    backends:
      redis:
        driver: "redis"
        url: "redis://localhost:6379/0"
        pool_size: 10
        min_idle_conns: 5
        max_retries: 3
        dial_timeout: "5s"
        read_timeout: "3s"
        write_timeout: "3s"
        
      rabbitmq:
        driver: "rabbitmq"
        url: "amqp://guest:guest@localhost:5672/"
        max_channels: 100
        heartbeat: "60s"
        connection_timeout: "30s"
        prefetch_count: 10
        prefetch_size: 0
        
      kafka:
        driver: "kafka"
        brokers: ["localhost:9092"]
        client_id: "forge-queue"
        version: "2.8.0"
        security:
          mechanism: "PLAIN"
          username: ""
          password: ""
        producer:
          max_message_bytes: 1000000
          required_acks: 1
          timeout: "10s"
          compression: "snappy"
          flush_frequency: "100ms"
          flush_messages: 100
        consumer:
          group_id: "forge-consumers"
          session_timeout: "10s"
          heartbeat_interval: "3s"
          rebalance_timeout: "60s"
          
      nats:
        driver: "nats"
        url: "nats://localhost:4222"
        max_reconnects: 10
        reconnect_wait: "2s"
        timeout: "5s"
        ping_interval: "2m"
        max_pings_out: 2
        
      sqs:
        driver: "sqs"
        region: "us-west-2"
        access_key_id: "${AWS_ACCESS_KEY_ID}"
        secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
        endpoint: "" # Optional for LocalStack
        
      memory:
        driver: "memory"
        max_queues: 100
        max_messages_per_queue: 10000
        cleanup_interval: "1m"
    
    # Global queue settings
    settings:
      default_visibility_timeout: "30s"
      default_message_retention: "14d"
      default_max_receives: 3
      dead_letter_queue_suffix: "_dlq"
      monitoring:
        enabled: true
        metrics_interval: "30s"
        health_check_interval: "1m"
      retry:
        max_attempts: 3
        initial_interval: "1s"
        max_interval: "30s"
        multiplier: 2.0

Environment Variables

# Default Backend
export QUEUE_DEFAULT="redis"

# Redis Configuration
export QUEUE_REDIS_URL="redis://localhost:6379/0"
export QUEUE_REDIS_POOL_SIZE="10"
export QUEUE_REDIS_MAX_RETRIES="3"

# RabbitMQ Configuration
export QUEUE_RABBITMQ_URL="amqp://guest:guest@localhost:5672/"
export QUEUE_RABBITMQ_MAX_CHANNELS="100"
export QUEUE_RABBITMQ_PREFETCH_COUNT="10"

# Kafka Configuration
export QUEUE_KAFKA_BROKERS="localhost:9092"
export QUEUE_KAFKA_CLIENT_ID="forge-queue"
export QUEUE_KAFKA_CONSUMER_GROUP="forge-consumers"

# NATS Configuration
export QUEUE_NATS_URL="nats://localhost:4222"
export QUEUE_NATS_MAX_RECONNECTS="10"

# SQS Configuration
export QUEUE_SQS_REGION="us-west-2"
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"

# Global Settings
export QUEUE_DEFAULT_VISIBILITY_TIMEOUT="30s"
export QUEUE_MONITORING_ENABLED="true"
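
How the extension maps these variables onto its configuration is not shown here. If you need to read the same variables in your own code, a minimal sketch could look like the following. The helper names `stringOr` and `intOr` and the fallback values are assumptions for illustration, not part of the extension.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// lookupFunc matches the signature of os.LookupEnv so a fake can be
// substituted in tests.
type lookupFunc func(key string) (string, bool)

// stringOr returns the value for key, or def when it is unset or empty.
func stringOr(lookup lookupFunc, key, def string) string {
	if v, ok := lookup(key); ok && v != "" {
		return v
	}
	return def
}

// intOr parses the value for key as an integer, falling back to def when
// the variable is unset or not a valid number.
func intOr(lookup lookupFunc, key string, def int) int {
	v, ok := lookup(key)
	if !ok {
		return def
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		return def
	}
	return n
}

func main() {
	fmt.Println("default backend:", stringOr(os.LookupEnv, "QUEUE_DEFAULT", "memory"))
	fmt.Println("redis pool size:", intOr(os.LookupEnv, "QUEUE_REDIS_POOL_SIZE", 10))
}
```

Passing the lookup function in keeps the helpers testable without touching the process environment.
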

Programmatic Configuration

package main

import (
    "time"
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/queue"
)

func main() {
    app := forge.New()

    // Configure Queue extension
    queueExt := queue.New(queue.Config{
        Default: "redis",
        
        Backends: map[string]queue.BackendConfig{
            "redis": {
                Driver: "redis",
                URL:    "redis://localhost:6379/0",
                PoolSize: 10,
                MinIdleConns: 5,
                MaxRetries: 3,
                DialTimeout: time.Second * 5,
                ReadTimeout: time.Second * 3,
                WriteTimeout: time.Second * 3,
            },
            
            "rabbitmq": {
                Driver: "rabbitmq",
                URL:    "amqp://guest:guest@localhost:5672/",
                MaxChannels: 100,
                Heartbeat: time.Second * 60,
                ConnectionTimeout: time.Second * 30,
                PrefetchCount: 10,
            },
            
            "kafka": {
                Driver: "kafka",
                Brokers: []string{"localhost:9092"},
                ClientID: "forge-queue",
                Version: "2.8.0",
                
                Producer: queue.KafkaProducerConfig{
                    MaxMessageBytes: 1000000,
                    RequiredAcks: 1,
                    Timeout: time.Second * 10,
                    Compression: "snappy",
                    FlushFrequency: time.Millisecond * 100,
                    FlushMessages: 100,
                },
                
                Consumer: queue.KafkaConsumerConfig{
                    GroupID: "forge-consumers",
                    SessionTimeout: time.Second * 10,
                    HeartbeatInterval: time.Second * 3,
                    RebalanceTimeout: time.Second * 60,
                },
            },
        },
        
        Settings: queue.Settings{
            DefaultVisibilityTimeout: time.Second * 30,
            DefaultMessageRetention: time.Hour * 24 * 14,
            DefaultMaxReceives: 3,
            DeadLetterQueueSuffix: "_dlq",
            
            Monitoring: queue.MonitoringConfig{
                Enabled: true,
                MetricsInterval: time.Second * 30,
                HealthCheckInterval: time.Minute,
            },
            
            Retry: queue.RetryConfig{
                MaxAttempts: 3,
                InitialInterval: time.Second,
                MaxInterval: time.Second * 30,
                Multiplier: 2.0,
            },
        },
    })

    app.RegisterExtension(queueExt)
    app.Run()
}
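
The retry settings above (initial interval 1s, multiplier 2.0, max interval 30s) describe an exponential backoff. As a rough sketch of the delays such a policy produces, assuming the delay is simply the initial interval times the multiplier raised to the attempt number and then capped: the `RetryPolicy` type and `Delay` method below are hypothetical and only mirror the field names of the configuration. The extension's actual computation (for example, whether it adds jitter) may differ.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// RetryPolicy mirrors the retry settings shown in the configuration.
// It is a local, hypothetical type, not the extension's own.
type RetryPolicy struct {
	InitialInterval time.Duration
	MaxInterval     time.Duration
	Multiplier      float64
}

// defaultPolicy returns the values used in the configuration example.
func defaultPolicy() RetryPolicy {
	return RetryPolicy{
		InitialInterval: time.Second,
		MaxInterval:     30 * time.Second,
		Multiplier:      2.0,
	}
}

// Delay returns the wait before retry number attempt (0-based). The delay
// grows geometrically and is capped at MaxInterval.
func (p RetryPolicy) Delay(attempt int) time.Duration {
	d := float64(p.InitialInterval) * math.Pow(p.Multiplier, float64(attempt))
	if d > float64(p.MaxInterval) {
		return p.MaxInterval
	}
	return time.Duration(d)
}

func main() {
	p := defaultPolicy()
	for attempt := 0; attempt < 6; attempt++ {
		fmt.Printf("retry %d after %v\n", attempt+1, p.Delay(attempt))
	}
}
```

With these values the delays are 1s, 2s, 4s, 8s, 16s, and then 30s once the cap applies. With `MaxAttempts: 3`, only the first few delays would ever be used.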

Usage Examples

Basic Queue Operations

type OrderEvent struct {
    OrderID    string    `json:"order_id"`
    CustomerID string    `json:"customer_id"`
    Amount     float64   `json:"amount"`
    Status     string    `json:"status"`
    Timestamp  time.Time `json:"timestamp"`
}

func publishOrderHandler(c forge.Context) error {
    // Named q so the variable does not shadow the imported queue package
    q := forge.GetQueue(c)

    var order OrderEvent
    if err := c.Bind(&order); err != nil {
        return c.JSON(400, map[string]string{"error": "Invalid request"})
    }

    order.Timestamp = time.Now()

    // Publish message to order processing queue
    message := queue.Message{
        ID:   uuid.New().String(),
        Body: order,
        Headers: map[string]string{
            "event_type":  "order_created",
            "customer_id": order.CustomerID,
            "priority":    "normal",
        },
    }

    err := q.Publish(c.Context(), "order_processing", message)
    if err != nil {
        return c.JSON(500, map[string]interface{}{
            "error": "Failed to publish order event",
            "details": err.Error(),
        })
    }
    
    return c.JSON(200, map[string]interface{}{
        "message_id": message.ID,
        "queue":      "order_processing",
        "status":     "published",
    })
}

func publishNotificationHandler(c forge.Context) error {
    // Named q so the variable does not shadow the imported queue package
    q := forge.GetQueue(c)

    var notification struct {
        UserID  string                 `json:"user_id"`
        Type    string                 `json:"type"`
        Title   string                 `json:"title"`
        Message string                 `json:"message"`
        Data    map[string]interface{} `json:"data,omitempty"`
    }

    if err := c.Bind(&notification); err != nil {
        return c.JSON(400, map[string]string{"error": "Invalid request"})
    }

    // Publish with priority based on notification type
    priority := "normal"
    if notification.Type == "urgent" || notification.Type == "security" {
        priority = "high"
    }

    message := queue.Message{
        ID:   uuid.New().String(),
        Body: notification,
        Headers: map[string]string{
            "notification_type": notification.Type,
            "user_id":           notification.UserID,
            "priority":          priority,
        },
        Priority: getPriorityLevel(priority),
    }

    err := q.Publish(c.Context(), "notifications", message)
    if err != nil {
        return c.JSON(500, map[string]interface{}{
            "error": "Failed to publish notification",
            "details": err.Error(),
        })
    }
    
    return c.JSON(200, map[string]interface{}{
        "message_id": message.ID,
        "queue":      "notifications",
        "priority":   priority,
        "status":     "published",
    })
}

func publishEmailHandler(c forge.Context) error {
    // Named q so the variable does not shadow the imported queue package
    q := forge.GetQueue(c)

    var email struct {
        To       []string               `json:"to"`
        CC       []string               `json:"cc,omitempty"`
        BCC      []string               `json:"bcc,omitempty"`
        Subject  string                 `json:"subject"`
        Body     string                 `json:"body"`
        Template string                 `json:"template,omitempty"`
        Data     map[string]interface{} `json:"data,omitempty"`
    }

    if err := c.Bind(&email); err != nil {
        return c.JSON(400, map[string]string{"error": "Invalid request"})
    }

    // Validate email
    if len(email.To) == 0 {
        return c.JSON(400, map[string]string{"error": "At least one recipient is required"})
    }

    if email.Subject == "" {
        return c.JSON(400, map[string]string{"error": "Subject is required"})
    }

    message := queue.Message{
        ID:   uuid.New().String(),
        Body: email,
        Headers: map[string]string{
            "email_type":      getEmailType(email.Template),
            "recipient_count": fmt.Sprintf("%d", len(email.To)),
        },
    }

    err := q.Publish(c.Context(), "email_queue", message)
    if err != nil {
        return c.JSON(500, map[string]interface{}{
            "error": "Failed to queue email",
            "details": err.Error(),
        })
    }
    
    return c.JSON(200, map[string]interface{}{
        "message_id": message.ID,
        "queue":      "email_queue",
        "recipients": len(email.To),
        "status":     "queued",
    })
}

func getPriorityLevel(priority string) int {
    switch priority {
    case "high":
        return 10
    case "normal":
        return 5
    case "low":
        return 1
    default:
        return 5
    }
}

func getEmailType(template string) string {
    if template == "" {
        return "plain"
    }
    return "template"
}
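
The numeric levels returned by `getPriorityLevel` (10, 5, 1) only matter if the backend delivers larger values first. How each backend orders messages is not specified here, so the following is just a sketch of the general idea using Go's `container/heap`, assuming larger numbers are more urgent and that messages of equal priority keep their arrival order. The `pending` type and `deliveryOrder` function are hypothetical.

```go
package main

import (
	"container/heap"
	"fmt"
)

// pending is a hypothetical in-memory message record.
type pending struct {
	ID       string
	Priority int // larger value = more urgent
	seq      int // arrival order, keeps FIFO order within one priority
}

// priorityQueue implements heap.Interface over pending messages.
type priorityQueue []pending

func (pq priorityQueue) Len() int { return len(pq) }

// Less puts higher priorities first and breaks ties by arrival order.
func (pq priorityQueue) Less(i, j int) bool {
	if pq[i].Priority != pq[j].Priority {
		return pq[i].Priority > pq[j].Priority
	}
	return pq[i].seq < pq[j].seq
}

func (pq priorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }
func (pq *priorityQueue) Push(x any)   { *pq = append(*pq, x.(pending)) }

func (pq *priorityQueue) Pop() any {
	old := *pq
	last := old[len(old)-1]
	*pq = old[:len(old)-1]
	return last
}

// deliveryOrder returns message IDs in the order the heap hands them out.
func deliveryOrder(msgs []pending) []string {
	pq := make(priorityQueue, 0, len(msgs))
	for i, m := range msgs {
		m.seq = i
		heap.Push(&pq, m)
	}
	ids := make([]string, 0, len(msgs))
	for pq.Len() > 0 {
		ids = append(ids, heap.Pop(&pq).(pending).ID)
	}
	return ids
}

func main() {
	ids := deliveryOrder([]pending{
		{ID: "newsletter", Priority: 1},
		{ID: "receipt", Priority: 5},
		{ID: "security-alert", Priority: 10},
		{ID: "password-reset", Priority: 10},
	})
	fmt.Println(ids) // [security-alert password-reset receipt newsletter]
}
```
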

Consuming Messages

func startOrderProcessor(app *forge.App) {
    // Named q so the variable does not shadow the imported queue package
    q := app.GetQueue()

    // Configure consumer options
    options := queue.ConsumeOptions{
        QueueName:         "order_processing",
        ConsumerGroup:     "order_processors",
        ConcurrentWorkers: 5,
        PrefetchCount:     10,
        VisibilityTimeout: time.Second * 30,
        MaxRetries:        3,
        RetryStrategy: queue.RetryStrategy{
            Type:            "exponential",
            InitialInterval: time.Second,
            MaxInterval:     time.Second * 30,
            Multiplier:      2.0,
        },
    }

    // Start consuming messages
    err := q.Consume(context.Background(), options, processOrderMessage)
    if err != nil {
        log.Fatalf("Failed to start order processor: %v", err)
    }
}

func processOrderMessage(ctx context.Context, message queue.Message) error {
    var order OrderEvent
    if err := json.Unmarshal(message.Body, &order); err != nil {
        log.Printf("Failed to unmarshal order: %v", err)
        return queue.ErrPermanentFailure // Don't retry
    }
    
    log.Printf("Processing order %s for customer %s", order.OrderID, order.CustomerID)
    
    // Validate order
    if err := validateOrder(order); err != nil {
        log.Printf("Invalid order %s: %v", order.OrderID, err)
        return queue.ErrPermanentFailure
    }
    
    // Process payment
    if err := processPayment(ctx, order); err != nil {
        if isTemporaryError(err) {
            log.Printf("Temporary payment error for order %s: %v", order.OrderID, err)
            return err // Will retry
        }
        log.Printf("Permanent payment error for order %s: %v", order.OrderID, err)
        return queue.ErrPermanentFailure
    }
    
    // Update inventory
    if err := updateInventory(ctx, order); err != nil {
        log.Printf("Failed to update inventory for order %s: %v", order.OrderID, err)
        return err // Will retry
    }
    
    // Send confirmation
    if err := sendOrderConfirmation(ctx, order); err != nil {
        log.Printf("Failed to send confirmation for order %s: %v", order.OrderID, err)
        // Don't fail the entire process for notification errors
    }
    
    log.Printf("Successfully processed order %s", order.OrderID)
    return nil
}
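
The handler above signals "do not retry" by returning `queue.ErrPermanentFailure` and "retry" by returning any other error. A minimal sketch of how a consumer loop might act on that distinction follows. The local sentinels `errPermanent` and `errTransient`, the `runWithRetry` function, and the outcome strings are all hypothetical stand-ins; the extension's real retry and dead-letter handling is not shown in this document.

```go
package main

import (
	"errors"
	"fmt"
)

// errPermanent stands in for queue.ErrPermanentFailure (local, hypothetical).
var errPermanent = errors.New("permanent failure")

// errTransient represents any retryable error, such as a timeout.
var errTransient = errors.New("transient failure")

// runWithRetry calls handler up to maxRetries+1 times. It returns "acked" on
// success and "dead-lettered" on a permanent error or once retries run out,
// together with the number of attempts made.
func runWithRetry(handler func(attempt int) error, maxRetries int) (string, int) {
	for attempt := 1; attempt <= maxRetries+1; attempt++ {
		err := handler(attempt)
		if err == nil {
			return "acked", attempt
		}
		// errors.Is also matches when the sentinel has been wrapped with %w.
		if errors.Is(err, errPermanent) {
			return "dead-lettered", attempt
		}
	}
	return "dead-lettered", maxRetries + 1
}

func main() {
	// Fails twice with a transient error, then succeeds.
	flaky := func(attempt int) error {
		if attempt < 3 {
			return errTransient
		}
		return nil
	}
	outcome, attempts := runWithRetry(flaky, 3)
	fmt.Println(outcome, attempts)

	// A wrapped permanent error stops retries immediately.
	invalid := func(int) error { return fmt.Errorf("invalid order: %w", errPermanent) }
	outcome, attempts = runWithRetry(invalid, 3)
	fmt.Println(outcome, attempts)
}
```

A real loop would also wait between attempts according to the configured retry strategy; that is left out here to keep the control flow visible.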

func startNotificationProcessor(app *forge.App) {
    // Named q so the variable does not shadow the imported queue package
    q := app.GetQueue()

    options := queue.ConsumeOptions{
        QueueName:         "notifications",
        ConsumerGroup:     "notification_processors",
        ConcurrentWorkers: 10,
        PrefetchCount:     20,
        VisibilityTimeout: time.Second * 15,
        MaxRetries:        2,
    }

    err := q.Consume(context.Background(), options, processNotificationMessage)
    if err != nil {
        log.Fatalf("Failed to start notification processor: %v", err)
    }
}

func processNotificationMessage(ctx context.Context, message queue.Message) error {
    var notification struct {
        UserID  string `json:"user_id"`
        Type    string `json:"type"`
        Title   string `json:"title"`
        Message string `json:"message"`
        Data    map[string]interface{} `json:"data,omitempty"`
    }
    
    if err := json.Unmarshal(message.Body, &notification); err != nil {
        return queue.ErrPermanentFailure
    }
    
    log.Printf("Processing %s notification for user %s", 
              notification.Type, notification.UserID)
    
    // Get user preferences
    preferences, err := getUserNotificationPreferences(ctx, notification.UserID)
    if err != nil {
        log.Printf("Failed to get preferences for user %s: %v", 
                  notification.UserID, err)
        return err
    }
    
    // Check if user wants this type of notification
    if !preferences.IsEnabled(notification.Type) {
        log.Printf("User %s has disabled %s notifications", 
                  notification.UserID, notification.Type)
        return nil // Skip without error
    }
    
    // Send via preferred channels
    var errors []error
    
    if preferences.Email {
        if err := sendEmailNotification(ctx, notification); err != nil {
            errors = append(errors, fmt.Errorf("email: %w", err))
        }
    }
    
    if preferences.Push {
        if err := sendPushNotification(ctx, notification); err != nil {
            errors = append(errors, fmt.Errorf("push: %w", err))
        }
    }
    
    if preferences.SMS && notification.Type == "urgent" {
        if err := sendSMSNotification(ctx, notification); err != nil {
            errors = append(errors, fmt.Errorf("sms: %w", err))
        }
    }
    
    if len(errors) > 0 {
        log.Printf("Some notification channels failed for user %s: %v", 
                  notification.UserID, errors)
        // Return error to retry if any channel failed
        return fmt.Errorf("notification delivery failed: %v", errors)
    }
    
    log.Printf("Successfully sent %s notification to user %s", 
              notification.Type, notification.UserID)
    return nil
}
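
The handler above collects per-channel failures in a slice and formats them with `%v`, which loses the ability to inspect individual causes. On Go 1.20 or later, `errors.Join` is one alternative. The sketch below is illustrative only: the `channel` type, `sendAll`, and `errSMTPDown` are hypothetical names, not part of the extension.

```go
package main

import (
	"errors"
	"fmt"
)

// errSMTPDown is a hypothetical channel-level error.
var errSMTPDown = errors.New("smtp unavailable")

// channel pairs a delivery channel name with its send function.
type channel struct {
	name string
	send func() error
}

// sendAll tries every channel and joins the failures into one error.
// errors.Join returns nil when there is nothing to join.
func sendAll(channels []channel) error {
	var errs []error
	for _, ch := range channels {
		if err := ch.send(); err != nil {
			errs = append(errs, fmt.Errorf("%s: %w", ch.name, err))
		}
	}
	return errors.Join(errs...)
}

// isSMTPDown reports whether any joined error wraps errSMTPDown.
func isSMTPDown(err error) bool {
	return errors.Is(err, errSMTPDown)
}

func main() {
	err := sendAll([]channel{
		{name: "email", send: func() error { return errSMTPDown }},
		{name: "push", send: func() error { return nil }},
	})
	fmt.Println("error:", err)
	fmt.Println("smtp down:", isSMTPDown(err))
}
```

Note that retrying the whole message after a partial failure re-sends on the channels that already succeeded, so a production handler may want to track which channels completed.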

func startEmailProcessor(app *forge.App) {
    // Named q so the variable does not shadow the imported queue package
    q := app.GetQueue()

    options := queue.ConsumeOptions{
        QueueName:         "email_queue",
        ConsumerGroup:     "email_processors",
        ConcurrentWorkers: 3, // Limit to avoid overwhelming SMTP server
        PrefetchCount:     5,
        VisibilityTimeout: time.Minute * 2,
        MaxRetries:        3,
        RetryStrategy: queue.RetryStrategy{
            Type:            "exponential",
            InitialInterval: time.Second * 5,
            MaxInterval:     time.Minute * 5,
            Multiplier:      2.0,
        },
    }

    err := q.Consume(context.Background(), options, processEmailMessage)
    if err != nil {
        log.Fatalf("Failed to start email processor: %v", err)
    }
}

func processEmailMessage(ctx context.Context, message queue.Message) error {
    var email struct {
        To       []string `json:"to"`
        CC       []string `json:"cc,omitempty"`
        BCC      []string `json:"bcc,omitempty"`
        Subject  string   `json:"subject"`
        Body     string   `json:"body"`
        Template string   `json:"template,omitempty"`
        Data     map[string]interface{} `json:"data,omitempty"`
    }
    
    if err := json.Unmarshal(message.Body, &email); err != nil {
        return queue.ErrPermanentFailure
    }
    
    log.Printf("Processing email: %s to %d recipients", 
              email.Subject, len(email.To))
    
    // Render template if specified
    if email.Template != "" {
        renderedBody, err := renderEmailTemplate(email.Template, email.Data)
        if err != nil {
            log.Printf("Failed to render email template %s: %v", 
                      email.Template, err)
            return queue.ErrPermanentFailure
        }
        email.Body = renderedBody
    }
    
    // Send email
    if err := sendEmail(ctx, email); err != nil {
        if isTemporaryEmailError(err) {
            log.Printf("Temporary email error: %v", err)
            return err // Will retry
        }
        log.Printf("Permanent email error: %v", err)
        return queue.ErrPermanentFailure
    }
    
    log.Printf("Successfully sent email: %s", email.Subject)
    return nil
}
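
The consumers above decode `message.Body` with `json.Unmarshal`, which implies the body travels as JSON bytes. How the extension serializes the `Body` value on publish is not shown here. The sketch below only demonstrates the round trip for `OrderEvent` under that assumption; `encodeOrder` and `decodeOrder` are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// OrderEvent matches the struct used in the publishing examples.
type OrderEvent struct {
	OrderID    string    `json:"order_id"`
	CustomerID string    `json:"customer_id"`
	Amount     float64   `json:"amount"`
	Status     string    `json:"status"`
	Timestamp  time.Time `json:"timestamp"`
}

// encodeOrder produces the JSON payload a publisher would send.
func encodeOrder(o OrderEvent) ([]byte, error) {
	return json.Marshal(o)
}

// decodeOrder parses a payload back into an OrderEvent, as a consumer would.
func decodeOrder(body []byte) (OrderEvent, error) {
	var o OrderEvent
	err := json.Unmarshal(body, &o)
	return o, err
}

func main() {
	in := OrderEvent{
		OrderID:    "ord-1",
		CustomerID: "cust-9",
		Amount:     42.5,
		Status:     "created",
		Timestamp:  time.Date(2024, time.January, 2, 3, 4, 5, 0, time.UTC),
	}
	body, err := encodeOrder(in)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(body))

	out, err := decodeOrder(body)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(out.OrderID, out.Timestamp.Equal(in.Timestamp))
}
```

A payload that fails to decode can never succeed on retry, which is why the handlers above treat unmarshal errors as permanent failures.
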

Batch Processing

func publishBatchOrdersHandler(c forge.Context) error {
    // Named q so the variable does not shadow the imported queue package
    q := forge.GetQueue(c)

    var orders []OrderEvent
    if err := c.Bind(&orders); err != nil {
        return c.JSON(400, map[string]string{"error": "Invalid request"})
    }

    if len(orders) == 0 {
        return c.JSON(400, map[string]string{"error": "No orders provided"})
    }

    if len(orders) > 100 {
        return c.JSON(400, map[string]string{"error": "Maximum 100 orders per batch"})
    }

    // Use the batch ID set upstream (e.g. by middleware) when present.
    // The checked type assertion avoids a panic when it is missing.
    batchID, ok := c.Get("batch_id").(string)
    if !ok || batchID == "" {
        batchID = uuid.New().String()
    }

    // Prepare batch messages
    var messages []queue.Message
    for _, order := range orders {
        order.Timestamp = time.Now()

        message := queue.Message{
            ID:   uuid.New().String(),
            Body: order,
            Headers: map[string]string{
                "event_type":  "order_created",
                "customer_id": order.CustomerID,
                "batch_id":    batchID,
            },
        }
        messages = append(messages, message)
    }

    // Publish batch
    results, err := q.PublishBatch(c.Context(), "order_processing", messages)
    if err != nil {
        return c.JSON(500, map[string]interface{}{
            "error": "Failed to publish batch",
            "details": err.Error(),
        })
    }
    
    // Analyze results
    var successful, failed int
    var failedMessages []string
    
    for i, result := range results {
        if result.Error != nil {
            failed++
            failedMessages = append(failedMessages, 
                fmt.Sprintf("Message %d: %v", i, result.Error))
        } else {
            successful++
        }
    }
    
    response := map[string]interface{}{
        "total":      len(orders),
        "successful": successful,
        "failed":     failed,
        "queue":      "order_processing",
    }
    
    if failed > 0 {
        response["errors"] = failedMessages
    }
    
    statusCode := 200
    if failed > 0 && successful == 0 {
        statusCode = 500
    } else if failed > 0 {
        statusCode = 207 // Partial success
    }
    
    return c.JSON(statusCode, response)
}
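
The status-code selection at the end of this handler is repeated in the bulk notification handler, so it could be factored into a small helper. A sketch, where the name `batchStatus` is hypothetical:

```go
package main

import (
	"fmt"
	"net/http"
)

// batchStatus maps per-message results to an HTTP status code:
// 200 when nothing failed, 500 when everything failed, and
// 207 (Multi-Status) for partial success.
func batchStatus(successful, failed int) int {
	switch {
	case failed == 0:
		return http.StatusOK
	case successful == 0:
		return http.StatusInternalServerError
	default:
		return http.StatusMultiStatus
	}
}

func main() {
	fmt.Println(batchStatus(10, 0)) // 200
	fmt.Println(batchStatus(7, 3))  // 207
	fmt.Println(batchStatus(0, 10)) // 500
}
```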

func batchConsumeOrdersHandler(c forge.Context) error {
    // Named q so the variable does not shadow the imported queue package
    q := forge.GetQueue(c)

    batchSize := c.QueryInt("batch_size", 10)
    if batchSize > 50 {
        batchSize = 50
    }

    timeout := c.QueryDuration("timeout", time.Second*30)

    // Consume batch of messages
    messages, err := q.ConsumeBatch(c.Context(), "order_processing",
        queue.BatchConsumeOptions{
            BatchSize:         batchSize,
            Timeout:           timeout,
            VisibilityTimeout: time.Minute,
        })
    if err != nil {
        return c.JSON(500, map[string]interface{}{
            "error":   "Failed to consume batch",
            "details": err.Error(),
        })
    }

    if len(messages) == 0 {
        return c.JSON(200, map[string]interface{}{
            "messages": []interface{}{},
            "count":    0,
            "message":  "No messages available",
        })
    }

    // Process messages
    var processed []map[string]interface{}
    var errors []string

    for _, message := range messages {
        var order OrderEvent
        if err := json.Unmarshal(message.Body, &order); err != nil {
            errors = append(errors, fmt.Sprintf("Message %s: invalid format", message.ID))
            q.Nack(c.Context(), message.ID, false) // Don't requeue
            continue
        }

        // Process order (simplified)
        if err := processOrderSync(order); err != nil {
            errors = append(errors, fmt.Sprintf("Order %s: %v", order.OrderID, err))
            q.Nack(c.Context(), message.ID, true) // Requeue for retry
            continue
        }

        // Acknowledge successful processing
        if err := q.Ack(c.Context(), message.ID); err != nil {
            errors = append(errors, fmt.Sprintf("Failed to ack message %s: %v", message.ID, err))
        }
        
        processed = append(processed, map[string]interface{}{
            "message_id": message.ID,
            "order_id":   order.OrderID,
            "status":     "processed",
        })
    }
    
    response := map[string]interface{}{
        "requested":  batchSize,
        "received":   len(messages),
        "processed":  len(processed),
        "failed":     len(errors),
        "messages":   processed,
    }
    
    if len(errors) > 0 {
        response["errors"] = errors
    }
    
    return c.JSON(200, response)
}

func bulkNotificationHandler(c forge.Context) error {
    // Named q so the variable does not shadow the imported queue package
    q := forge.GetQueue(c)

    var req struct {
        UserIDs []string               `json:"user_ids"`
        Type    string                 `json:"type"`
        Title   string                 `json:"title"`
        Message string                 `json:"message"`
        Data    map[string]interface{} `json:"data,omitempty"`
    }

    if err := c.Bind(&req); err != nil {
        return c.JSON(400, map[string]string{"error": "Invalid request"})
    }

    if len(req.UserIDs) == 0 {
        return c.JSON(400, map[string]string{"error": "No user IDs provided"})
    }

    if len(req.UserIDs) > 1000 {
        return c.JSON(400, map[string]string{"error": "Maximum 1000 users per bulk notification"})
    }

    // Create messages for each user
    var messages []queue.Message
    batchID := uuid.New().String()

    for _, userID := range req.UserIDs {
        notification := map[string]interface{}{
            "user_id": userID,
            "type":    req.Type,
            "title":   req.Title,
            "message": req.Message,
            "data":    req.Data,
        }

        message := queue.Message{
            ID:   uuid.New().String(),
            Body: notification,
            Headers: map[string]string{
                "notification_type": req.Type,
                "user_id":           userID,
                "batch_id":          batchID,
                "bulk_notification": "true",
            },
        }
        messages = append(messages, message)
    }

    // Publish in chunks to avoid overwhelming the queue
    chunkSize := 100
    var totalSuccessful, totalFailed int
    var allErrors []string

    for i := 0; i < len(messages); i += chunkSize {
        end := i + chunkSize
        if end > len(messages) {
            end = len(messages)
        }

        chunk := messages[i:end]
        results, err := q.PublishBatch(c.Context(), "notifications", chunk)
        if err != nil {
            totalFailed += len(chunk)
            allErrors = append(allErrors, fmt.Sprintf("Chunk %d-%d: %v", i, end-1, err))
            continue
        }
        
        for j, result := range results {
            if result.Error != nil {
                totalFailed++
                allErrors = append(allErrors, 
                    fmt.Sprintf("Message %d: %v", i+j, result.Error))
            } else {
                totalSuccessful++
            }
        }
    }
    
    response := map[string]interface{}{
        "batch_id":   batchID,
        "total":      len(req.UserIDs),
        "successful": totalSuccessful,
        "failed":     totalFailed,
        "queue":      "notifications",
    }
    
    if totalFailed > 0 {
        response["errors"] = allErrors
    }
    
    statusCode := 200
    if totalFailed > 0 && totalSuccessful == 0 {
        statusCode = 500
    } else if totalFailed > 0 {
        statusCode = 207
    }
    
    return c.JSON(statusCode, response)
}
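
The chunking loop in the bulk handler above can be pulled out into a reusable generic helper (Go 1.18 or later). This is a sketch only; the name `chunk` is hypothetical and the helper is not part of the extension.

```go
package main

import "fmt"

// chunk splits items into consecutive slices of at most size elements.
// The returned slices share the backing array of items. A non-positive
// size yields nil.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	var out [][]T
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		out = append(out, items[start:end])
	}
	return out
}

func main() {
	// 250 user IDs published in chunks of 100, as in the handler above.
	userIDs := make([]string, 250)
	for i := range userIDs {
		userIDs[i] = fmt.Sprintf("user-%d", i)
	}
	for i, c := range chunk(userIDs, 100) {
		fmt.Printf("chunk %d: %d messages\n", i, len(c))
	}
}
```
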

Delayed Messages

func scheduleReminderHandler(c forge.Context) error {
    // Named q so the variable does not shadow the imported queue package
    q := forge.GetQueue(c)

    var reminder struct {
        UserID      string                 `json:"user_id"`
        Type        string                 `json:"type"`
        Title       string                 `json:"title"`
        Message     string                 `json:"message"`
        ScheduledAt time.Time              `json:"scheduled_at"`
        Data        map[string]interface{} `json:"data,omitempty"`
    }

    if err := c.Bind(&reminder); err != nil {
        return c.JSON(400, map[string]string{"error": "Invalid request"})
    }

    // Validate scheduled time
    if reminder.ScheduledAt.Before(time.Now()) {
        return c.JSON(400, map[string]string{"error": "Scheduled time must be in the future"})
    }

    // Calculate delay
    delay := time.Until(reminder.ScheduledAt)
    if delay > time.Hour*24*30 { // 30 days max
        return c.JSON(400, map[string]string{"error": "Maximum delay is 30 days"})
    }

    message := queue.Message{
        ID:   uuid.New().String(),
        Body: reminder,
        Headers: map[string]string{
            "reminder_type": reminder.Type,
            "user_id":       reminder.UserID,
            "scheduled_at":  reminder.ScheduledAt.Format(time.RFC3339),
        },
        DelayUntil: reminder.ScheduledAt,
    }

    err := q.PublishDelayed(c.Context(), "reminders", message, delay)
    if err != nil {
        return c.JSON(500, map[string]interface{}{
            "error": "Failed to schedule reminder",
            "details": err.Error(),
        })
    }
    
    return c.JSON(201, map[string]interface{}{
        "message_id":   message.ID,
        "scheduled_at": reminder.ScheduledAt,
        "delay_seconds": int(delay.Seconds()),
        "queue":        "reminders",
        "status":       "scheduled",
    })
}

func scheduleReportHandler(c forge.Context) error {
    // Named q so the variable does not shadow the imported queue package
    q := forge.GetQueue(c)

    var report struct {
        ReportType string                 `json:"report_type"`
        UserID     string                 `json:"user_id"`
        Frequency  string                 `json:"frequency"` // daily, weekly, monthly
        Time       string                 `json:"time"`      // HH:MM format
        Timezone   string                 `json:"timezone"`
        Config     map[string]interface{} `json:"config,omitempty"`
    }

    if err := c.Bind(&report); err != nil {
        return c.JSON(400, map[string]string{"error": "Invalid request"})
    }

    // Parse timezone
    loc, err := time.LoadLocation(report.Timezone)
    if err != nil {
        return c.JSON(400, map[string]string{"error": "Invalid timezone"})
    }

    // Parse time
    timeParts := strings.Split(report.Time, ":")
    if len(timeParts) != 2 {
        return c.JSON(400, map[string]string{"error": "Invalid time format (use HH:MM)"})
    }

    hour, err := strconv.Atoi(timeParts[0])
    if err != nil || hour < 0 || hour > 23 {
        return c.JSON(400, map[string]string{"error": "Invalid hour"})
    }

    minute, err := strconv.Atoi(timeParts[1])
    if err != nil || minute < 0 || minute > 59 {
        return c.JSON(400, map[string]string{"error": "Invalid minute"})
    }

    // Calculate next execution time, starting from today at HH:MM
    now := time.Now().In(loc)
    nextExecution := time.Date(now.Year(), now.Month(), now.Day(),
        hour, minute, 0, 0, loc)

    // Adjust based on frequency. AddDate keeps the wall-clock time across
    // DST changes, unlike adding a fixed 24 hours.
    switch report.Frequency {
    case "daily":
        if !nextExecution.After(now) {
            nextExecution = nextExecution.AddDate(0, 0, 1)
        }
    case "weekly":
        // Advance to the next Monday (or specified day) that is still in the future
        for nextExecution.Weekday() != time.Monday || !nextExecution.After(now) {
            nextExecution = nextExecution.AddDate(0, 0, 1)
        }
    case "monthly":
        // First day of the current month, or of next month if that has already passed
        nextExecution = time.Date(now.Year(), now.Month(), 1,
            hour, minute, 0, 0, loc)
        if !nextExecution.After(now) {
            nextExecution = nextExecution.AddDate(0, 1, 0)
        }
    default:
        return c.JSON(400, map[string]string{"error": "Invalid frequency"})
    }

    delay := time.Until(nextExecution)

    message := queue.Message{
        ID:   uuid.New().String(),
        Body: report,
        Headers: map[string]string{
            "report_type":    report.ReportType,
            "user_id":        report.UserID,
            "frequency":      report.Frequency,
            "next_execution": nextExecution.Format(time.RFC3339),
            "timezone":       report.Timezone,
        },
        DelayUntil: nextExecution,
    }

    err = q.PublishDelayed(c.Context(), "scheduled_reports", message, delay)
    if err != nil {
        return c.JSON(500, map[string]interface{}{
            "error": "Failed to schedule report",
            "details": err.Error(),
        })
    }
    
    return c.JSON(201, map[string]interface{}{
        "message_id":     message.ID,
        "next_execution": nextExecution,
        "delay_seconds":  int(delay.Seconds()),
        "frequency":      report.Frequency,
        "queue":          "scheduled_reports",
        "status":         "scheduled",
    })
}

func processScheduledReportMessage(ctx context.Context, message queue.Message) error {
    var report struct {
        ReportType string `json:"report_type"`
        UserID     string `json:"user_id"`
        Frequency  string `json:"frequency"`
        Time       string `json:"time"`
        Timezone   string `json:"timezone"`
        Config     map[string]interface{} `json:"config,omitempty"`
    }
    
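    if err := json.Unmarshal(message.Body, &report); err != nil {
        // A malformed payload will never succeed on retry
        log.Printf("Invalid scheduled report payload %s: %v", message.ID, err)
        return queue.ErrPermanentFailure
    }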
    
    log.Printf("Generating %s report for user %s", report.ReportType, report.UserID)
    
    // Generate report
    reportData, err := generateReport(ctx, report.ReportType, report.UserID, report.Config)
    if err != nil {
        log.Printf("Failed to generate report: %v", err)
        return err // Will retry
    }
    
    // Send report to user
    if err := sendReport(ctx, report.UserID, reportData); err != nil {
        log.Printf("Failed to send report: %v", err)
        return err // Will retry
    }
    
    // Schedule next execution if recurring
    if report.Frequency != "once" {
        if err := scheduleNextReport(ctx, report); err != nil {
            log.Printf("Failed to schedule next report: %v", err)
            // Don't fail the current execution for scheduling errors
        }
    }
    
    log.Printf("Successfully processed %s report for user %s", 
              report.ReportType, report.UserID)
    return nil
}

func cancelScheduledMessageHandler(c forge.Context) error {
    q := forge.GetQueue(c) // q avoids shadowing the queue package
    messageID := c.Param("message_id")
    
    if messageID == "" {
        return c.JSON(400, map[string]string{"error": "Message ID is required"})
    }
    
    // Cancel scheduled message
    err := q.CancelDelayed(c.Context(), messageID)
    if err != nil {
        if errors.Is(err, queue.ErrMessageNotFound) {
            return c.JSON(404, map[string]string{"error": "Scheduled message not found"})
        }
        return c.JSON(500, map[string]interface{}{
            "error": "Failed to cancel scheduled message",
            "details": err.Error(),
        })
    }
    
    return c.JSON(200, map[string]interface{}{
        "message_id": messageID,
        "status":     "cancelled",
    })
}
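
The next-run calculation in the scheduling handler can be isolated into a pure function, which makes the edge cases (time already passed today, today is Monday, mid-month requests) easy to test. The following is a minimal sketch using only the standard library; `nextExecution` and `nextExecutionString` are hypothetical helper names, not part of the Queue extension, and the Monday and first-of-month conventions are taken from the handler above.

```go
package main

import (
	"fmt"
	"time"
)

// nextExecution returns the first run time strictly after now for the given
// frequency, at hour:minute in now's location. Weekly runs land on Monday and
// monthly runs on the 1st (conventions assumed from the handler).
func nextExecution(now time.Time, frequency string, hour, minute int) (time.Time, error) {
	loc := now.Location()
	next := time.Date(now.Year(), now.Month(), now.Day(), hour, minute, 0, 0, loc)

	switch frequency {
	case "daily":
		if !next.After(now) {
			next = next.AddDate(0, 0, 1)
		}
	case "weekly":
		// Move forward to Monday, then skip a week if that slot has passed.
		for next.Weekday() != time.Monday {
			next = next.AddDate(0, 0, 1)
		}
		if !next.After(now) {
			next = next.AddDate(0, 0, 7)
		}
	case "monthly":
		next = time.Date(now.Year(), now.Month(), 1, hour, minute, 0, 0, loc)
		if !next.After(now) {
			next = next.AddDate(0, 1, 0)
		}
	default:
		return time.Time{}, fmt.Errorf("invalid frequency %q", frequency)
	}
	return next, nil
}

// nextExecutionString wraps nextExecution for RFC 3339 input and output.
// It returns an empty string when the input or frequency is invalid.
func nextExecutionString(nowRFC3339, frequency string, hour, minute int) string {
	now, err := time.Parse(time.RFC3339, nowRFC3339)
	if err != nil {
		return ""
	}
	next, err := nextExecution(now, frequency, hour, minute)
	if err != nil {
		return ""
	}
	return next.Format(time.RFC3339)
}
```

For example, with `now` set to Wednesday 2024-05-15 10:00 UTC and a 09:00 schedule, the daily run falls on the 16th, the weekly run on Monday the 20th, and the monthly run on 1 June. The delay passed to `PublishDelayed` would then be `time.Until(next)`.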

Advanced Queue Features

func setupDeadLetterQueues(app *forge.App) {
    q := app.GetQueue() // q avoids shadowing the queue package
    
    // Configure dead letter queue for order processing
    dlqOptions := queue.QueueOptions{
        Name:               "order_processing_dlq",
        Durable:            true,
        AutoDelete:         false,
        MessageTTL:         time.Hour * 24 * 7, // 7 days
        MaxLength:          10000,
        DeadLetterExchange: "", // No further DLQ
    }
    
    err := q.DeclareQueue(context.Background(), dlqOptions)
    if err != nil {
        log.Fatalf("Failed to declare DLQ: %v", err)
    }
    
    // Start DLQ processor
    go processDLQMessages(app)
}

func processDLQMessages(app *forge.App) {
    q := app.GetQueue() // q avoids shadowing the queue package
    
    options := queue.ConsumeOptions{
        QueueName:         "order_processing_dlq",
        ConsumerGroup:     "dlq_processors",
        ConcurrentWorkers: 2,
        PrefetchCount:     5,
        VisibilityTimeout: time.Minute * 5,
        MaxRetries:        1, // Limited retries for DLQ
    }
    
    err := q.Consume(context.Background(), options, handleDLQMessage)
    if err != nil {
        log.Fatalf("Failed to start DLQ processor: %v", err)
    }
}

func handleDLQMessage(ctx context.Context, message queue.Message) error {
    log.Printf("Processing DLQ message: %s", message.ID)
    
    // Extract original failure information
    failureCount := message.Headers["failure_count"]
    lastError := message.Headers["last_error"]
    originalQueue := message.Headers["original_queue"]
    
    log.Printf("Message failed %s times, last error: %s, from queue: %s", 
              failureCount, lastError, originalQueue)
    
    // Analyze failure pattern
    var order OrderEvent
    if err := json.Unmarshal(message.Body, &order); err != nil {
        log.Printf("Failed to unmarshal DLQ message: %v", err)
        return sendToManualReview(ctx, message, "unmarshal_error")
    }
    
    // Attempt different processing strategies
    if err := tryAlternativeProcessing(ctx, order); err != nil {
        log.Printf("Alternative processing failed: %v", err)
        return sendToManualReview(ctx, message, "processing_failed")
    }
    
    log.Printf("Successfully recovered DLQ message: %s", message.ID)
    return nil
}

func sendToManualReview(ctx context.Context, message queue.Message, reason string) error {
    // Store in database for manual review
    review := ManualReview{
        MessageID:     message.ID,
        OriginalQueue: message.Headers["original_queue"],
        FailureReason: reason,
        MessageBody:   string(message.Body),
        Headers:       message.Headers,
        CreatedAt:     time.Now(),
        Status:        "pending",
    }
    
    if err := saveManualReview(ctx, review); err != nil {
        log.Printf("Failed to save manual review: %v", err)
        return err
    }
    
    // Notify administrators
    notification := AdminNotification{
        Type:    "dlq_manual_review",
        Subject: fmt.Sprintf("DLQ Message Requires Manual Review: %s", message.ID),
        Body:    fmt.Sprintf("Message from queue %s failed processing and requires manual review. Reason: %s", 
                           message.Headers["original_queue"], reason),
        Data: map[string]interface{}{
            "message_id":     message.ID,
            "original_queue": message.Headers["original_queue"],
            "failure_reason": reason,
        },
    }
    
    return sendAdminNotification(ctx, notification)
}
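
The DLQ handlers above read the `failure_count`, `last_error`, and `original_queue` headers. How those headers get populated is backend-specific and not shown here, so the following is only an illustrative in-memory sketch of the general pattern: count failures on the message, requeue until a limit is reached, then move the message to the dead letter queue with its failure metadata. The `Message` type, `memoryQueues`, `handleWithDLQ`, and `demo` are hypothetical stand-ins, not the extension's API.

```go
package main

import (
	"errors"
	"strconv"
)

// Message is a simplified stand-in for the extension's message type.
type Message struct {
	ID      string
	Body    []byte
	Headers map[string]string
}

// memoryQueues is a toy backend: queue name -> pending messages.
type memoryQueues map[string][]Message

// handleWithDLQ runs handler on msg. On failure it records the failure in the
// headers and either requeues the message or, once maxRetries failures have
// accumulated, moves it to dlqName.
func (q memoryQueues) handleWithDLQ(queueName, dlqName string, maxRetries int, msg Message, handler func(Message) error) {
	err := handler(msg)
	if err == nil {
		return // processed successfully, nothing to requeue
	}
	if msg.Headers == nil {
		msg.Headers = map[string]string{}
	}
	failures, _ := strconv.Atoi(msg.Headers["failure_count"]) // missing header counts as 0
	failures++
	msg.Headers["failure_count"] = strconv.Itoa(failures)
	msg.Headers["last_error"] = err.Error()

	if failures >= maxRetries {
		msg.Headers["original_queue"] = queueName
		q[dlqName] = append(q[dlqName], msg)
		return
	}
	q[queueName] = append(q[queueName], msg)
}

// demo drives one always-failing message through three attempts.
func demo() memoryQueues {
	qs := memoryQueues{"orders": {{ID: "m1", Body: []byte(`{"order_id":"42"}`)}}}
	failing := func(Message) error { return errors.New("payment gateway timeout") }
	for len(qs["orders"]) > 0 {
		msg := qs["orders"][0]
		qs["orders"] = qs["orders"][1:]
		qs.handleWithDLQ("orders", "orders_dlq", 3, msg, failing)
	}
	return qs
}
```

After `demo()` returns, the `orders` queue is empty and `orders_dlq` holds the message with `failure_count` set to `3`, which is the shape of input `handleDLQMessage` expects.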

func getDLQStatsHandler(c forge.Context) error {
    q := forge.GetQueue(c) // q avoids shadowing the queue package
    queueName := c.Query("queue", "order_processing_dlq")
    
    // Get DLQ statistics
    info, err := q.GetQueueInfo(c.Context(), queueName)
    if err != nil {
        return c.JSON(500, map[string]interface{}{
            "error":   "Failed to get DLQ stats",
            "details": err.Error(),
        })
    }
    
    // Get recent DLQ messages for analysis
    messages, err := q.PeekMessages(c.Context(), queueName, 10)
    if err != nil {
        log.Printf("Failed to peek DLQ messages: %v", err)
        messages = []queue.Message{} // Continue without peek data
    }
    
    // Analyze failure patterns
    failureReasons := make(map[string]int)
    originalQueues := make(map[string]int)
    
    for _, msg := range messages {
        if reason := msg.Headers["last_error"]; reason != "" {
            failureReasons[reason]++
        }
        if origQueue := msg.Headers["original_queue"]; origQueue != "" {
            originalQueues[origQueue]++
        }
    }
    
    return c.JSON(200, map[string]interface{}{
        "queue_name":      queueName,
        "message_count":   info.MessageCount,
        "consumer_count":  info.ConsumerCount,
        "failure_reasons": failureReasons,
        "original_queues": originalQueues,
        "recent_messages": len(messages),
    })
}

func reprocessDLQMessageHandler(c forge.Context) error {
    q := forge.GetQueue(c) // q avoids shadowing the queue package
    messageID := c.Param("message_id")
    
    var req struct {
        TargetQueue  string `json:"target_queue"`
        ResetRetries bool   `json:"reset_retries"`
    }
    
    if err := c.Bind(&req); err != nil {
        return c.JSON(400, map[string]string{"error": "Invalid request"})
    }
    
    // Get message from DLQ
    message, err := q.GetMessage(c.Context(), "order_processing_dlq", messageID)
    if err != nil {
        if errors.Is(err, queue.ErrMessageNotFound) {
            return c.JSON(404, map[string]string{"error": "Message not found in DLQ"})
        }
        return c.JSON(500, map[string]interface{}{
            "error":   "Failed to get DLQ message",
            "details": err.Error(),
        })
    }
    
    // Prepare message for reprocessing
    reprocessMessage := queue.Message{
        ID:      uuid.New().String(),
        Body:    message.Body,
        Headers: make(map[string]string),
    }
    
    // Copy headers but reset retry-related ones if requested
    for k, v := range message.Headers {
        if req.ResetRetries && (k == "failure_count" || k == "last_error") {
            continue
        }
        reprocessMessage.Headers[k] = v
    }
    
    // Add reprocessing metadata
    reprocessMessage.Headers["reprocessed_from_dlq"] = "true"
    reprocessMessage.Headers["reprocessed_at"] = time.Now().Format(time.RFC3339)
    reprocessMessage.Headers["original_message_id"] = message.ID
    
    targetQueue := req.TargetQueue
    if targetQueue == "" {
        targetQueue = message.Headers["original_queue"]
        if targetQueue == "" {
            targetQueue = "order_processing" // Default
        }
    }
    
    // Publish to target queue
    err = q.Publish(c.Context(), targetQueue, reprocessMessage)
    if err != nil {
        return c.JSON(500, map[string]interface{}{
            "error":   "Failed to reprocess message",
            "details": err.Error(),
        })
    }
    
    // Remove from DLQ only after the republish succeeded
    err = q.Ack(c.Context(), message.ID)
    if err != nil {
        log.Printf("Failed to ack DLQ message %s: %v", message.ID, err)
        // Don't fail the request since message was already reprocessed
    }
    
    return c.JSON(200, map[string]interface{}{
        "original_message_id": message.ID,
        "new_message_id":      reprocessMessage.ID,
        "target_queue":        targetQueue,
        "status":              "reprocessed",
    })
}

func createQueueHandler(c forge.Context) error {
    q := forge.GetQueue(c) // q avoids shadowing the queue package
    
    var req struct {
        Name               string        `json:"name"`
        Durable            bool          `json:"durable"`
        AutoDelete         bool          `json:"auto_delete"`
        MessageTTL         time.Duration `json:"message_ttl"` // encoding/json reads this as integer nanoseconds
        MaxLength          int           `json:"max_length"`
        MaxLengthBytes     int           `json:"max_length_bytes"`
        DeadLetterExchange string        `json:"dead_letter_exchange"`
        DeadLetterQueue    string        `json:"dead_letter_queue"`
        MaxRetries         int           `json:"max_retries"`
    }
    
    if err := c.Bind(&req); err != nil {
        return c.JSON(400, map[string]string{"error": "Invalid request"})
    }
    
    if req.Name == "" {
        return c.JSON(400, map[string]string{"error": "Queue name is required"})
    }
    
    options := queue.QueueOptions{
        Name:               req.Name,
        Durable:            req.Durable,
        AutoDelete:         req.AutoDelete,
        MessageTTL:         req.MessageTTL,
        MaxLength:          req.MaxLength,
        MaxLengthBytes:     req.MaxLengthBytes,
        DeadLetterExchange: req.DeadLetterExchange,
        DeadLetterQueue:    req.DeadLetterQueue,
        MaxRetries:         req.MaxRetries,
    }
    
    err := q.DeclareQueue(c.Context(), options)
    if err != nil {
        return c.JSON(500, map[string]interface{}{
            "error": "Failed to create queue",
            "details": err.Error(),
        })
    }
    
    return c.JSON(201, map[string]interface{}{
        "queue_name": req.Name,
        "status":     "created",
        "options":    options,
    })
}

func listQueuesHandler(c forge.Context) error {
    queue := forge.GetQueue(c)
    
    queues, err := queue.ListQueues(c.Context())
    if err != nil {
        return c.JSON(500, map[string]interface{}{
            "error": "Failed to list queues",
            "details": err.Error(),
        })
    }
    
    var queueList []map[string]interface{}
    for _, q := range queues {
        queueList = append(queueList, map[string]interface{}{
            "name":           q.Name,
            "message_count":  q.MessageCount,
            "consumer_count": q.ConsumerCount,
            "durable":        q.Durable,
            "auto_delete":    q.AutoDelete,
            "created_at":     q.CreatedAt,
        })
    }
    
    return c.JSON(200, map[string]interface{}{
        "queues": queueList,
        "count":  len(queueList),
    })
}

func getQueueInfoHandler(c forge.Context) error {
    q := forge.GetQueue(c) // q avoids shadowing the queue package
    queueName := c.Param("queue_name")
    
    if queueName == "" {
        return c.JSON(400, map[string]string{"error": "Queue name is required"})
    }
    
    info, err := q.GetQueueInfo(c.Context(), queueName)
    if err != nil {
        if errors.Is(err, queue.ErrQueueNotFound) {
            return c.JSON(404, map[string]string{"error": "Queue not found"})
        }
        return c.JSON(500, map[string]interface{}{
            "error": "Failed to get queue info",
            "details": err.Error(),
        })
    }
    
    return c.JSON(200, map[string]interface{}{
        "name":           info.Name,
        "message_count":  info.MessageCount,
        "consumer_count": info.ConsumerCount,
        "durable":        info.Durable,
        "auto_delete":    info.AutoDelete,
        "message_ttl":    info.MessageTTL,
        "max_length":     info.MaxLength,
        "created_at":     info.CreatedAt,
        "updated_at":     info.UpdatedAt,
    })
}

func purgeQueueHandler(c forge.Context) error {
    q := forge.GetQueue(c) // q avoids shadowing the queue package
    queueName := c.Param("queue_name")
    
    if queueName == "" {
        return c.JSON(400, map[string]string{"error": "Queue name is required"})
    }
    
    // Safety check - require confirmation for production queues
    confirm := c.Query("confirm")
    if confirm != "yes" {
        return c.JSON(400, map[string]string{
            "error": "Queue purge requires confirmation. Add ?confirm=yes to proceed",
        })
    }
    
    purgedCount, err := q.PurgeQueue(c.Context(), queueName)
    if err != nil {
        if errors.Is(err, queue.ErrQueueNotFound) {
            return c.JSON(404, map[string]string{"error": "Queue not found"})
        }
        return c.JSON(500, map[string]interface{}{
            "error": "Failed to purge queue",
            "details": err.Error(),
        })
    }
    
    return c.JSON(200, map[string]interface{}{
        "queue_name":    queueName,
        "purged_count":  purgedCount,
        "status":        "purged",
    })
}

func deleteQueueHandler(c forge.Context) error {
    q := forge.GetQueue(c) // q avoids shadowing the queue package
    queueName := c.Param("queue_name")
    
    if queueName == "" {
        return c.JSON(400, map[string]string{"error": "Queue name is required"})
    }
    
    // Safety checks
    confirm := c.Query("confirm")
    if confirm != "yes" {
        return c.JSON(400, map[string]string{
            "error": "Queue deletion requires confirmation. Add ?confirm=yes to proceed",
        })
    }
    
    force := c.Query("force") == "true"
    
    // Check if queue has messages (unless forced).
    // A lookup error is ignored here; DeleteQueue below reports it.
    if !force {
        info, err := q.GetQueueInfo(c.Context(), queueName)
        if err == nil && info.MessageCount > 0 {
            return c.JSON(400, map[string]interface{}{
                "error":         "Queue has messages. Use ?force=true to delete anyway",
                "message_count": info.MessageCount,
            })
        }
    }
    
    err := q.DeleteQueue(c.Context(), queueName, force)
    if err != nil {
        if errors.Is(err, queue.ErrQueueNotFound) {
            return c.JSON(404, map[string]string{"error": "Queue not found"})
        }
        return c.JSON(500, map[string]interface{}{
            "error": "Failed to delete queue",
            "details": err.Error(),
        })
    }
    
    return c.JSON(200, map[string]interface{}{
        "queue_name": queueName,
        "status":     "deleted",
    })
}

func moveMessagesHandler(c forge.Context) error {
    q := forge.GetQueue(c) // q avoids shadowing the queue package
    
    var req struct {
        SourceQueue      string            `json:"source_queue"`
        DestinationQueue string            `json:"destination_queue"`
        MaxMessages      int               `json:"max_messages"`
        Filter           map[string]string `json:"filter,omitempty"`
    }
    
    if err := c.Bind(&req); err != nil {
        return c.JSON(400, map[string]string{"error": "Invalid request"})
    }
    
    if req.SourceQueue == "" || req.DestinationQueue == "" {
        return c.JSON(400, map[string]string{"error": "Source and destination queues are required"})
    }
    
    // Default to 100 messages and cap at 1000 per request
    if req.MaxMessages <= 0 {
        req.MaxMessages = 100
    }
    
    if req.MaxMessages > 1000 {
        req.MaxMessages = 1000
    }
    
    // Move messages
    movedCount, err := q.MoveMessages(c.Context(), queue.MoveOptions{
        SourceQueue:      req.SourceQueue,
        DestinationQueue: req.DestinationQueue,
        MaxMessages:      req.MaxMessages,
        Filter:           req.Filter,
    })
    if err != nil {
        return c.JSON(500, map[string]interface{}{
            "error": "Failed to move messages",
            "details": err.Error(),
        })
    }
    
    return c.JSON(200, map[string]interface{}{
        "source_queue":      req.SourceQueue,
        "destination_queue": req.DestinationQueue,
        "moved_count":       movedCount,
        "status":            "completed",
    })
}

func getQueueStatsHandler(c forge.Context) error {
    queue := forge.GetQueue(c)
    
    // Get overall queue statistics
    stats, err := queue.Stats(c.Context())
    if err != nil {
        return c.JSON(500, map[string]interface{}{
            "error": "Failed to get queue stats",
            "details": err.Error(),
        })
    }
    
    return c.JSON(200, map[string]interface{}{
        "total_queues":        stats.TotalQueues,
        "total_messages":      stats.TotalMessages,
        "total_consumers":     stats.TotalConsumers,
        "messages_published":  stats.MessagesPublished,
        "messages_consumed":   stats.MessagesConsumed,
        "messages_failed":     stats.MessagesFailed,
        "average_latency_ms":  stats.AverageLatency.Milliseconds(),
        "throughput_per_sec":  stats.ThroughputPerSecond,
        "error_rate":          stats.ErrorRate,
        "uptime_seconds":      stats.Uptime.Seconds(),
    })
}

func getQueueMetricsHandler(c forge.Context) error {
    queue := forge.GetQueue(c)
    queueName := c.Param("queue_name")
    
    if queueName == "" {
        return c.JSON(400, map[string]string{"error": "Queue name is required"})
    }
    
    // Get time range from query parameters
    since := c.QueryDuration("since", time.Hour)
    interval := c.QueryDuration("interval", time.Minute*5)
    
    metrics, err := queue.GetMetrics(c.Context(), queueName, since, interval)
    if err != nil {
        return c.JSON(500, map[string]interface{}{
            "error": "Failed to get queue metrics",
            "details": err.Error(),
        })
    }
    
    return c.JSON(200, map[string]interface{}{
        "queue_name": queueName,
        "time_range": since.String(),
        "interval":   interval.String(),
        "metrics":    metrics,
    })
}

func getQueueHealthHandler(c forge.Context) error {
    queue := forge.GetQueue(c)
    
    // Check overall queue health
    health, err := queue.Health(c.Context())
    if err != nil {
        return c.JSON(503, map[string]interface{}{
            "status": "unhealthy",
            "error":  err.Error(),
        })
    }
    
    status := "healthy"
    code := 200
    if !health.Healthy {
        // Return 503 so probes and load balancers see the failure
        status = "unhealthy"
        code = 503
    }
    
    return c.JSON(code, map[string]interface{}{
        "status":           status,
        "backend":          health.Backend,
        "connection_pool":  health.ConnectionPool,
        "queue_count":      health.QueueCount,
        "total_messages":   health.TotalMessages,
        "active_consumers": health.ActiveConsumers,
        "last_check":       health.LastCheck,
        "details":          health.Details,
    })
}

func setupQueueMonitoring(app *forge.App) {
    queue := app.GetQueue()
    
    // Start metrics collection
    go func() {
        ticker := time.NewTicker(30 * time.Second)
        defer ticker.Stop()
        
        for {
            select {
            case <-ticker.C:
                collectQueueMetrics(queue)
            case <-app.Context().Done():
                return
            }
        }
    }()
    
    // Start health checks
    go func() {
        ticker := time.NewTicker(time.Minute)
        defer ticker.Stop()
        
        for {
            select {
            case <-ticker.C:
                checkQueueHealth(queue)
            case <-app.Context().Done():
                return
            }
        }
    }()
}

func collectQueueMetrics(queue queue.Queue) {
    ctx := context.Background()
    
    // Get queue statistics
    stats, err := queue.Stats(ctx)
    if err != nil {
        log.Printf("Failed to collect queue stats: %v", err)
        return
    }
    
    // Send metrics to monitoring system
    metrics := map[string]interface{}{
        "queue.total_queues":        stats.TotalQueues,
        "queue.total_messages":      stats.TotalMessages,
        "queue.total_consumers":     stats.TotalConsumers,
        "queue.messages_published":  stats.MessagesPublished,
        "queue.messages_consumed":   stats.MessagesConsumed,
        "queue.messages_failed":     stats.MessagesFailed,
        "queue.average_latency":     stats.AverageLatency.Milliseconds(),
        "queue.throughput_per_sec":  stats.ThroughputPerSecond,
        "queue.error_rate":          stats.ErrorRate,
    }
    
    // Send to metrics backend (Prometheus, InfluxDB, etc.)
    sendMetrics(metrics)
}
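
Raw counters such as published, consumed, and failed message totals become more useful once they are turned into derived signals with thresholds. The sketch below shows one way to do that; the `Counters` type, the assumption that `Consumed` counts only successful messages, and the threshold names are all hypothetical and would need to be adapted to what `Stats` actually reports for your backend.

```go
package main

// Counters is a hypothetical snapshot of cumulative queue counters.
type Counters struct {
	Published int64
	Consumed  int64 // successfully processed (assumption)
	Failed    int64
}

// errorRate returns the fraction of processed messages that failed,
// or 0 when nothing has been processed yet.
func errorRate(c Counters) float64 {
	processed := c.Consumed + c.Failed
	if processed == 0 {
		return 0
	}
	return float64(c.Failed) / float64(processed)
}

// backlog estimates how many published messages are still waiting.
func backlog(c Counters) int64 {
	b := c.Published - c.Consumed - c.Failed
	if b < 0 {
		return 0
	}
	return b
}

// alerts returns the names of the thresholds that the snapshot exceeds.
func alerts(c Counters, maxErrorRate float64, maxBacklog int64) []string {
	var out []string
	if errorRate(c) > maxErrorRate {
		out = append(out, "error_rate")
	}
	if backlog(c) > maxBacklog {
		out = append(out, "backlog")
	}
	return out
}
```

A periodic collector could call `alerts` on each snapshot and forward any returned names to its alerting helper.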

func checkQueueHealth(queue queue.Queue) {
    ctx := context.Background()
    
    health, err := queue.Health(ctx)
    if err != nil {
        log.Printf("Queue health check failed: %v", err)
        sendAlert("queue_health_check_failed", err.Error())
        return
    }
    
    if !health.Healthy {
        log.Printf("Queue is unhealthy: %v", health.Details)
        sendAlert("queue_unhealthy", fmt.Sprintf("Queue health issues: %v", health.Details))
    }
}

func startConsumerGroup(app *forge.App, groupConfig ConsumerGroupConfig) {
    q := app.GetQueue() // q avoids shadowing the queue package
    
    // Configure consumer group
    options := queue.ConsumeOptions{
        QueueName:         groupConfig.QueueName,
        ConsumerGroup:     groupConfig.GroupName,
        ConcurrentWorkers: groupConfig.Workers,
        PrefetchCount:     groupConfig.PrefetchCount,
        VisibilityTimeout: groupConfig.VisibilityTimeout,
        MaxRetries:        groupConfig.MaxRetries,
        RetryStrategy: queue.RetryStrategy{
            Type:            groupConfig.RetryStrategy.Type,
            InitialInterval: groupConfig.RetryStrategy.InitialInterval,
            MaxInterval:     groupConfig.RetryStrategy.MaxInterval,
            Multiplier:      groupConfig.RetryStrategy.Multiplier,
        },
    }
    
    // Start consuming with the configured handler
    err := q.Consume(context.Background(), options, groupConfig.Handler)
    if err != nil {
        log.Fatalf("Failed to start consumer group %s: %v", groupConfig.GroupName, err)
    }
}

type ConsumerGroupConfig struct {
    QueueName         string
    GroupName         string
    Workers           int
    PrefetchCount     int
    VisibilityTimeout time.Duration
    MaxRetries        int
    RetryStrategy     RetryStrategyConfig
    Handler           queue.MessageHandler
}

type RetryStrategyConfig struct {
    Type            string
    InitialInterval time.Duration
    MaxInterval     time.Duration
    Multiplier      float64
}

func setupOrderProcessingConsumers(app *forge.App) {
    // High-priority order processor
    highPriorityConfig := ConsumerGroupConfig{
        QueueName:         "high_priority_orders",
        GroupName:         "high_priority_processors",
        Workers:           10,
        PrefetchCount:     5,
        VisibilityTimeout: time.Second * 30,
        MaxRetries:        3,
        RetryStrategy: RetryStrategyConfig{
            Type:            "exponential",
            InitialInterval: time.Second,
            MaxInterval:     time.Second * 30,
            Multiplier:      2.0,
        },
        Handler: processHighPriorityOrder,
    }
    
    go startConsumerGroup(app, highPriorityConfig)
    
    // Normal order processor
    normalConfig := ConsumerGroupConfig{
        QueueName:         "order_processing",
        GroupName:         "order_processors",
        Workers:           5,
        PrefetchCount:     10,
        VisibilityTimeout: time.Second * 45,
        MaxRetries:        3,
        RetryStrategy: RetryStrategyConfig{
            Type:            "exponential",
            InitialInterval: time.Second * 2,
            MaxInterval:     time.Minute,
            Multiplier:      2.0,
        },
        Handler: processOrderMessage,
    }
    
    go startConsumerGroup(app, normalConfig)
    
    // Bulk order processor
    bulkConfig := ConsumerGroupConfig{
        QueueName:         "bulk_orders",
        GroupName:         "bulk_processors",
        Workers:           2,
        PrefetchCount:     20,
        VisibilityTimeout: time.Minute * 5,
        MaxRetries:        2,
        RetryStrategy: RetryStrategyConfig{
            Type:            "linear",
            InitialInterval: time.Second * 5,
            MaxInterval:     time.Second * 30,
            Multiplier:      1.0,
        },
        Handler: processBulkOrderMessage,
    }
    
    go startConsumerGroup(app, bulkConfig)
}

func processHighPriorityOrder(ctx context.Context, message queue.Message) error {
    var order OrderEvent
    if err := json.Unmarshal(message.Body, &order); err != nil {
        return queue.ErrPermanentFailure
    }
    
    log.Printf("Processing high-priority order %s", order.OrderID)
    
    // Fast-track processing for high-priority orders
    if err := fastTrackOrder(ctx, order); err != nil {
        return err
    }
    
    // Immediate notification
    if err := sendUrgentNotification(ctx, order); err != nil {
        log.Printf("Failed to send urgent notification: %v", err)
        // Don't fail the order processing for notification errors
    }
    
    return nil
}

func processBulkOrderMessage(ctx context.Context, message queue.Message) error {
    var bulkOrder struct {
        BatchID string       `json:"batch_id"`
        Orders  []OrderEvent `json:"orders"`
    }
    
    if err := json.Unmarshal(message.Body, &bulkOrder); err != nil {
        return queue.ErrPermanentFailure
    }
    
    log.Printf("Processing bulk order batch %s with %d orders", 
              bulkOrder.BatchID, len(bulkOrder.Orders))
    
    // Process orders in batch
    var errs []error // named errs to avoid shadowing the errors package
    for _, order := range bulkOrder.Orders {
        if err := processOrderSync(order); err != nil {
            errs = append(errs, fmt.Errorf("order %s: %w", order.OrderID, err))
        }
    }
    
    if len(errs) > 0 {
        // If some orders failed, log and potentially retry
        log.Printf("Bulk batch %s had %d failures: %v", 
                  bulkOrder.BatchID, len(errs), errs)
        
        // Retry if less than 50% failed. Multiplying instead of dividing
        // avoids integer division rounding down for odd batch sizes.
        // A retry redelivers the whole batch, so processOrderSync must be
        // safe to run again for orders that already succeeded.
        if len(errs)*2 < len(bulkOrder.Orders) {
            return fmt.Errorf("partial batch failure: %d/%d orders failed", 
                            len(errs), len(bulkOrder.Orders))
        }
        
        // Too many failures, mark as permanent failure
        return queue.ErrPermanentFailure
    }
    
    log.Printf("Successfully processed bulk batch %s", bulkOrder.BatchID)
    return nil
}

func getConsumerGroupStatsHandler(c forge.Context) error {
    q := forge.GetQueue(c) // q avoids shadowing the queue package
    groupName := c.Param("group_name")
    
    if groupName == "" {
        return c.JSON(400, map[string]string{"error": "Consumer group name is required"})
    }
    
    // Get consumer group statistics
    stats, err := q.GetConsumerGroupStats(c.Context(), groupName)
    if err != nil {
        if errors.Is(err, queue.ErrConsumerGroupNotFound) {
            return c.JSON(404, map[string]string{"error": "Consumer group not found"})
        }
        return c.JSON(500, map[string]interface{}{
            "error": "Failed to get consumer group stats",
            "details": err.Error(),
        })
    }
    
    return c.JSON(200, map[string]interface{}{
        "group_name":        stats.GroupName,
        "queue_name":        stats.QueueName,
        "consumer_count":    stats.ConsumerCount,
        "active_consumers":  stats.ActiveConsumers,
        "messages_consumed": stats.MessagesConsumed,
        "messages_failed":   stats.MessagesFailed,
        "average_latency":   stats.AverageLatency.Milliseconds(),
        "last_activity":     stats.LastActivity,
        "consumers":         stats.Consumers,
    })
}

func scaleConsumerGroupHandler(c forge.Context) error {
    q := forge.GetQueue(c) // q avoids shadowing the queue package
    groupName := c.Param("group_name")
    
    if groupName == "" {
        return c.JSON(400, map[string]string{"error": "Consumer group name is required"})
    }
    
    var req struct {
        Workers int `json:"workers"`
    }
    
    if err := c.Bind(&req); err != nil {
        return c.JSON(400, map[string]string{"error": "Invalid request"})
    }
    
    if req.Workers < 1 || req.Workers > 50 {
        return c.JSON(400, map[string]string{"error": "Workers must be between 1 and 50"})
    }
    
    // Scale consumer group
    err := q.ScaleConsumerGroup(c.Context(), groupName, req.Workers)
    if err != nil {
        if errors.Is(err, queue.ErrConsumerGroupNotFound) {
            return c.JSON(404, map[string]string{"error": "Consumer group not found"})
        }
        return c.JSON(500, map[string]interface{}{
            "error": "Failed to scale consumer group",
            "details": err.Error(),
        })
    }
    
    return c.JSON(200, map[string]interface{}{
        "group_name": groupName,
        "workers":    req.Workers,
        "status":     "scaled",
    })
}
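
The consumer group configurations above pick between `"exponential"` and `"linear"` retry strategies. The guide does not spell out how the extension turns those settings into wait times, so the following is a sketch under stated assumptions: exponential means `InitialInterval * Multiplier^(attempt-1)`, linear means `InitialInterval * attempt`, and both are capped at `MaxInterval`. `retryInterval` is a hypothetical helper; the struct is redeclared here only to keep the example self-contained.

```go
package main

import (
	"math"
	"time"
)

// RetryStrategyConfig mirrors the configuration struct used above.
type RetryStrategyConfig struct {
	Type            string
	InitialInterval time.Duration
	MaxInterval     time.Duration
	Multiplier      float64
}

// retryInterval returns the wait before retry number attempt (1-based).
// The formulas are assumptions for illustration, not the extension's
// documented behavior.
func retryInterval(cfg RetryStrategyConfig, attempt int) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	var d float64
	switch cfg.Type {
	case "exponential":
		d = float64(cfg.InitialInterval) * math.Pow(cfg.Multiplier, float64(attempt-1))
	case "linear":
		d = float64(cfg.InitialInterval) * float64(attempt)
	default:
		d = float64(cfg.InitialInterval) // fixed interval fallback
	}
	// Cap the wait so late retries do not grow without bound.
	if limit := float64(cfg.MaxInterval); cfg.MaxInterval > 0 && d > limit {
		d = limit
	}
	return time.Duration(d)
}
```

Under these assumptions, the high-priority configuration (1s initial, 2.0 multiplier, 30s cap) waits 1s, 2s, 4s, 8s, 16s, and then 30s from the sixth retry onward, while the bulk configuration (linear, 5s initial, 30s cap) waits 5s, 10s, 15s, and so on up to the cap.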

Best Practices

Message Design

  • Keep messages small: Aim for messages under 256KB
  • Include metadata: Add headers for routing and filtering
  • Use structured data: JSON or Protocol Buffers for message bodies
  • Add correlation IDs: Track message flows across services
  • Include timestamps: For debugging and monitoring
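
The message design points above can be combined into a small constructor that enforces them at publish time. This is a minimal sketch using only the standard library: the `Envelope` type, the header names (`event_type`, `correlation_id`, `content_type`, `timestamp`), and the `newEnvelope` helper are illustrative assumptions rather than part of the Queue extension.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"time"
)

// maxBodyBytes follows the 256 KB guideline above.
const maxBodyBytes = 256 * 1024

// Envelope is a hypothetical stand-in for a queue message.
type Envelope struct {
	ID      string
	Body    []byte
	Headers map[string]string
}

// randomID returns a 128-bit random identifier as hex.
func randomID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// newEnvelope encodes payload as JSON, rejects oversized bodies, and attaches
// routing and tracing metadata. An empty correlationID starts a new flow.
func newEnvelope(eventType, correlationID string, payload interface{}) (Envelope, error) {
	body, err := json.Marshal(payload)
	if err != nil {
		return Envelope{}, fmt.Errorf("encode payload: %w", err)
	}
	if len(body) > maxBodyBytes {
		return Envelope{}, fmt.Errorf("body is %d bytes, limit is %d", len(body), maxBodyBytes)
	}
	id, err := randomID()
	if err != nil {
		return Envelope{}, err
	}
	if correlationID == "" {
		correlationID = id // the first message in a flow defines the correlation ID
	}
	return Envelope{
		ID:   id,
		Body: body,
		Headers: map[string]string{
			"event_type":     eventType,
			"correlation_id": correlationID,
			"content_type":   "application/json",
			"timestamp":      time.Now().UTC().Format(time.RFC3339),
		},
	}, nil
}
```

A consumer that publishes follow-up messages would pass the incoming `correlation_id` header to `newEnvelope` so the whole flow can be traced across services.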

Error Handling

  • Distinguish error types: Permanent vs temporary failures
  • Implement retry strategies: Exponential backoff with jitter
  • Use dead letter queues: Handle messages that can't be processed
  • Log failures: Include context for debugging
  • Monitor error rates: Set up alerts for high failure rates
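
To make the first three points concrete, the sketch below separates permanent from temporary failures with a wrapped sentinel error, decides between acknowledging, retrying, and dead-lettering, and applies "full jitter" to a backoff interval. `ErrPermanent`, `errTimeout`, `permanent`, `decide`, and `withJitter` are hypothetical names for illustration; the extension's own sentinel in the examples above is `queue.ErrPermanentFailure`.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// ErrPermanent marks failures that retrying cannot fix (for example a
// malformed payload).
var ErrPermanent = errors.New("permanent failure")

// errTimeout is an example of a temporary failure.
var errTimeout = errors.New("upstream timeout")

// permanent wraps err so that errors.Is(err, ErrPermanent) reports true.
func permanent(err error) error {
	return fmt.Errorf("%w: %v", ErrPermanent, err)
}

// decide maps a handler result to "ack", "retry", or "dead_letter".
// attempt is 1-based and counts the attempt that just finished.
func decide(err error, attempt, maxRetries int) string {
	switch {
	case err == nil:
		return "ack"
	case errors.Is(err, ErrPermanent):
		return "dead_letter"
	case attempt >= maxRetries:
		return "dead_letter"
	default:
		return "retry"
	}
}

// withJitter returns a random duration in [0, backoff] ("full jitter"), which
// spreads out retries from consumers that failed at the same moment.
func withJitter(backoff time.Duration) time.Duration {
	if backoff <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(backoff) + 1))
}
```

With this split, a timeout on the first of three attempts yields `"retry"`, the same timeout on the third attempt yields `"dead_letter"`, and any error wrapped with `permanent` goes to the dead letter queue immediately.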

Performance Optimization

  • Batch operations: Use batch publish/consume when possible
  • Tune prefetch: Balance memory usage and throughput
  • Connection pooling: Reuse connections efficiently
  • Message compression: Compress large message bodies
  • Partition queues: Distribute load across multiple queues
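
As an illustration of the compression point, the sketch below gzips a message body only when it is large enough to benefit and records the encoding so the consumer knows whether to decompress. The 1 KB threshold, the `"gzip"` encoding label, and the helper names are assumptions for this example; the guide does not say whether the extension offers built-in compression.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"io"
)

// compressThreshold is an arbitrary cut-off; small bodies are sent as-is
// because gzip overhead can outweigh the savings.
const compressThreshold = 1024

// compressBody returns the (possibly compressed) body and the encoding to
// store in a header: "gzip" or "" for uncompressed.
func compressBody(body []byte) ([]byte, string, error) {
	if len(body) < compressThreshold {
		return body, "", nil
	}
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(body); err != nil {
		return nil, "", err
	}
	// Close flushes remaining data and writes the gzip footer.
	if err := zw.Close(); err != nil {
		return nil, "", err
	}
	return buf.Bytes(), "gzip", nil
}

// decompressBody reverses compressBody based on the recorded encoding.
func decompressBody(data []byte, encoding string) ([]byte, error) {
	if encoding != "gzip" {
		return data, nil
	}
	zr, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}
```

A publisher would store the returned encoding in a header such as `content_encoding` (a hypothetical name) and the consumer would pass that header value to `decompressBody` before unmarshalling.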

Security

  • Encrypt sensitive data: Use encryption for sensitive message content
  • Validate messages: Sanitize and validate all message data
  • Use authentication: Secure queue access with proper credentials
  • Network security: Use TLS for message transport
  • Access control: Implement proper queue permissions
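
For the first point, one common approach is to encrypt the message body with an authenticated cipher before publishing, so that tampering is detected on decryption. The sketch below uses AES-GCM from the Go standard library; `encryptBody` and `decryptBody` are hypothetical helpers, and key management (storage, rotation, distribution) is deliberately out of scope.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
)

// newGCM builds an AES-GCM AEAD from a 16, 24, or 32 byte key.
func newGCM(key []byte) (cipher.AEAD, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// encryptBody seals plaintext and prepends the random nonce to the result.
func encryptBody(key, plaintext []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	// Seal appends the ciphertext and tag to its first argument (the nonce).
	return gcm.Seal(nonce, nonce, plaintext, nil), nil
}

// decryptBody splits off the nonce and opens the ciphertext. It fails if the
// data was modified or the key is wrong.
func decryptBody(key, sealed []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	if len(sealed) < gcm.NonceSize() {
		return nil, errors.New("ciphertext too short")
	}
	nonce, ciphertext := sealed[:gcm.NonceSize()], sealed[gcm.NonceSize():]
	return gcm.Open(nil, nonce, ciphertext, nil)
}
```

A consumer should treat a decryption failure as a permanent error, since retrying will not make a tampered or wrongly keyed message readable.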

Troubleshooting

Common Issues

Connection Problems

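# Check queue backend connectivity
# (replace localhost:8080 with your application's host and port)
curl -X GET http://localhost:8080/api/queue/health

# Verify configuration
curl -X GET http://localhost:8080/api/queue/stats

Message Processing Failures

# Check dead letter queue
curl -X GET http://localhost:8080/api/queue/dlq/stats

# View recent failures
curl -X GET http://localhost:8080/api/queue/metrics/failures

Performance Issues

# Monitor queue metrics (quoted so the shell does not interpret "?")
curl -X GET "http://localhost:8080/api/queue/metrics?since=1h"

# Check consumer group performance
curl -X GET http://localhost:8080/api/queue/consumer-groups/stats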

Debug Mode

extensions:
  queue:
    debug: true
    log_level: "debug"
    trace_messages: true
