Database Extension

Multi-database support with SQL (Postgres, MySQL, SQLite) and NoSQL (MongoDB)

The Database extension provides a unified interface for working with multiple database backends: SQL databases (PostgreSQL, MySQL, SQLite) and the NoSQL database MongoDB. It offers connection pooling, health monitoring, migration support, automatic connection retry, and connection pool statistics.

The Database extension abstracts database operations while preserving access to native drivers (Bun ORM for SQL, MongoDB driver for NoSQL) for advanced use cases.

Features

Supported Database Types

  • PostgreSQL: Advanced SQL database with JSON support
  • MySQL: Popular relational database
  • SQLite: Embedded SQL database
  • MongoDB: Document-oriented NoSQL database

Core Capabilities

  • Unified Interface: Consistent API across all database types
  • Connection Pooling: Efficient connection management with configurable pool settings
  • Health Monitoring: Real-time database health checks with latency tracking
  • Native Driver Access: Direct access to Bun ORM (SQL) and MongoDB client
  • Transaction Management: ACID compliance for SQL databases, sessions for MongoDB
  • Observability: Query metrics, performance tracking, and structured logging

Production Features

  • Connection Retry: Automatic reconnection with configurable retry logic
  • Pool Statistics: Detailed connection pool metrics
  • Health Checks: Continuous health monitoring with status reporting
  • Multiple Databases: Support for multiple database connections in a single application
  • DI Integration: Seamless integration with Forge's dependency injection system

Installation

go get github.com/xraph/forge/extensions/database

Configuration

YAML:

extensions:
  database:
    # Default database name (optional, uses first database if not specified)
    default: "main"
    
    # Database configurations
    databases:
      - name: "main"
        type: "postgres"
        dsn: "postgres://user:password@localhost:5432/mydb?sslmode=disable"
        max_open_conns: 25
        max_idle_conns: 5
        conn_max_lifetime: "5m"
        conn_max_idle_time: "5m"
        max_retries: 3
        retry_delay: "1s"
        health_check_interval: "30s"
        
      - name: "cache"
        type: "sqlite"
        dsn: "file::memory:?cache=shared"
        max_open_conns: 1
        max_idle_conns: 1
        
      - name: "documents"
        type: "mongodb"
        dsn: "mongodb://localhost:27017/myapp"
        max_open_conns: 100
        max_idle_conns: 5
        conn_max_idle_time: "30m"

Environment variables:

# Default database
export DB_DEFAULT="main"

# PostgreSQL Configuration
export DB_POSTGRES_DSN="postgres://user:password@localhost:5432/mydb"
export DB_POSTGRES_MAX_OPEN_CONNS="25"
export DB_POSTGRES_MAX_IDLE_CONNS="5"

# MySQL Configuration  
export DB_MYSQL_DSN="user:password@tcp(localhost:3306)/mydb?parseTime=true"
export DB_MYSQL_MAX_OPEN_CONNS="20"

# SQLite Configuration
export DB_SQLITE_DSN="./data/app.db"

# MongoDB Configuration
export DB_MONGODB_DSN="mongodb://localhost:27017/myapp"
export DB_MONGODB_MAX_OPEN_CONNS="100"

Programmatic:

package main

import (
    "time"
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/database"
)

func main() {
    app := forge.New()

    // Configure Database extension
    config := database.Config{
        Default: "main",
        Databases: []database.DatabaseConfig{
            {
                Name:                "main",
                Type:                database.TypePostgres,
                DSN:                 "postgres://user:password@localhost:5432/mydb",
                MaxOpenConns:        25,
                MaxIdleConns:        5,
                ConnMaxLifetime:     5 * time.Minute,
                ConnMaxIdleTime:     5 * time.Minute,
                MaxRetries:          3,
                RetryDelay:          time.Second,
                HealthCheckInterval: 30 * time.Second,
            },
            {
                Name:                "documents",
                Type:                database.TypeMongoDB,
                DSN:                 "mongodb://localhost:27017/myapp",
                MaxOpenConns:        100,
                MaxIdleConns:        5,
                ConnMaxIdleTime:     30 * time.Minute,
                HealthCheckInterval: 30 * time.Second,
            },
        },
    }

    dbExt := database.NewExtension(config)
    app.RegisterExtension(dbExt)
    
    app.Run()
}
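
The pool fields above use the same names as the pool controls in Go's standard database/sql package, which Bun wraps. The following is a minimal sketch of that correspondence, not the extension's actual implementation: PoolSettings, applyPool, stubConnector, and newStubDB are hypothetical names introduced here, and the stub connector exists only so the example runs without a real database.

```go
package main

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"errors"
	"fmt"
	"time"
)

// PoolSettings is a hypothetical stand-in for the pool-related fields of
// database.DatabaseConfig, so the sketch compiles without the extension.
type PoolSettings struct {
	MaxOpenConns    int
	MaxIdleConns    int
	ConnMaxLifetime time.Duration
	ConnMaxIdleTime time.Duration
}

// applyPool copies the settings onto a standard library connection pool.
func applyPool(db *sql.DB, p PoolSettings) {
	db.SetMaxOpenConns(p.MaxOpenConns)
	db.SetMaxIdleConns(p.MaxIdleConns)
	db.SetConnMaxLifetime(p.ConnMaxLifetime)
	db.SetConnMaxIdleTime(p.ConnMaxIdleTime)
}

// stubConnector lets the example build a *sql.DB without a real driver.
// It never hands out a connection.
type stubConnector struct{}

func (stubConnector) Connect(context.Context) (driver.Conn, error) {
	return nil, errors.New("stub connector: no real database")
}

func (stubConnector) Driver() driver.Driver { return nil }

// newStubDB returns a pool backed by the stub connector.
func newStubDB() *sql.DB { return sql.OpenDB(stubConnector{}) }

func main() {
	db := newStubDB()
	defer db.Close()

	applyPool(db, PoolSettings{
		MaxOpenConns:    25,
		MaxIdleConns:    5,
		ConnMaxLifetime: 5 * time.Minute,
		ConnMaxIdleTime: 5 * time.Minute,
	})
	fmt.Println("max open connections:", db.Stats().MaxOpenConnections)
}
```

In a real application the *sql.DB would come from a registered driver rather than the stub, and the extension is expected to apply these settings for you.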

Usage Examples

Dependency Injection Access

The extension registers several services in the DI container:

// Get the database manager
dbManager := forge.Must[*database.DatabaseManager](app.Container(), "databaseManager")

// Get the default database
db := forge.Must[database.Database](app.Container(), "database")

// For SQL databases, get Bun ORM instance
bunDB := forge.Must[*bun.DB](app.Container(), "db")

// For MongoDB, get client
mongoClient := forge.Must[*mongo.Client](app.Container(), "mongo")

SQL Database Operations

type User struct {
    ID        int64     `bun:"id,pk,autoincrement"`
    Name      string    `bun:"name,notnull"`
    Email     string    `bun:"email,unique,notnull"`
    CreatedAt time.Time `bun:"created_at,nullzero,notnull,default:current_timestamp"`
    UpdatedAt time.Time `bun:"updated_at,nullzero,notnull,default:current_timestamp"`
}

func createUserHandler(c forge.Context) error {
    // Get Bun DB instance from DI
    db := forge.Must[*bun.DB](c.Container(), "db")
    
    var user User
    if err := c.Bind(&user); err != nil {
        return c.JSON(400, map[string]string{"error": "Invalid request"})
    }
    
    // Insert user using Bun ORM
    _, err := db.NewInsert().Model(&user).Exec(c.Context())
    if err != nil {
        return c.JSON(500, map[string]string{"error": "Failed to create user"})
    }
    
    return c.JSON(201, user)
}

func getUserHandler(c forge.Context) error {
    db := forge.Must[*bun.DB](c.Container(), "db")
    userID := c.Param("user_id")
    
    var user User
    err := db.NewSelect().Model(&user).Where("id = ?", userID).Scan(c.Context())
    if err != nil {
        if errors.Is(err, sql.ErrNoRows) {
            return c.JSON(404, map[string]string{"error": "User not found"})
        }
        return c.JSON(500, map[string]string{"error": "Database error"})
    }
    
    return c.JSON(200, user)
}

func advancedQueries(c forge.Context) error {
    db := forge.Must[*bun.DB](c.Container(), "db")
    
    // Filtered, ordered, and limited query
    var users []User
    err := db.NewSelect().
        Model(&users).
        Where("created_at > ?", time.Now().AddDate(0, -1, 0)).
        Order("created_at DESC").
        Limit(10).
        Scan(c.Context())
    if err != nil {
        return err
    }
    
    // Bulk insert
    users = []User{
        {Name: "John", Email: "john@example.com"},
        {Name: "Jane", Email: "jane@example.com"},
    }
    _, err = db.NewInsert().Model(&users).Exec(c.Context())
    if err != nil {
        return err
    }
    
    // Update with conditions
    _, err = db.NewUpdate().
        Model((*User)(nil)).
        Set("updated_at = ?", time.Now()).
        Where("created_at < ?", time.Now().AddDate(0, 0, -30)).
        Exec(c.Context())
    
    return err
}

// Account, accountA, and accountB are assumed to be defined elsewhere.
func transferFunds(c forge.Context) error {
    db := forge.Must[*bun.DB](c.Container(), "db")
    
    // Start transaction
    tx, err := db.BeginTx(c.Context(), nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    
    // Debit from account A
    _, err = tx.NewUpdate().
        Model((*Account)(nil)).
        Set("balance = balance - ?", 100).
        Where("id = ?", accountA).
        Exec(c.Context())
    if err != nil {
        return err
    }
    
    // Credit to account B
    _, err = tx.NewUpdate().
        Model((*Account)(nil)).
        Set("balance = balance + ?", 100).
        Where("id = ?", accountB).
        Exec(c.Context())
    if err != nil {
        return err
    }
    
    // Commit transaction
    return tx.Commit()
}

// userID is assumed to be in scope.
func rawSQLQueries(c forge.Context) error {
    db := forge.Must[*bun.DB](c.Container(), "db")
    
    // Raw query
    var count int
    err := db.NewRaw("SELECT COUNT(*) FROM users WHERE created_at > ?", 
        time.Now().AddDate(0, -1, 0)).Scan(c.Context(), &count)
    if err != nil {
        return err
    }
    
    // Raw exec
    _, err = db.NewRaw("UPDATE users SET last_login = NOW() WHERE id = ?", 
        userID).Exec(c.Context())
    
    return err
}

MongoDB Operations

type Document struct {
    ID        primitive.ObjectID `bson:"_id,omitempty" json:"id"`
    Title     string             `bson:"title" json:"title"`
    Content   string             `bson:"content" json:"content"`
    CreatedAt time.Time          `bson:"created_at" json:"created_at"`
    UpdatedAt time.Time          `bson:"updated_at" json:"updated_at"`
}

func createDocumentHandler(c forge.Context) error {
    // Get MongoDB client from DI
    client := forge.Must[*mongo.Client](c.Container(), "mongo")
    collection := client.Database("myapp").Collection("documents")
    
    var doc Document
    if err := c.Bind(&doc); err != nil {
        return c.JSON(400, map[string]string{"error": "Invalid request"})
    }
    
    doc.ID = primitive.NewObjectID()
    doc.CreatedAt = time.Now()
    doc.UpdatedAt = time.Now()
    
    _, err := collection.InsertOne(c.Context(), doc)
    if err != nil {
        return c.JSON(500, map[string]string{"error": "Failed to create document"})
    }
    
    return c.JSON(201, doc)
}

func getDocumentHandler(c forge.Context) error {
    client := forge.Must[*mongo.Client](c.Container(), "mongo")
    collection := client.Database("myapp").Collection("documents")
    
    docID := c.Param("doc_id")
    objectID, err := primitive.ObjectIDFromHex(docID)
    if err != nil {
        return c.JSON(400, map[string]string{"error": "Invalid document ID"})
    }
    
    var doc Document
    err = collection.FindOne(c.Context(), bson.M{"_id": objectID}).Decode(&doc)
    if err != nil {
        if errors.Is(err, mongo.ErrNoDocuments) {
            return c.JSON(404, map[string]string{"error": "Document not found"})
        }
        return c.JSON(500, map[string]string{"error": "Database error"})
    }
    
    return c.JSON(200, doc)
}

func advancedMongoQueries(c forge.Context) error {
    client := forge.Must[*mongo.Client](c.Container(), "mongo")
    collection := client.Database("myapp").Collection("documents")
    
    // Find with filters
    filter := bson.M{
        "created_at": bson.M{"$gte": time.Now().AddDate(0, -1, 0)},
        "title":      bson.M{"$regex": "important", "$options": "i"},
    }
    
    cursor, err := collection.Find(c.Context(), filter)
    if err != nil {
        return err
    }
    defer cursor.Close(c.Context())
    
    var documents []Document
    if err = cursor.All(c.Context(), &documents); err != nil {
        return err
    }
    
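    // BulkWrite returns an error for an empty operation list, so stop early
    if len(documents) == 0 {
        return nil
    }
    
    // Bulk operations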
    var operations []mongo.WriteModel
    for _, doc := range documents {
        operation := mongo.NewUpdateOneModel().
            SetFilter(bson.M{"_id": doc.ID}).
            SetUpdate(bson.M{"$set": bson.M{"updated_at": time.Now()}})
        operations = append(operations, operation)
    }
    
    _, err = collection.BulkWrite(c.Context(), operations)
    return err
}

func aggregationExample(c forge.Context) error {
    client := forge.Must[*mongo.Client](c.Container(), "mongo")
    collection := client.Database("myapp").Collection("documents")
    
    // Aggregation pipeline
    pipeline := []bson.M{
        {"$match": bson.M{"created_at": bson.M{"$gte": time.Now().AddDate(0, -1, 0)}}},
        {"$group": bson.M{
            "_id":   "$category",
            "count": bson.M{"$sum": 1},
            "latest": bson.M{"$max": "$created_at"},
        }},
        {"$sort": bson.M{"count": -1}},
    }
    
    cursor, err := collection.Aggregate(c.Context(), pipeline)
    if err != nil {
        return err
    }
    defer cursor.Close(c.Context())
    
    var results []bson.M
    if err = cursor.All(c.Context(), &results); err != nil {
        return err
    }
    
    return c.JSON(200, results)
}

func mongoTransaction(c forge.Context) error {
    client := forge.Must[*mongo.Client](c.Container(), "mongo")
    
    // Start session
    session, err := client.StartSession()
    if err != nil {
        return err
    }
    defer session.EndSession(c.Context())
    
    // Execute transaction
    _, err = session.WithTransaction(c.Context(), func(sessCtx mongo.SessionContext) (interface{}, error) {
        collection := client.Database("myapp").Collection("documents")
        
        // Insert document
        doc := Document{
            ID:        primitive.NewObjectID(),
            Title:     "Transaction Test",
            Content:   "This is a test document",
            CreatedAt: time.Now(),
            UpdatedAt: time.Now(),
        }
        
        _, err := collection.InsertOne(sessCtx, doc)
        if err != nil {
            return nil, err
        }
        
        // Update counter
        counterCollection := client.Database("myapp").Collection("counters")
        _, err = counterCollection.UpdateOne(
            sessCtx,
            bson.M{"_id": "documents"},
            bson.M{"$inc": bson.M{"count": 1}},
        )
        
        return nil, err
    })
    
    return err
}

Database Manager Operations

func databaseManagerExample(c forge.Context) error {
    // Get database manager
    dbManager := forge.Must[*database.DatabaseManager](c.Container(), "databaseManager")
    
    // Get specific database by name
    mainDB, err := dbManager.Get("main")
    if err != nil {
        return err
    }
    
    // Get SQL database with Bun ORM
    bunDB, err := dbManager.SQL("main")
    if err != nil {
        return err
    }
    
    // Get MongoDB client
    mongoClient, err := dbManager.Mongo("documents")
    if err != nil {
        return err
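    }
    
    // Placeholder use so the example compiles; run real queries with these handles
    _, _, _ = mainDB, bunDB, mongoClient
    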
    // Health check all databases
    statuses := dbManager.HealthCheckAll(c.Context())
    for name, status := range statuses {
        if !status.Healthy {
            log.Printf("Database %s is unhealthy: %s", name, status.Message)
        }
    }
    
    // List all registered databases
    databases := dbManager.List()
    
    return c.JSON(200, map[string]interface{}{
        "databases": databases,
        "health":    statuses,
    })
}

Health Monitoring

The extension provides comprehensive health monitoring:

func healthCheckExample(c forge.Context) error {
    dbManager := forge.Must[*database.DatabaseManager](c.Container(), "databaseManager")
    
    // Check all databases
    statuses := dbManager.HealthCheckAll(c.Context())
    
    overall := "healthy"
    for _, status := range statuses {
        if !status.Healthy {
            overall = "unhealthy"
            break
        }
    }
    
    return c.JSON(200, map[string]interface{}{
        "status":    overall,
        "databases": statuses,
        "timestamp": time.Now(),
    })
}

Health status includes:

  • Healthy: Boolean indicating database health
  • Message: Error message if unhealthy
  • Latency: Response time for health check
  • CheckedAt: Timestamp of last health check

Connection Pool Statistics

Monitor connection pool performance:

func poolStatsExample(c forge.Context) error {
    db := forge.Must[database.Database](c.Container(), "database")
    
    stats := db.Stats()
    
    return c.JSON(200, map[string]interface{}{
        "open_connections":    stats.OpenConnections,
        "in_use":             stats.InUse,
        "idle":               stats.Idle,
        "wait_count":         stats.WaitCount,
        "wait_duration":      stats.WaitDuration,
        "max_idle_closed":    stats.MaxIdleClosed,
        "max_lifetime_closed": stats.MaxLifetimeClosed,
    })
}

Configuration Reference

DatabaseConfig

Field                  Type                    Description                                       Default
---------------------  ----------------------  ------------------------------------------------  --------
name                   string                  Database connection name                          Required
type                   DatabaseType            Database type (postgres, mysql, sqlite, mongodb)  Required
dsn                    string                  Data source name / connection string              Required
max_open_conns         int                     Maximum open connections                          25
max_idle_conns         int                     Maximum idle connections                          5
conn_max_lifetime      time.Duration           Maximum connection lifetime                       5m
conn_max_idle_time     time.Duration           Maximum connection idle time                      5m
max_retries            int                     Maximum retry attempts                            3
retry_delay            time.Duration           Delay between retries                             1s
health_check_interval  time.Duration           Health check interval                             30s
config                 map[string]interface{}  Database-specific configuration                   -

Database Types

const (
    TypePostgres DatabaseType = "postgres"
    TypeMySQL    DatabaseType = "mysql"
    TypeSQLite   DatabaseType = "sqlite"
    TypeMongoDB  DatabaseType = "mongodb"
)

Best Practices

Connection Pool Configuration

// For high-traffic applications
highTraffic := database.DatabaseConfig{
    MaxOpenConns:    50,               // Higher for more concurrent connections
    MaxIdleConns:    10,               // Keep more idle connections
    ConnMaxLifetime: time.Hour,        // Longer lifetime
    ConnMaxIdleTime: 30 * time.Minute, // Reasonable idle time
}

// For low-traffic applications
lowTraffic := database.DatabaseConfig{
    MaxOpenConns:    10,               // Lower for resource efficiency
    MaxIdleConns:    2,                // Fewer idle connections
    ConnMaxLifetime: 30 * time.Minute, // Shorter lifetime
    ConnMaxIdleTime: 5 * time.Minute,  // Shorter idle time
}

Error Handling

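func handleDatabaseErrors(err error) error {
    // A nil error needs no translation; calling err.Error() on it would panic
    if err == nil {
        return nil
    }
    switch {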
    case errors.Is(err, sql.ErrNoRows):
        return errors.New("record not found")
    case errors.Is(err, mongo.ErrNoDocuments):
        return errors.New("document not found")
    case strings.Contains(err.Error(), "connection refused"):
        return errors.New("database unavailable")
    default:
        return errors.New("database error")
    }
}

Performance Optimization

  1. Use Connection Pooling: Configure appropriate pool sizes
  2. Monitor Health: Implement health checks and monitoring
  3. Handle Retries: Configure retry logic for transient failures
  4. Use Transactions: Group related operations in transactions
  5. Index Optimization: Create appropriate database indexes
  6. Query Optimization: Use efficient queries and avoid N+1 problems

Troubleshooting

Common Issues

Connection Pool Exhaustion

// Monitor pool statistics
stats := db.Stats()
if stats.WaitCount > 0 {
    // Callers have had to wait for a connection at least once.
    // If this number keeps growing, consider increasing MaxOpenConns.
}

Health Check Failures

// Check individual database health
status := db.Health(ctx)
if !status.Healthy {
    log.Printf("Database unhealthy: %s (latency: %v)", status.Message, status.Latency)
}

Transaction Deadlocks

// Implement retry logic for deadlocks
func retryTransaction(fn func() error) error {
    for i := 0; i < 3; i++ {
        err := fn()
        if err == nil {
            return nil
        }
        if strings.Contains(err.Error(), "deadlock") {
            time.Sleep(time.Duration(i+1) * 100 * time.Millisecond)
            continue
        }
        return err
    }
    return errors.New("transaction failed after retries")
}

For more advanced database operations and patterns, see the Bun ORM documentation for SQL databases and the MongoDB Go Driver documentation for MongoDB.
