Storage Extension
Unified object storage with multiple backends, resilience features, and production-ready capabilities
The Storage extension provides a unified interface for object storage operations across multiple backends including local filesystem, AWS S3, Google Cloud Storage, and Azure Blob Storage. It includes comprehensive resilience features, security measures, and observability capabilities for production environments.
Features
🗄️ Multiple Backends
- Local Filesystem: Enhanced local storage with atomic operations and file locking
- AWS S3: Full S3 support with presigned URLs and multipart uploads
- Google Cloud Storage: GCS integration (coming soon)
- Azure Blob Storage: Azure Blob support (coming soon)
- Hybrid Configuration: Mix and match backends for different use cases
🔒 Security & Validation
- Path Validation: Comprehensive path traversal protection
- Input Sanitization: Automatic key sanitization and validation
- Presigned URLs: Cryptographically signed URLs with configurable expiration
- Content Type Validation: Strict content type checking and validation
- Metadata Validation: Size and format constraints for metadata
🛡️ Resilience Features
- Circuit Breaker: Prevent cascade failures with automatic recovery
- Exponential Backoff: Retries with delays that grow by a configurable multiplier up to a maximum (see the sketch after this list)
- Rate Limiting: Token bucket algorithm for request rate limiting
- Timeout Management: Configurable operation timeouts
- Error Classification: Smart detection of retryable vs non-retryable errors
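To make the retry settings concrete, here is a minimal, illustrative sketch of how an exponential-backoff delay can be derived from the initial_backoff, backoff_multiplier, and max_backoff values used in the configuration examples below; it is not the extension's internal implementation.
package main

import (
	"fmt"
	"time"
)

// backoffDelay returns the delay before retry number `attempt` (0-based):
// initial * multiplier^attempt, capped at max.
func backoffDelay(attempt int, initial, max time.Duration, multiplier float64) time.Duration {
	d := float64(initial)
	for i := 0; i < attempt; i++ {
		d *= multiplier
	}
	if time.Duration(d) > max {
		return max
	}
	return time.Duration(d)
}

func main() {
	// Values from the S3 example below: 200ms initial, x2.5 growth, capped at 30s.
	for attempt := 0; attempt < 6; attempt++ {
		fmt.Printf("retry %d waits %v\n", attempt, backoffDelay(attempt, 200*time.Millisecond, 30*time.Second, 2.5))
	}
}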
⚡ Performance Optimizations
- Buffer Pooling: Reusable I/O buffers via sync.Pool to minimize allocations (see the sketch after this list)
- Concurrent Operations: Thread-safe operations with fine-grained locking
- ETag Caching: Cached MD5 checksums for efficient metadata operations
- Chunked Uploads: Large file support with configurable chunk sizes
- Connection Pooling: Optimized connection management for cloud backends
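As a rough illustration of the buffer-pooling idea (not the extension's internal code), a sync.Pool of reusable buffers looks like this:
package main

import (
	"bytes"
	"io"
	"strings"
	"sync"
)

// bufPool hands out reusable 32 KiB buffers instead of allocating one per copy.
var bufPool = sync.Pool{
	New: func() any { return make([]byte, 32*1024) },
}

// copyPooled copies src to dst through a pooled buffer.
func copyPooled(dst io.Writer, src io.Reader) (int64, error) {
	buf := bufPool.Get().([]byte)
	defer bufPool.Put(buf) // return the buffer for reuse
	return io.CopyBuffer(dst, src, buf)
}

func main() {
	var out bytes.Buffer
	_, _ = copyPooled(&out, strings.NewReader("hello, storage"))
}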
📊 Observability
- Comprehensive Metrics: Upload/download counts, durations, error rates
- Health Monitoring: Multi-backend health checks with detailed status
- Structured Logging: Contextual logging with operation tracing
- Circuit Breaker Monitoring: Real-time resilience state tracking
Installation
# Basic installation
go get -u github.com/xraph/forge/extensions/storage
# With AWS S3 support
go get -u github.com/aws/aws-sdk-go-v2/aws
go get -u github.com/aws/aws-sdk-go-v2/config
go get -u github.com/aws/aws-sdk-go-v2/service/s3
go get -u github.com/aws/aws-sdk-go-v2/feature/s3/manager

# Multi-stage Docker build for an app that bundles the storage extension
FROM golang:1.21-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o app ./cmd/app
FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
COPY --from=builder /app/app .
COPY --from=builder /app/config.yaml .
CMD ["./app"]# Using Go package manager
go install github.com/xraph/forge/extensions/storage@latest
# Or add to your go.mod
echo "github.com/xraph/forge/extensions/storage v2.0.0" >> go.mod
go mod tidyConfiguration
YAML Configuration
# Local filesystem backend
extensions:
  storage:
    default: "local"
    use_enhanced_backend: true
    backends:
      local:
        type: "local"
        config:
          root_dir: "./storage"
          base_url: "http://localhost:8080/files"
          secret: "your-secret-key-here"
    enable_presigned_urls: true
    presign_expiry: "15m"
    max_upload_size: 5368709120 # 5GB
    chunk_size: 5242880 # 5MB

# AWS S3 backend with resilience and CDN settings
extensions:
  storage:
    default: "s3"
    use_enhanced_backend: true
    backends:
      s3:
        type: "s3"
        config:
          region: "us-east-1"
          bucket: "my-production-bucket"
          prefix: "uploads"
          access_key_id: "${AWS_ACCESS_KEY_ID}"
          secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
    # Resilience configuration
    resilience:
      max_retries: 5
      initial_backoff: "200ms"
      max_backoff: "30s"
      backoff_multiplier: 2.5
      circuit_breaker_enabled: true
      circuit_breaker_threshold: 10
      circuit_breaker_timeout: "120s"
      circuit_breaker_half_open_max: 5
      rate_limit_enabled: true
      rate_limit_per_sec: 200
      rate_limit_burst: 500
      operation_timeout: "60s"
    # CDN configuration
    enable_cdn: true
    cdn_base_url: "https://cdn.example.com"
    enable_presigned_urls: true
    presign_expiry: "1h"
    max_upload_size: 10737418240 # 10GB
    chunk_size: 10485760 # 10MB

# Multi-backend setup: local temp storage, primary S3, and S3 backup
extensions:
  storage:
    default: "s3"
    use_enhanced_backend: true
    backends:
      local:
        type: "local"
        config:
          root_dir: "./temp-storage"
          base_url: "http://localhost:8080/temp"
      s3:
        type: "s3"
        config:
          region: "us-east-1"
          bucket: "production-bucket"
          prefix: "prod"
          access_key_id: "${AWS_ACCESS_KEY_ID}"
          secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
      s3_backup:
        type: "s3"
        config:
          region: "us-west-2"
          bucket: "backup-bucket"
          prefix: "backups"
          access_key_id: "${AWS_BACKUP_ACCESS_KEY_ID}"
          secret_access_key: "${AWS_BACKUP_SECRET_ACCESS_KEY}"
    resilience:
      max_retries: 3
      circuit_breaker_enabled: true
      rate_limit_enabled: true
      operation_timeout: "45s"
Environment Variables
# AWS S3 Configuration
AWS_ACCESS_KEY_ID=your_access_key
AWS_SECRET_ACCESS_KEY=your_secret_key
AWS_REGION=us-east-1
AWS_S3_BUCKET=my-bucket
# Storage Configuration
STORAGE_DEFAULT_BACKEND=s3
STORAGE_MAX_UPLOAD_SIZE=5368709120
STORAGE_CHUNK_SIZE=5242880
STORAGE_PRESIGN_EXPIRY=15m
# Resilience Configuration
STORAGE_MAX_RETRIES=3
STORAGE_CIRCUIT_BREAKER_THRESHOLD=5
STORAGE_RATE_LIMIT_PER_SEC=100
Programmatic Configuration
package main
import (
"time"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/storage"
)
func main() {
app := forge.New()
// Create storage configuration
config := storage.Config{
Default: "s3",
UseEnhancedBackend: true,
Backends: map[string]storage.BackendConfig{
"s3": {
Type: "s3",
Config: map[string]interface{}{
"region": "us-east-1",
"bucket": "my-bucket",
"prefix": "uploads",
},
},
"local": {
Type: "local",
Config: map[string]interface{}{
"root_dir": "./storage",
"base_url": "http://localhost:8080/files",
},
},
},
EnablePresignedURLs: true,
PresignExpiry: 30 * time.Minute,
MaxUploadSize: 10 * 1024 * 1024 * 1024, // 10GB
ChunkSize: 10 * 1024 * 1024, // 10MB
Resilience: storage.ResilienceConfig{
MaxRetries: 5,
CircuitBreakerEnabled: true,
CircuitBreakerThreshold: 10,
RateLimitEnabled: true,
RateLimitPerSec: 200,
OperationTimeout: 60 * time.Second,
},
}
// Register extension
app.Use(storage.NewExtension(config))
app.Run()
}
Usage Examples
Basic Operations
package main
import (
"bytes"
"context"
"io"
"log"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/storage"
)
func basicOperations(app forge.App) {
// Get storage manager
storageManager := forge.Must[*storage.StorageManager](app.Container(), "storage")
ctx := context.Background()
// Upload a file
content := []byte("Hello, World!")
err := storageManager.Upload(ctx, "documents/hello.txt", bytes.NewReader(content))
if err != nil {
log.Fatalf("Upload failed: %v", err)
}
log.Println("File uploaded successfully")
// Upload with options
metadata := map[string]string{
"author": "John Doe",
"category": "documents",
"created_at": "2024-01-15",
}
err = storageManager.Upload(ctx, "documents/report.pdf", bytes.NewReader(content),
storage.WithContentType("application/pdf"),
storage.WithMetadata(metadata),
storage.WithACL("public-read"),
)
if err != nil {
log.Fatalf("Upload with options failed: %v", err)
}
// Download a file
reader, err := storageManager.Download(ctx, "documents/hello.txt")
if err != nil {
log.Fatalf("Download failed: %v", err)
}
defer reader.Close()
// Read content
downloadedContent, err := io.ReadAll(reader)
if err != nil {
log.Fatalf("Read failed: %v", err)
}
log.Printf("Downloaded content: %s", string(downloadedContent))
// Check if file exists
exists, err := storageManager.Exists(ctx, "documents/hello.txt")
if err != nil {
log.Fatalf("Exists check failed: %v", err)
}
log.Printf("File exists: %v", exists)
}

func fileManagement(app forge.App) {
storageManager := forge.Must[*storage.StorageManager](app.Container(), "storage")
ctx := context.Background()
// List files with prefix
objects, err := storageManager.List(ctx, "documents/",
storage.WithLimit(50),
storage.WithRecursive(true),
)
if err != nil {
log.Fatalf("List failed: %v", err)
}
log.Printf("Found %d objects:", len(objects))
for _, obj := range objects {
log.Printf("- %s (size: %d, modified: %v)",
obj.Key, obj.Size, obj.LastModified)
}
// Copy a file
err = storageManager.Copy(ctx, "documents/hello.txt", "backup/hello_backup.txt")
if err != nil {
log.Fatalf("Copy failed: %v", err)
}
log.Println("File copied successfully")
// Move a file
err = storageManager.Move(ctx, "documents/temp.txt", "archive/temp.txt")
if err != nil {
log.Fatalf("Move failed: %v", err)
}
log.Println("File moved successfully")
// Delete a file
err = storageManager.Delete(ctx, "documents/old_file.txt")
if err != nil {
log.Fatalf("Delete failed: %v", err)
}
log.Println("File deleted successfully")
// Batch operations
filesToDelete := []string{
"temp/file1.txt",
"temp/file2.txt",
"temp/file3.txt",
}
for _, key := range filesToDelete {
if err := storageManager.Delete(ctx, key); err != nil {
log.Printf("Failed to delete %s: %v", key, err)
} else {
log.Printf("Deleted %s", key)
}
}
}

func metadataOperations(app forge.App) {
storageManager := forge.Must[*storage.StorageManager](app.Container(), "storage")
ctx := context.Background()
// Get file metadata
metadata, err := storageManager.Metadata(ctx, "documents/report.pdf")
if err != nil {
log.Fatalf("Metadata retrieval failed: %v", err)
}
log.Printf("File metadata:")
log.Printf("- Key: %s", metadata.Key)
log.Printf("- Size: %d bytes", metadata.Size)
log.Printf("- Content Type: %s", metadata.ContentType)
log.Printf("- Last Modified: %v", metadata.LastModified)
log.Printf("- ETag: %s", metadata.ETag)
log.Printf("Custom metadata:")
for key, value := range metadata.Metadata {
log.Printf("- %s: %s", key, value)
}
// Search files by metadata
objects, err := storageManager.List(ctx, "documents/")
if err != nil {
log.Fatalf("List failed: %v", err)
}
// Filter by custom metadata
var reportFiles []storage.Object
for _, obj := range objects {
if category, exists := obj.Metadata["category"]; exists && category == "reports" {
reportFiles = append(reportFiles, obj)
}
}
log.Printf("Found %d report files", len(reportFiles))
for _, file := range reportFiles {
log.Printf("- %s (author: %s)", file.Key, file.Metadata["author"])
}
}
Advanced Features
// Note: these handler examples also use encoding/json, fmt, net/http, strings, and time.
func presignedURLs(app forge.App) {
storageManager := forge.Must[*storage.StorageManager](app.Container(), "storage")
ctx := context.Background()
// Generate presigned upload URL
uploadURL, err := storageManager.PresignUpload(ctx, "uploads/user-file.jpg", 15*time.Minute)
if err != nil {
log.Fatalf("Presigned upload URL generation failed: %v", err)
}
log.Printf("Upload URL: %s", uploadURL)
// Generate presigned download URL
downloadURL, err := storageManager.PresignDownload(ctx, "documents/report.pdf", 1*time.Hour)
if err != nil {
log.Fatalf("Presigned download URL generation failed: %v", err)
}
log.Printf("Download URL: %s", downloadURL)
// Use presigned URLs in HTTP handlers
http.HandleFunc("/api/upload-url", func(w http.ResponseWriter, r *http.Request) {
filename := r.URL.Query().Get("filename")
if filename == "" {
http.Error(w, "filename required", http.StatusBadRequest)
return
}
key := fmt.Sprintf("uploads/%s/%s", getUserID(r), filename)
url, err := storageManager.PresignUpload(ctx, key, 15*time.Minute)
if err != nil {
http.Error(w, "Failed to generate upload URL", http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(map[string]string{
"upload_url": url,
"key": key,
})
})
http.HandleFunc("/api/download-url", func(w http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
if key == "" {
http.Error(w, "key required", http.StatusBadRequest)
return
}
// Verify user has access to this file
if !userHasAccess(getUserID(r), key) {
http.Error(w, "Access denied", http.StatusForbidden)
return
}
url, err := storageManager.PresignDownload(ctx, key, 1*time.Hour)
if err != nil {
http.Error(w, "Failed to generate download URL", http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(map[string]string{
"download_url": url,
})
})
}
func getUserID(r *http.Request) string {
// Extract user ID from request (JWT, session, etc.)
return "user123"
}
func userHasAccess(userID, key string) bool {
// Implement access control logic
return strings.HasPrefix(key, "uploads/"+userID+"/")
}

func multiBackendUsage(app forge.App) {
storageManager := forge.Must[*storage.StorageManager](app.Container(), "storage")
ctx := context.Background()
// Use specific backend for different operations
content := []byte("Important document")
// Store in primary backend (S3)
err := storageManager.UploadToBackend(ctx, "s3", "documents/important.pdf", bytes.NewReader(content))
if err != nil {
log.Fatalf("Primary upload failed: %v", err)
}
// Create backup in secondary backend
err = storageManager.UploadToBackend(ctx, "s3_backup", "documents/important.pdf", bytes.NewReader(content))
if err != nil {
log.Printf("Backup upload failed: %v", err)
// Continue with primary storage success
}
// Store temporary files locally
tempContent := []byte("Temporary processing data")
err = storageManager.UploadToBackend(ctx, "local", "temp/processing.tmp", bytes.NewReader(tempContent))
if err != nil {
log.Fatalf("Temp upload failed: %v", err)
}
// Implement tiered storage strategy
implementTieredStorage(storageManager)
}
func implementTieredStorage(manager *storage.StorageManager) {
ctx := context.Background()
// Hot tier: Frequently accessed files (S3 Standard)
hotTierFiles := []string{"documents/recent.pdf", "images/profile.jpg"}
// Cold tier: Archived files (S3 Glacier via backup backend)
coldTierFiles := []string{"archive/old-report.pdf", "backup/legacy-data.zip"}
// Move files to appropriate tiers
for _, file := range hotTierFiles {
// Ensure file is in primary S3 backend
exists, _ := manager.ExistsInBackend(ctx, "s3", file)
if !exists {
// Move from backup to primary if needed
reader, err := manager.DownloadFromBackend(ctx, "s3_backup", file)
if err == nil {
manager.UploadToBackend(ctx, "s3", file, reader)
reader.Close()
}
}
}
for _, file := range coldTierFiles {
// Ensure file is in backup backend (cold storage)
exists, _ := manager.ExistsInBackend(ctx, "s3_backup", file)
if !exists {
reader, err := manager.DownloadFromBackend(ctx, "s3", file)
if err == nil {
manager.UploadToBackend(ctx, "s3_backup", file, reader)
reader.Close()
// Optionally delete from primary after successful backup
manager.DeleteFromBackend(ctx, "s3", file)
}
}
}
}

func healthMonitoring(app forge.App) {
storageManager := forge.Must[*storage.StorageManager](app.Container(), "storage")
ctx := context.Background()
// Check overall storage health
err := storageManager.Health(ctx)
if err != nil {
log.Printf("Storage health check failed: %v", err)
} else {
log.Println("Storage is healthy")
}
// Check individual backend health
backends := []string{"local", "s3", "s3_backup"}
for _, backend := range backends {
err := storageManager.HealthCheckBackend(ctx, backend)
if err != nil {
log.Printf("Backend %s is unhealthy: %v", backend, err)
} else {
log.Printf("Backend %s is healthy", backend)
}
}
// Get detailed health status
healthStatus := storageManager.GetHealthStatus(ctx)
log.Printf("Health Status:")
log.Printf("- Overall: %s", healthStatus.Overall)
log.Printf("- Backends:")
for name, status := range healthStatus.Backends {
log.Printf(" - %s: %s (last check: %v)", name, status.Status, status.LastCheck)
if status.Error != "" {
log.Printf(" Error: %s", status.Error)
}
}
// Monitor circuit breaker state
if resilientStorage, ok := storageManager.GetBackend("s3").(*storage.ResilientStorage); ok {
state := resilientStorage.GetCircuitBreakerState()
log.Printf("Circuit breaker state: %s", state)
if state == storage.CircuitOpen {
log.Println("Circuit breaker is open - storage operations will fail fast")
// Implement fallback logic or alerting
}
}
// Set up health monitoring endpoint
http.HandleFunc("/health/storage", func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
err := storageManager.Health(ctx)
if err != nil {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]interface{}{
"status": "unhealthy",
"error": err.Error(),
})
return
}
healthStatus := storageManager.GetHealthStatus(ctx)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(healthStatus)
})
// Set up metrics endpoint
http.HandleFunc("/metrics/storage", func(w http.ResponseWriter, r *http.Request) {
metrics := storageManager.GetMetrics()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(metrics)
})
}
Best Practices
Security
- Validate Input: Always validate file keys and metadata before operations
- Use Presigned URLs: For client-side uploads/downloads to avoid exposing credentials
- Implement Access Control: Check user permissions before generating presigned URLs
- Sanitize File Names: Use the built-in path validation to prevent directory traversal
- Set Appropriate ACLs: Configure proper access control lists for your use case
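For illustration only, a hypothetical key check in application code might look like the sketch below; the extension performs its own path validation, so treat this as an extra defensive layer rather than its actual implementation.
package main

import (
	"errors"
	"fmt"
	"path"
	"strings"
)

// validateKey is a hypothetical helper that rejects obviously unsafe object keys
// before they are handed to the storage manager.
func validateKey(key string) error {
	if key == "" || strings.HasPrefix(key, "/") {
		return errors.New("key must be a non-empty relative path")
	}
	clean := path.Clean(key)
	if clean != key {
		return errors.New("key contains redundant or traversal path elements")
	}
	for _, seg := range strings.Split(clean, "/") {
		if seg == ".." {
			return errors.New("key contains a parent-directory segment")
		}
	}
	return nil
}

func main() {
	for _, k := range []string{"documents/report.pdf", "../etc/passwd", "/absolute"} {
		fmt.Printf("%-22s -> %v\n", k, validateKey(k))
	}
}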
Performance
- Use Appropriate Chunk Sizes: Configure chunk sizes based on your typical file sizes
- Enable Buffer Pooling: Use the enhanced backend for better memory management
- Implement Caching: Cache frequently accessed metadata and file existence checks
- Use Concurrent Operations: Leverage Go's concurrency for batch operations
- Monitor Circuit Breaker: Implement fallback strategies when circuit breaker opens
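As an example of leveraging concurrency for batch work, the following sketch deletes a set of keys in parallel with a bounded number of workers; it assumes the StorageManager.Delete method shown in the usage examples above.
// deleteConcurrently removes keys in parallel, at most 8 at a time.
// Requires: context, log, sync, and the storage package used above.
func deleteConcurrently(ctx context.Context, sm *storage.StorageManager, keys []string) {
	sem := make(chan struct{}, 8) // bound concurrent deletes
	var wg sync.WaitGroup
	for _, key := range keys {
		wg.Add(1)
		sem <- struct{}{}
		go func(k string) {
			defer wg.Done()
			defer func() { <-sem }()
			if err := sm.Delete(ctx, k); err != nil {
				log.Printf("failed to delete %s: %v", k, err)
			}
		}(key)
	}
	wg.Wait()
}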
Reliability
- Configure Retries: Set appropriate retry counts and backoff strategies
- Use Health Checks: Implement regular health monitoring for early issue detection
- Plan for Failures: Design your application to handle storage backend failures gracefully
- Backup Strategy: Use multiple backends for critical data redundancy
- Monitor Metrics: Track upload/download success rates and response times
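One way to combine the backup-strategy and graceful-failure advice is a fallback read that tries the primary backend and then the backup; this sketch assumes the DownloadFromBackend method and the backend names from the multi-backend example above, with the return type taken to be io.ReadCloser based on those examples.
// downloadWithFallback reads from the primary S3 backend and falls back to the backup.
// Requires: context, io, and the storage package used above.
func downloadWithFallback(ctx context.Context, sm *storage.StorageManager, key string) (io.ReadCloser, error) {
	reader, err := sm.DownloadFromBackend(ctx, "s3", key)
	if err == nil {
		return reader, nil
	}
	// Primary unavailable (e.g. circuit breaker open); try the backup bucket.
	return sm.DownloadFromBackend(ctx, "s3_backup", key)
}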
Cost Optimization
- Implement Tiered Storage: Move infrequently accessed data to cheaper storage classes
- Use Lifecycle Policies: Configure automatic data lifecycle management
- Monitor Usage: Track storage usage and costs across different backends
- Optimize Transfer: Use appropriate chunk sizes to minimize transfer costs
- Clean Up: Implement regular cleanup of temporary and expired files
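A simple cleanup pass over temporary objects could use the List and Delete calls shown earlier; this is a sketch, assuming objects expose Key and LastModified as in the listing example above.
// cleanupTemp deletes objects under temp/ that are older than maxAge.
// Requires: context, log, time, and the storage package used above.
func cleanupTemp(ctx context.Context, sm *storage.StorageManager, maxAge time.Duration) {
	objects, err := sm.List(ctx, "temp/")
	if err != nil {
		log.Printf("cleanup list failed: %v", err)
		return
	}
	cutoff := time.Now().Add(-maxAge)
	for _, obj := range objects {
		if obj.LastModified.Before(cutoff) {
			if err := sm.Delete(ctx, obj.Key); err != nil {
				log.Printf("cleanup delete %s failed: %v", obj.Key, err)
			}
		}
	}
}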
Troubleshooting
Common Issues
Upload Failures
- Check file size against the max_upload_size configuration
- Verify backend credentials and permissions
- Ensure network connectivity to cloud storage services
- Check circuit breaker state - it may be open due to previous failures
Performance Issues
- Monitor circuit breaker state and failure rates
- Check if rate limiting is too restrictive
- Verify chunk size configuration for your use case
- Review retry configuration - too many retries can slow operations
Configuration Problems
- Validate YAML syntax and required fields
- Check environment variable substitution
- Verify backend-specific configuration parameters
- Ensure proper file permissions for local storage
Debug Mode
// Enable debug logging
config := storage.DefaultConfig()
config.Debug = true
// Or set environment variable
os.Setenv("STORAGE_DEBUG", "true")Health Check Endpoints
# Check overall storage health
curl http://localhost:8080/health/storage
# Get detailed metrics
curl http://localhost:8080/metrics/storage
# Check specific backend
curl http://localhost:8080/health/storage/s3
Next Steps
- Explore Queue Extension for asynchronous file processing
- Check Cache Extension for metadata caching strategies
- Review Database Extension for storing file metadata
- See Auth Extension for implementing access control
- Visit AI Extension for intelligent file processing and analysis