Storage Extension

Unified object storage with multiple backends, resilience features, and production-ready capabilities

The Storage extension provides a unified interface for object storage operations across multiple backends including local filesystem, AWS S3, Google Cloud Storage, and Azure Blob Storage. It includes comprehensive resilience features, security measures, and observability capabilities for production environments.

Features

🗄️ Multiple Backends

  • Local Filesystem: Enhanced local storage with atomic operations and file locking
  • AWS S3: Full S3 support with presigned URLs and multipart uploads
  • Google Cloud Storage: GCS integration (coming soon)
  • Azure Blob Storage: Azure Blob support (coming soon)
  • Hybrid Configuration: Mix and match backends for different use cases

🔒 Security & Validation

  • Path Validation: Comprehensive path traversal protection
  • Input Sanitization: Automatic key sanitization and validation
  • Presigned URLs: Cryptographically signed URLs with configurable expiration
  • Content Type Validation: Strict content type checking and validation
  • Metadata Validation: Size and format constraints for metadata

🛡️ Resilience Features

  • Circuit Breaker: Prevent cascade failures with automatic recovery
  • Exponential Backoff: Configurable retry logic with intelligent backoff (see the sketch after this list)
  • Rate Limiting: Token bucket algorithm for request rate limiting
  • Timeout Management: Configurable operation timeouts
  • Error Classification: Smart detection of retryable vs non-retryable errors
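
The retry delay grows exponentially between attempts and is capped at a maximum. As a rough illustration of the pattern only (not the extension's internal code; the function name and 10% jitter factor are assumptions), the delay for a given attempt under settings like initial_backoff: 200ms, backoff_multiplier: 2.5, and max_backoff: 30s could be computed as:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// backoffDelay sketches the usual exponential backoff formula:
// delay = initial * multiplier^attempt, capped at max, plus random jitter
// so concurrent retries do not synchronize. Illustrative only.
func backoffDelay(attempt int, initial, max time.Duration, multiplier float64) time.Duration {
    d := float64(initial)
    for i := 0; i < attempt; i++ {
        d *= multiplier
    }
    if d > float64(max) {
        d = float64(max)
    }
    jitter := rand.Float64() * 0.1 * d // up to 10% jitter
    return time.Duration(d + jitter)
}

func main() {
    for attempt := 0; attempt < 5; attempt++ {
        fmt.Printf("attempt %d: %v\n", attempt, backoffDelay(attempt, 200*time.Millisecond, 30*time.Second, 2.5))
    }
}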

⚡ Performance Optimizations

  • Buffer Pooling: Zero-allocation I/O operations using sync.Pool (a minimal sketch follows this list)
  • Concurrent Operations: Thread-safe operations with fine-grained locking
  • ETag Caching: Cached MD5 checksums for efficient metadata operations
  • Chunked Uploads: Large file support with configurable chunk sizes
  • Connection Pooling: Optimized connection management for cloud backends
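
Buffer pooling follows the standard sync.Pool pattern. A minimal, self-contained sketch of the idea (not the extension's internal code) looks like this:

package main

import (
    "bytes"
    "fmt"
    "io"
    "strings"
    "sync"
)

// bufferPool hands out reusable 32KB copy buffers so hot I/O paths avoid
// allocating a fresh buffer for every request.
var bufferPool = sync.Pool{
    New: func() any { return make([]byte, 32*1024) },
}

// pooledCopy copies src to dst using a pooled buffer.
func pooledCopy(dst io.Writer, src io.Reader) (int64, error) {
    buf := bufferPool.Get().([]byte)
    defer bufferPool.Put(buf)
    return io.CopyBuffer(dst, src, buf)
}

func main() {
    var out bytes.Buffer
    n, err := pooledCopy(&out, strings.NewReader("Hello, World!"))
    fmt.Println(n, err, out.String())
}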

📊 Observability

  • Comprehensive Metrics: Upload/download counts, durations, error rates
  • Health Monitoring: Multi-backend health checks with detailed status
  • Structured Logging: Contextual logging with operation tracing
  • Circuit Breaker Monitoring: Real-time resilience state tracking

Installation

# Basic installation
go get -u github.com/xraph/forge/extensions/storage

# With AWS S3 support
go get -u github.com/aws/aws-sdk-go-v2/aws
go get -u github.com/aws/aws-sdk-go-v2/config
go get -u github.com/aws/aws-sdk-go-v2/service/s3
go get -u github.com/aws/aws-sdk-go-v2/feature/s3/manager

Docker (multi-stage build)

FROM golang:1.21-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o app ./cmd/app

FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
COPY --from=builder /app/app .
COPY --from=builder /app/config.yaml .
CMD ["./app"]

Go modules

# Or add it to your go.mod (pin a specific version if desired)
go get github.com/xraph/forge/extensions/storage@latest
go mod tidy

Configuration

YAML Configuration

Local backend (development)

extensions:
  storage:
    default: "local"
    use_enhanced_backend: true
    
    backends:
      local:
        type: "local"
        config:
          root_dir: "./storage"
          base_url: "http://localhost:8080/files"
          secret: "your-secret-key-here"
    
    enable_presigned_urls: true
    presign_expiry: "15m"
    max_upload_size: 5368709120  # 5GB
    chunk_size: 5242880          # 5MB

S3 backend with resilience (production)

extensions:
  storage:
    default: "s3"
    use_enhanced_backend: true
    
    backends:
      s3:
        type: "s3"
        config:
          region: "us-east-1"
          bucket: "my-production-bucket"
          prefix: "uploads"
          access_key_id: "${AWS_ACCESS_KEY_ID}"
          secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
    
    # Resilience configuration
    resilience:
      max_retries: 5
      initial_backoff: "200ms"
      max_backoff: "30s"
      backoff_multiplier: 2.5
      
      circuit_breaker_enabled: true
      circuit_breaker_threshold: 10
      circuit_breaker_timeout: "120s"
      circuit_breaker_half_open_max: 5
      
      rate_limit_enabled: true
      rate_limit_per_sec: 200
      rate_limit_burst: 500
      
      operation_timeout: "60s"
    
    # CDN configuration
    enable_cdn: true
    cdn_base_url: "https://cdn.example.com"
    
    enable_presigned_urls: true
    presign_expiry: "1h"
    max_upload_size: 10737418240  # 10GB
    chunk_size: 10485760          # 10MB

Multiple backends

extensions:
  storage:
    default: "s3"
    use_enhanced_backend: true
    
    backends:
      local:
        type: "local"
        config:
          root_dir: "./temp-storage"
          base_url: "http://localhost:8080/temp"
      
      s3:
        type: "s3"
        config:
          region: "us-east-1"
          bucket: "production-bucket"
          prefix: "prod"
          access_key_id: "${AWS_ACCESS_KEY_ID}"
          secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
      
      s3_backup:
        type: "s3"
        config:
          region: "us-west-2"
          bucket: "backup-bucket"
          prefix: "backups"
          access_key_id: "${AWS_BACKUP_ACCESS_KEY_ID}"
          secret_access_key: "${AWS_BACKUP_SECRET_ACCESS_KEY}"
    
    resilience:
      max_retries: 3
      circuit_breaker_enabled: true
      rate_limit_enabled: true
      operation_timeout: "45s"

Environment Variables

# AWS S3 Configuration
AWS_ACCESS_KEY_ID=your_access_key
AWS_SECRET_ACCESS_KEY=your_secret_key
AWS_REGION=us-east-1
AWS_S3_BUCKET=my-bucket

# Storage Configuration
STORAGE_DEFAULT_BACKEND=s3
STORAGE_MAX_UPLOAD_SIZE=5368709120
STORAGE_CHUNK_SIZE=5242880
STORAGE_PRESIGN_EXPIRY=15m

# Resilience Configuration
STORAGE_MAX_RETRIES=3
STORAGE_CIRCUIT_BREAKER_THRESHOLD=5
STORAGE_RATE_LIMIT_PER_SEC=100

Programmatic Configuration

package main

import (
    "time"
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/storage"
)

func main() {
    app := forge.New()

    // Create storage configuration
    config := storage.Config{
        Default: "s3",
        UseEnhancedBackend: true,
        Backends: map[string]storage.BackendConfig{
            "s3": {
                Type: "s3",
                Config: map[string]interface{}{
                    "region": "us-east-1",
                    "bucket": "my-bucket",
                    "prefix": "uploads",
                },
            },
            "local": {
                Type: "local",
                Config: map[string]interface{}{
                    "root_dir": "./storage",
                    "base_url": "http://localhost:8080/files",
                },
            },
        },
        EnablePresignedURLs: true,
        PresignExpiry:       30 * time.Minute,
        MaxUploadSize:       10 * 1024 * 1024 * 1024, // 10GB
        ChunkSize:           10 * 1024 * 1024,         // 10MB
        Resilience: storage.ResilienceConfig{
            MaxRetries:              5,
            CircuitBreakerEnabled:   true,
            CircuitBreakerThreshold: 10,
            RateLimitEnabled:        true,
            RateLimitPerSec:         200,
            OperationTimeout:        60 * time.Second,
        },
    }

    // Register extension
    app.Use(storage.NewExtension(config))

    app.Run()
}

Usage Examples

Basic Operations

package main

import (
    "bytes"
    "context"
    "io"
    "log"
    
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/storage"
)

func basicOperations(app forge.App) {
    // Get storage manager
    storageManager := forge.Must[*storage.StorageManager](app.Container(), "storage")
    ctx := context.Background()

    // Upload a file
    content := []byte("Hello, World!")
    err := storageManager.Upload(ctx, "documents/hello.txt", bytes.NewReader(content))
    if err != nil {
        log.Fatalf("Upload failed: %v", err)
    }
    log.Println("File uploaded successfully")

    // Upload with options
    metadata := map[string]string{
        "author":      "John Doe",
        "category":    "documents",
        "created_at":  "2024-01-15",
    }
    
    err = storageManager.Upload(ctx, "documents/report.pdf", bytes.NewReader(content),
        storage.WithContentType("application/pdf"),
        storage.WithMetadata(metadata),
        storage.WithACL("public-read"),
    )
    if err != nil {
        log.Fatalf("Upload with options failed: %v", err)
    }

    // Download a file
    reader, err := storageManager.Download(ctx, "documents/hello.txt")
    if err != nil {
        log.Fatalf("Download failed: %v", err)
    }
    defer reader.Close()

    // Read content
    downloadedContent, err := io.ReadAll(reader)
    if err != nil {
        log.Fatalf("Read failed: %v", err)
    }
    log.Printf("Downloaded content: %s", string(downloadedContent))

    // Check if file exists
    exists, err := storageManager.Exists(ctx, "documents/hello.txt")
    if err != nil {
        log.Fatalf("Exists check failed: %v", err)
    }
    log.Printf("File exists: %v", exists)
}

File Management

func fileManagement(app forge.App) {
    storageManager := forge.Must[*storage.StorageManager](app.Container(), "storage")
    ctx := context.Background()

    // List files with prefix
    objects, err := storageManager.List(ctx, "documents/",
        storage.WithLimit(50),
        storage.WithRecursive(true),
    )
    if err != nil {
        log.Fatalf("List failed: %v", err)
    }

    log.Printf("Found %d objects:", len(objects))
    for _, obj := range objects {
        log.Printf("- %s (size: %d, modified: %v)", 
            obj.Key, obj.Size, obj.LastModified)
    }

    // Copy a file
    err = storageManager.Copy(ctx, "documents/hello.txt", "backup/hello_backup.txt")
    if err != nil {
        log.Fatalf("Copy failed: %v", err)
    }
    log.Println("File copied successfully")

    // Move a file
    err = storageManager.Move(ctx, "documents/temp.txt", "archive/temp.txt")
    if err != nil {
        log.Fatalf("Move failed: %v", err)
    }
    log.Println("File moved successfully")

    // Delete a file
    err = storageManager.Delete(ctx, "documents/old_file.txt")
    if err != nil {
        log.Fatalf("Delete failed: %v", err)
    }
    log.Println("File deleted successfully")

    // Batch operations
    filesToDelete := []string{
        "temp/file1.txt",
        "temp/file2.txt",
        "temp/file3.txt",
    }

    for _, key := range filesToDelete {
        if err := storageManager.Delete(ctx, key); err != nil {
            log.Printf("Failed to delete %s: %v", key, err)
        } else {
            log.Printf("Deleted %s", key)
        }
    }
}

Metadata Operations

func metadataOperations(app forge.App) {
    storageManager := forge.Must[*storage.StorageManager](app.Container(), "storage")
    ctx := context.Background()

    // Get file metadata
    metadata, err := storageManager.Metadata(ctx, "documents/report.pdf")
    if err != nil {
        log.Fatalf("Metadata retrieval failed: %v", err)
    }

    log.Printf("File metadata:")
    log.Printf("- Key: %s", metadata.Key)
    log.Printf("- Size: %d bytes", metadata.Size)
    log.Printf("- Content Type: %s", metadata.ContentType)
    log.Printf("- Last Modified: %v", metadata.LastModified)
    log.Printf("- ETag: %s", metadata.ETag)
    
    log.Printf("Custom metadata:")
    for key, value := range metadata.Metadata {
        log.Printf("- %s: %s", key, value)
    }

    // Search files by metadata
    objects, err := storageManager.List(ctx, "documents/")
    if err != nil {
        log.Fatalf("List failed: %v", err)
    }

    // Filter by custom metadata
    var reportFiles []storage.Object
    for _, obj := range objects {
        if category, exists := obj.Metadata["category"]; exists && category == "reports" {
            reportFiles = append(reportFiles, obj)
        }
    }

    log.Printf("Found %d report files", len(reportFiles))
    for _, file := range reportFiles {
        log.Printf("- %s (author: %s)", file.Key, file.Metadata["author"])
    }
}

Advanced Features

func presignedURLs(app forge.App) {
    storageManager := forge.Must[*storage.StorageManager](app.Container(), "storage")
    ctx := context.Background()

    // Generate presigned upload URL
    uploadURL, err := storageManager.PresignUpload(ctx, "uploads/user-file.jpg", 15*time.Minute)
    if err != nil {
        log.Fatalf("Presigned upload URL generation failed: %v", err)
    }
    log.Printf("Upload URL: %s", uploadURL)

    // Generate presigned download URL
    downloadURL, err := storageManager.PresignDownload(ctx, "documents/report.pdf", 1*time.Hour)
    if err != nil {
        log.Fatalf("Presigned download URL generation failed: %v", err)
    }
    log.Printf("Download URL: %s", downloadURL)

    // Use presigned URLs in HTTP handlers
    http.HandleFunc("/api/upload-url", func(w http.ResponseWriter, r *http.Request) {
        filename := r.URL.Query().Get("filename")
        if filename == "" {
            http.Error(w, "filename required", http.StatusBadRequest)
            return
        }

        key := fmt.Sprintf("uploads/%s/%s", getUserID(r), filename)
        url, err := storageManager.PresignUpload(ctx, key, 15*time.Minute)
        if err != nil {
            http.Error(w, "Failed to generate upload URL", http.StatusInternalServerError)
            return
        }

        json.NewEncoder(w).Encode(map[string]string{
            "upload_url": url,
            "key":        key,
        })
    })

    http.HandleFunc("/api/download-url", func(w http.ResponseWriter, r *http.Request) {
        key := r.URL.Query().Get("key")
        if key == "" {
            http.Error(w, "key required", http.StatusBadRequest)
            return
        }

        // Verify user has access to this file
        if !userHasAccess(getUserID(r), key) {
            http.Error(w, "Access denied", http.StatusForbidden)
            return
        }

        url, err := storageManager.PresignDownload(ctx, key, 1*time.Hour)
        if err != nil {
            http.Error(w, "Failed to generate download URL", http.StatusInternalServerError)
            return
        }

        json.NewEncoder(w).Encode(map[string]string{
            "download_url": url,
        })
    })
}

func getUserID(r *http.Request) string {
    // Extract user ID from request (JWT, session, etc.)
    return "user123"
}

func userHasAccess(userID, key string) bool {
    // Implement access control logic
    return strings.HasPrefix(key, "uploads/"+userID+"/")
}

Multi-Backend Usage

func multiBackendUsage(app forge.App) {
    storageManager := forge.Must[*storage.StorageManager](app.Container(), "storage")
    ctx := context.Background()

    // Use specific backend for different operations
    content := []byte("Important document")

    // Store in primary backend (S3)
    err := storageManager.UploadToBackend(ctx, "s3", "documents/important.pdf", bytes.NewReader(content))
    if err != nil {
        log.Fatalf("Primary upload failed: %v", err)
    }

    // Create backup in secondary backend
    err = storageManager.UploadToBackend(ctx, "s3_backup", "documents/important.pdf", bytes.NewReader(content))
    if err != nil {
        log.Printf("Backup upload failed: %v", err)
        // Continue with primary storage success
    }

    // Store temporary files locally
    tempContent := []byte("Temporary processing data")
    err = storageManager.UploadToBackend(ctx, "local", "temp/processing.tmp", bytes.NewReader(tempContent))
    if err != nil {
        log.Fatalf("Temp upload failed: %v", err)
    }

    // Implement tiered storage strategy
    implementTieredStorage(storageManager)
}

func implementTieredStorage(manager *storage.StorageManager) {
    ctx := context.Background()

    // Hot tier: Frequently accessed files (S3 Standard)
    hotTierFiles := []string{"documents/recent.pdf", "images/profile.jpg"}
    
    // Cold tier: Archived files (S3 Glacier via backup backend)
    coldTierFiles := []string{"archive/old-report.pdf", "backup/legacy-data.zip"}

    // Move files to appropriate tiers
    for _, file := range hotTierFiles {
        // Ensure file is in primary S3 backend
        exists, _ := manager.ExistsInBackend(ctx, "s3", file)
        if !exists {
            // Move from backup to primary if needed
            reader, err := manager.DownloadFromBackend(ctx, "s3_backup", file)
            if err == nil {
                manager.UploadToBackend(ctx, "s3", file, reader)
                reader.Close()
            }
        }
    }

    for _, file := range coldTierFiles {
        // Ensure file is in backup backend (cold storage)
        exists, _ := manager.ExistsInBackend(ctx, "s3_backup", file)
        if !exists {
            reader, err := manager.DownloadFromBackend(ctx, "s3", file)
            if err == nil {
                manager.UploadToBackend(ctx, "s3_backup", file, reader)
                reader.Close()
                // Optionally delete from primary after successful backup
                manager.DeleteFromBackend(ctx, "s3", file)
            }
        }
    }
}

Health Monitoring

func healthMonitoring(app forge.App) {
    storageManager := forge.Must[*storage.StorageManager](app.Container(), "storage")
    ctx := context.Background()

    // Check overall storage health
    err := storageManager.Health(ctx)
    if err != nil {
        log.Printf("Storage health check failed: %v", err)
    } else {
        log.Println("Storage is healthy")
    }

    // Check individual backend health
    backends := []string{"local", "s3", "s3_backup"}
    for _, backend := range backends {
        err := storageManager.HealthCheckBackend(ctx, backend)
        if err != nil {
            log.Printf("Backend %s is unhealthy: %v", backend, err)
        } else {
            log.Printf("Backend %s is healthy", backend)
        }
    }

    // Get detailed health status
    healthStatus := storageManager.GetHealthStatus(ctx)
    log.Printf("Health Status:")
    log.Printf("- Overall: %s", healthStatus.Overall)
    log.Printf("- Backends:")
    for name, status := range healthStatus.Backends {
        log.Printf("  - %s: %s (last check: %v)", name, status.Status, status.LastCheck)
        if status.Error != "" {
            log.Printf("    Error: %s", status.Error)
        }
    }

    // Monitor circuit breaker state
    if resilientStorage, ok := storageManager.GetBackend("s3").(*storage.ResilientStorage); ok {
        state := resilientStorage.GetCircuitBreakerState()
        log.Printf("Circuit breaker state: %s", state)
        
        if state == storage.CircuitOpen {
            log.Println("Circuit breaker is open - storage operations will fail fast")
            // Implement fallback logic or alerting
        }
    }

    // Set up health monitoring endpoint
    http.HandleFunc("/health/storage", func(w http.ResponseWriter, r *http.Request) {
        ctx := r.Context()
        
        err := storageManager.Health(ctx)
        if err != nil {
            w.WriteHeader(http.StatusServiceUnavailable)
            json.NewEncoder(w).Encode(map[string]interface{}{
                "status": "unhealthy",
                "error":  err.Error(),
            })
            return
        }

        healthStatus := storageManager.GetHealthStatus(ctx)
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(healthStatus)
    })

    // Set up metrics endpoint
    http.HandleFunc("/metrics/storage", func(w http.ResponseWriter, r *http.Request) {
        metrics := storageManager.GetMetrics()
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(metrics)
    })
}

Best Practices

Security

  • Validate Input: Always validate file keys and metadata before operations
  • Use Presigned URLs: For client-side uploads/downloads to avoid exposing credentials
  • Implement Access Control: Check user permissions before generating presigned URLs
  • Sanitize File Names: Use the built-in path validation to prevent directory traversal (see the sketch after this list)
  • Set Appropriate ACLs: Configure proper access control lists for your use case
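
As a rough illustration of the kind of check worth running before handing a user-supplied key to the storage manager, the standalone helper below (validateKey is hypothetical, not part of the extension's API) rejects absolute keys and traversal attempts using only the standard library:

package main

import (
    "fmt"
    "path"
    "strings"
)

// validateKey is a hypothetical helper that rejects obviously unsafe object
// keys before they reach a storage backend.
func validateKey(key string) error {
    if key == "" || strings.HasPrefix(key, "/") {
        return fmt.Errorf("invalid key: %q", key)
    }
    // "docs/../../etc/passwd" cleans to "../etc/passwd" and is rejected.
    clean := path.Clean(key)
    if clean == ".." || strings.HasPrefix(clean, "../") {
        return fmt.Errorf("path traversal detected in key: %q", key)
    }
    return nil
}

func main() {
    fmt.Println(validateKey("documents/report.pdf"))  // <nil>
    fmt.Println(validateKey("docs/../../etc/passwd")) // error
}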

Performance

  • Use Appropriate Chunk Sizes: Configure chunk sizes based on your typical file sizes
  • Enable Buffer Pooling: Use the enhanced backend for better memory management
  • Implement Caching: Cache frequently accessed metadata and file existence checks
  • Use Concurrent Operations: Leverage Go's concurrency for batch operations (see the sketch after this list)
  • Monitor Circuit Breaker: Implement fallback strategies when circuit breaker opens
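
For batch uploads, a bounded worker pattern keeps throughput high without overwhelming the backend or tripping the rate limiter. A sketch, assuming the StorageManager.Upload signature used in the examples above (imports omitted as in the other snippets):

// uploadConcurrently uploads several payloads with at most `workers`
// uploads in flight at a time. Error handling is simplified for brevity.
func uploadConcurrently(ctx context.Context, sm *storage.StorageManager, files map[string][]byte, workers int) {
    sem := make(chan struct{}, workers) // bounded concurrency
    var wg sync.WaitGroup
    for key, data := range files {
        wg.Add(1)
        sem <- struct{}{}
        go func(key string, data []byte) {
            defer wg.Done()
            defer func() { <-sem }()
            if err := sm.Upload(ctx, key, bytes.NewReader(data)); err != nil {
                log.Printf("upload %s failed: %v", key, err)
            }
        }(key, data)
    }
    wg.Wait()
}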

Reliability

  • Configure Retries: Set appropriate retry counts and backoff strategies
  • Use Health Checks: Implement regular health monitoring for early issue detection
  • Plan for Failures: Design your application to handle storage backend failures gracefully
  • Backup Strategy: Use multiple backends for critical data redundancy (a fallback sketch follows this list)
  • Monitor Metrics: Track upload/download success rates and response times
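
One simple way to add redundancy is a fallback wrapper around UploadToBackend, as used in the multi-backend example above. The backend names below ("s3", "s3_backup") come from that example configuration; this is a sketch, not a prescribed pattern:

// uploadWithFallback writes to the primary backend and falls back to the
// backup backend if the primary write fails.
func uploadWithFallback(ctx context.Context, sm *storage.StorageManager, key string, data []byte) error {
    err := sm.UploadToBackend(ctx, "s3", key, bytes.NewReader(data))
    if err == nil {
        return nil
    }
    log.Printf("primary upload of %s failed, trying backup: %v", key, err)
    return sm.UploadToBackend(ctx, "s3_backup", key, bytes.NewReader(data))
}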

Cost Optimization

  • Implement Tiered Storage: Move infrequently accessed data to cheaper storage classes
  • Use Lifecycle Policies: Configure automatic data lifecycle management
  • Monitor Usage: Track storage usage and costs across different backends
  • Optimize Transfer: Use appropriate chunk sizes to minimize transfer costs
  • Clean Up: Implement regular cleanup of temporary and expired files (see the sketch after this list)
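
A periodic cleanup job can be assembled from the List and Delete calls shown earlier. The prefix and age threshold are illustrative, and the sketch assumes LastModified is a time.Time as suggested by the earlier examples:

// cleanupTempFiles deletes objects under prefix that are older than maxAge.
func cleanupTempFiles(ctx context.Context, sm *storage.StorageManager, prefix string, maxAge time.Duration) {
    objects, err := sm.List(ctx, prefix)
    if err != nil {
        log.Printf("cleanup list failed: %v", err)
        return
    }
    cutoff := time.Now().Add(-maxAge)
    for _, obj := range objects {
        if obj.LastModified.Before(cutoff) {
            if err := sm.Delete(ctx, obj.Key); err != nil {
                log.Printf("cleanup delete %s failed: %v", obj.Key, err)
            }
        }
    }
}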

Troubleshooting

Common Issues

Upload Failures

  • Check file size against max_upload_size configuration
  • Verify backend credentials and permissions
  • Ensure network connectivity to cloud storage services
  • Check circuit breaker state - it may be open due to previous failures

Performance Issues

  • Monitor circuit breaker state and failure rates
  • Check if rate limiting is too restrictive
  • Verify chunk size configuration for your use case
  • Review retry configuration - too many retries can slow operations

Configuration Problems

  • Validate YAML syntax and required fields
  • Check environment variable substitution
  • Verify backend-specific configuration parameters
  • Ensure proper file permissions for local storage

Debug Mode

// Enable debug logging
config := storage.DefaultConfig()
config.Debug = true

// Or set environment variable
os.Setenv("STORAGE_DEBUG", "true")

Health Check Endpoints

# Check overall storage health
curl http://localhost:8080/health/storage

# Get detailed metrics
curl http://localhost:8080/metrics/storage

# Check specific backend
curl http://localhost:8080/health/storage/s3
