Database Extension
Multi-database support with SQL (Postgres, MySQL, SQLite) and NoSQL (MongoDB)
Database Extension
The Database extension provides a unified interface for working with multiple database backends, including SQL databases (PostgreSQL, MySQL, SQLite) and NoSQL databases (MongoDB). It offers connection pooling, health monitoring, migration support, and production-ready features.
The Database extension abstracts database operations while preserving access to native drivers (Bun ORM for SQL, MongoDB driver for NoSQL) for advanced use cases.
Features
Supported Database Types
- PostgreSQL: Advanced SQL database with JSON support
- MySQL: Popular relational database
- SQLite: Embedded SQL database
- MongoDB: Document-oriented NoSQL database
Core Capabilities
- Unified Interface: Consistent API across all database types
- Connection Pooling: Efficient connection management with configurable pool settings
- Health Monitoring: Real-time database health checks with latency tracking
- Native Driver Access: Direct access to Bun ORM (SQL) and MongoDB client
- Transaction Management: ACID compliance for SQL databases, sessions for MongoDB
- Observability: Query metrics, performance tracking, and structured logging
Production Features
- Connection Retry: Automatic reconnection with configurable retry logic
- Pool Statistics: Detailed connection pool metrics
- Health Checks: Continuous health monitoring with status reporting
- Multiple Databases: Support for multiple database connections in a single application
- DI Integration: Seamless integration with Forge's dependency injection system
Installation
go get github.com/xraph/forge/extensions/database
Configuration
extensions:
  database:
    # Default database name (optional, uses first database if not specified)
    default: "main"
    # Database configurations
    databases:
      - name: "main"
        type: "postgres"
        dsn: "postgres://user:password@localhost:5432/mydb?sslmode=disable"
        max_open_conns: 25
        max_idle_conns: 5
        conn_max_lifetime: "5m"
        conn_max_idle_time: "5m"
        max_retries: 3
        retry_delay: "1s"
        health_check_interval: "30s"
      - name: "cache"
        type: "sqlite"
        dsn: "file::memory:?cache=shared"
        max_open_conns: 1
        max_idle_conns: 1
      - name: "documents"
        type: "mongodb"
        dsn: "mongodb://localhost:27017/myapp"
        max_open_conns: 100
        max_idle_conns: 5
        conn_max_idle_time: "30m"
# Default database
export DB_DEFAULT="main"
# PostgreSQL Configuration
export DB_POSTGRES_DSN="postgres://user:password@localhost:5432/mydb"
export DB_POSTGRES_MAX_OPEN_CONNS="25"
export DB_POSTGRES_MAX_IDLE_CONNS="5"
# MySQL Configuration
export DB_MYSQL_DSN="user:password@tcp(localhost:3306)/mydb?parseTime=true"
export DB_MYSQL_MAX_OPEN_CONNS="20"
# SQLite Configuration
export DB_SQLITE_DSN="./data/app.db"
# MongoDB Configuration
export DB_MONGODB_DSN="mongodb://localhost:27017/myapp"
export DB_MONGODB_MAX_OPEN_CONNS="100"
package main
import (
"time"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/database"
)
func main() {
app := forge.New()
// Configure Database extension
config := database.Config{
Default: "main",
Databases: []database.DatabaseConfig{
{
Name: "main",
Type: database.TypePostgres,
DSN: "postgres://user:password@localhost:5432/mydb",
MaxOpenConns: 25,
MaxIdleConns: 5,
ConnMaxLifetime: 5 * time.Minute,
ConnMaxIdleTime: 5 * time.Minute,
MaxRetries: 3,
RetryDelay: time.Second,
HealthCheckInterval: 30 * time.Second,
},
{
Name: "documents",
Type: database.TypeMongoDB,
DSN: "mongodb://localhost:27017/myapp",
MaxOpenConns: 100,
MaxIdleConns: 5,
ConnMaxIdleTime: 30 * time.Minute,
HealthCheckInterval: 30 * time.Second,
},
},
}
dbExt := database.NewExtension(config)
app.RegisterExtension(dbExt)
app.Run()
}Usage Examples
Dependency Injection Access
The extension registers several services in the DI container:
// Get the database manager
dbManager := forge.Must[*database.DatabaseManager](app.Container(), "databaseManager")
// Get the default database
db := forge.Must[database.Database](app.Container(), "database")
// For SQL databases, get Bun ORM instance
bunDB := forge.Must[*bun.DB](app.Container(), "db")
// For MongoDB, get client
mongoClient := forge.Must[*mongo.Client](app.Container(), "mongo")SQL Database Operations
// User is the model used by the SQL examples. The bun struct tags on each
// field describe how it maps to a column.
type User struct {
	// ID is tagged as the auto-incrementing primary key.
	ID int64 `bun:"id,pk,autoincrement"`
	// Name is tagged not-null.
	Name string `bun:"name,notnull"`
	// Email is tagged unique and not-null.
	Email string `bun:"email,unique,notnull"`
	// CreatedAt is tagged not-null with a current_timestamp default.
	CreatedAt time.Time `bun:"created_at,nullzero,notnull,default:current_timestamp"`
	// UpdatedAt carries the same tag options as CreatedAt.
	UpdatedAt time.Time `bun:"updated_at,nullzero,notnull,default:current_timestamp"`
}
// createUserHandler binds the request to a User, inserts it through Bun and
// answers 201 with the record. A bind failure yields 400 and an insert
// failure yields 500.
func createUserHandler(c forge.Context) error {
	// Resolve the Bun handle registered under "db" in the DI container.
	orm := forge.Must[*bun.DB](c.Container(), "db")

	var payload User
	if bindErr := c.Bind(&payload); bindErr != nil {
		return c.JSON(400, map[string]string{"error": "Invalid request"})
	}

	// Insert the bound record using the Bun ORM.
	if _, insertErr := orm.NewInsert().Model(&payload).Exec(c.Context()); insertErr != nil {
		return c.JSON(500, map[string]string{"error": "Failed to create user"})
	}

	return c.JSON(201, payload)
}
func getUserHandler(c forge.Context) error {
db := forge.Must[*bun.DB](c.Container(), "db")
userID := c.Param("user_id")
var user User
err := db.NewSelect().Model(&user).Where("id = ?", userID).Scan(c.Context())
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return c.JSON(404, map[string]string{"error": "User not found"})
}
return c.JSON(500, map[string]string{"error": "Database error"})
}
return c.JSON(200, user)
}func advancedQueries(c forge.Context) error {
db := forge.Must[*bun.DB](c.Container(), "db")
// Complex query with joins
var users []User
err := db.NewSelect().
Model(&users).
Where("created_at > ?", time.Now().AddDate(0, -1, 0)).
Order("created_at DESC").
Limit(10).
Scan(c.Context())
if err != nil {
return err
}
// Bulk insert
users = []User{
{Name: "John", Email: "john@example.com"},
{Name: "Jane", Email: "jane@example.com"},
}
_, err = db.NewInsert().Model(&users).Exec(c.Context())
if err != nil {
return err
}
// Update with conditions
_, err = db.NewUpdate().
Model((*User)(nil)).
Set("updated_at = ?", time.Now()).
Where("created_at < ?", time.Now().AddDate(0, 0, -30)).
Exec(c.Context())
return err
}func transferFunds(c forge.Context) error {
db := forge.Must[*bun.DB](c.Container(), "db")
// Start transaction
tx, err := db.BeginTx(c.Context(), nil)
if err != nil {
return err
}
defer tx.Rollback()
// Debit from account A
_, err = tx.NewUpdate().
Model((*Account)(nil)).
Set("balance = balance - ?", 100).
Where("id = ?", accountA).
Exec(c.Context())
if err != nil {
return err
}
// Credit to account B
_, err = tx.NewUpdate().
Model((*Account)(nil)).
Set("balance = balance + ?", 100).
Where("id = ?", accountB).
Exec(c.Context())
if err != nil {
return err
}
// Commit transaction
return tx.Commit()
}func rawSQLQueries(c forge.Context) error {
db := forge.Must[*bun.DB](c.Container(), "db")
// Raw query
var count int
err := db.NewRaw("SELECT COUNT(*) FROM users WHERE created_at > ?",
time.Now().AddDate(0, -1, 0)).Scan(c.Context(), &count)
if err != nil {
return err
}
// Raw exec
_, err = db.NewRaw("UPDATE users SET last_login = NOW() WHERE id = ?",
userID).Exec(c.Context())
return err
}MongoDB Operations
// Document is the model used by the MongoDB examples. Each field carries a
// bson tag for storage and a json tag for API responses.
type Document struct {
	// ID is stored as "_id"; omitempty leaves it out of the BSON when unset.
	ID primitive.ObjectID `bson:"_id,omitempty" json:"id"`
	// Title is stored under "title".
	Title string `bson:"title" json:"title"`
	// Content is stored under "content".
	Content string `bson:"content" json:"content"`
	// CreatedAt is stored under "created_at".
	CreatedAt time.Time `bson:"created_at" json:"created_at"`
	// UpdatedAt is stored under "updated_at".
	UpdatedAt time.Time `bson:"updated_at" json:"updated_at"`
}
// createDocumentHandler binds the request to a Document, assigns it a new
// ObjectID and timestamps, inserts it into the "documents" collection and
// answers 201 with the document. A bind failure yields 400 and an insert
// failure yields 500.
func createDocumentHandler(c forge.Context) error {
	// Get MongoDB client from DI
	client := forge.Must[*mongo.Client](c.Container(), "mongo")
	collection := client.Database("myapp").Collection("documents")

	var doc Document
	if err := c.Bind(&doc); err != nil {
		return c.JSON(400, map[string]string{"error": "Invalid request"})
	}

	// Take the timestamp once so a new document has CreatedAt == UpdatedAt;
	// two separate time.Now() calls would differ slightly.
	now := time.Now()
	doc.ID = primitive.NewObjectID()
	doc.CreatedAt = now
	doc.UpdatedAt = now

	_, err := collection.InsertOne(c.Context(), doc)
	if err != nil {
		return c.JSON(500, map[string]string{"error": "Failed to create document"})
	}
	return c.JSON(201, doc)
}
func getDocumentHandler(c forge.Context) error {
client := forge.Must[*mongo.Client](c.Container(), "mongo")
collection := client.Database("myapp").Collection("documents")
docID := c.Param("doc_id")
objectID, err := primitive.ObjectIDFromHex(docID)
if err != nil {
return c.JSON(400, map[string]string{"error": "Invalid document ID"})
}
var doc Document
err = collection.FindOne(c.Context(), bson.M{"_id": objectID}).Decode(&doc)
if err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
return c.JSON(404, map[string]string{"error": "Document not found"})
}
return c.JSON(500, map[string]string{"error": "Database error"})
}
return c.JSON(200, doc)
}func advancedMongoQueries(c forge.Context) error {
client := forge.Must[*mongo.Client](c.Container(), "mongo")
collection := client.Database("myapp").Collection("documents")
// Find with filters
filter := bson.M{
"created_at": bson.M{"$gte": time.Now().AddDate(0, -1, 0)},
"title": bson.M{"$regex": "important", "$options": "i"},
}
cursor, err := collection.Find(c.Context(), filter)
if err != nil {
return err
}
defer cursor.Close(c.Context())
var documents []Document
if err = cursor.All(c.Context(), &documents); err != nil {
return err
}
// Bulk operations
var operations []mongo.WriteModel
for _, doc := range documents {
operation := mongo.NewUpdateOneModel().
SetFilter(bson.M{"_id": doc.ID}).
SetUpdate(bson.M{"$set": bson.M{"updated_at": time.Now()}})
operations = append(operations, operation)
}
_, err = collection.BulkWrite(c.Context(), operations)
return err
}func aggregationExample(c forge.Context) error {
client := forge.Must[*mongo.Client](c.Container(), "mongo")
collection := client.Database("myapp").Collection("documents")
// Aggregation pipeline
pipeline := []bson.M{
{"$match": bson.M{"created_at": bson.M{"$gte": time.Now().AddDate(0, -1, 0)}}},
{"$group": bson.M{
"_id": "$category",
"count": bson.M{"$sum": 1},
"latest": bson.M{"$max": "$created_at"},
}},
{"$sort": bson.M{"count": -1}},
}
cursor, err := collection.Aggregate(c.Context(), pipeline)
if err != nil {
return err
}
defer cursor.Close(c.Context())
var results []bson.M
if err = cursor.All(c.Context(), &results); err != nil {
return err
}
return c.JSON(200, results)
}func mongoTransaction(c forge.Context) error {
client := forge.Must[*mongo.Client](c.Container(), "mongo")
// Start session
session, err := client.StartSession()
if err != nil {
return err
}
defer session.EndSession(c.Context())
// Execute transaction
_, err = session.WithTransaction(c.Context(), func(sessCtx mongo.SessionContext) (interface{}, error) {
collection := client.Database("myapp").Collection("documents")
// Insert document
doc := Document{
ID: primitive.NewObjectID(),
Title: "Transaction Test",
Content: "This is a test document",
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
_, err := collection.InsertOne(sessCtx, doc)
if err != nil {
return nil, err
}
// Update counter
counterCollection := client.Database("myapp").Collection("counters")
_, err = counterCollection.UpdateOne(
sessCtx,
bson.M{"_id": "documents"},
bson.M{"$inc": bson.M{"count": 1}},
)
return nil, err
})
return err
}Database Manager Operations
func databaseManagerExample(c forge.Context) error {
// Get database manager
dbManager := forge.Must[*database.DatabaseManager](c.Container(), "databaseManager")
// Get specific database by name
mainDB, err := dbManager.Get("main")
if err != nil {
return err
}
// Get SQL database with Bun ORM
bunDB, err := dbManager.SQL("main")
if err != nil {
return err
}
// Get MongoDB client
mongoClient, err := dbManager.Mongo("documents")
if err != nil {
return err
}
// Health check all databases
statuses := dbManager.HealthCheckAll(c.Context())
for name, status := range statuses {
if !status.Healthy {
log.Printf("Database %s is unhealthy: %s", name, status.Message)
}
}
// List all registered databases
databases := dbManager.List()
return c.JSON(200, map[string]interface{}{
"databases": databases,
"health": statuses,
})
}Health Monitoring
The extension provides comprehensive health monitoring:
func healthCheckExample(c forge.Context) error {
dbManager := forge.Must[*database.DatabaseManager](c.Container(), "databaseManager")
// Check all databases
statuses := dbManager.HealthCheckAll(c.Context())
overall := "healthy"
for _, status := range statuses {
if !status.Healthy {
overall = "unhealthy"
break
}
}
return c.JSON(200, map[string]interface{}{
"status": overall,
"databases": statuses,
"timestamp": time.Now(),
})
}Health status includes:
- Healthy: Boolean indicating database health
- Message: Error message if unhealthy
- Latency: Response time for health check
- CheckedAt: Timestamp of last health check
Connection Pool Statistics
Monitor connection pool performance:
func poolStatsExample(c forge.Context) error {
db := forge.Must[database.Database](c.Container(), "database")
stats := db.Stats()
return c.JSON(200, map[string]interface{}{
"open_connections": stats.OpenConnections,
"in_use": stats.InUse,
"idle": stats.Idle,
"wait_count": stats.WaitCount,
"wait_duration": stats.WaitDuration,
"max_idle_closed": stats.MaxIdleClosed,
"max_lifetime_closed": stats.MaxLifetimeClosed,
})
}Configuration Reference
DatabaseConfig
| Field | Type | Description | Default |
|---|---|---|---|
| name | string | Database connection name | Required |
| type | DatabaseType | Database type (postgres, mysql, sqlite, mongodb) | Required |
| dsn | string | Data source name / connection string | Required |
| max_open_conns | int | Maximum open connections | 25 |
| max_idle_conns | int | Maximum idle connections | 5 |
| conn_max_lifetime | time.Duration | Maximum connection lifetime | 5m |
| conn_max_idle_time | time.Duration | Maximum connection idle time | 5m |
| max_retries | int | Maximum retry attempts | 3 |
| retry_delay | time.Duration | Delay between retries | 1s |
| health_check_interval | time.Duration | Health check interval | 30s |
| config | map[string]interface{} | Database-specific configuration | |
Database Types
// Supported DatabaseType values; the string on the right is the value used
// for the "type" field of a database configuration.
const (
	// TypePostgres selects PostgreSQL.
	TypePostgres DatabaseType = "postgres"
	// TypeMySQL selects MySQL.
	TypeMySQL DatabaseType = "mysql"
	// TypeSQLite selects SQLite.
	TypeSQLite DatabaseType = "sqlite"
	// TypeMongoDB selects MongoDB.
	TypeMongoDB DatabaseType = "mongodb"
)
Best Practices
Connection Pool Configuration
// For high-traffic applications
config := database.DatabaseConfig{
MaxOpenConns: 50, // Higher for more concurrent connections
MaxIdleConns: 10, // Keep more idle connections
ConnMaxLifetime: time.Hour, // Longer lifetime
ConnMaxIdleTime: 30 * time.Minute, // Reasonable idle time
}
// For low-traffic applications
config := database.DatabaseConfig{
MaxOpenConns: 10, // Lower for resource efficiency
MaxIdleConns: 2, // Fewer idle connections
ConnMaxLifetime: 30 * time.Minute, // Shorter lifetime
ConnMaxIdleTime: 5 * time.Minute, // Shorter idle time
}Error Handling
func handleDatabaseErrors(err error) error {
switch {
case errors.Is(err, sql.ErrNoRows):
return errors.New("record not found")
case errors.Is(err, mongo.ErrNoDocuments):
return errors.New("document not found")
case strings.Contains(err.Error(), "connection refused"):
return errors.New("database unavailable")
default:
return errors.New("database error")
}
}Performance Optimization
- Use Connection Pooling: Configure appropriate pool sizes
- Monitor Health: Implement health checks and monitoring
- Handle Retries: Configure retry logic for transient failures
- Use Transactions: Group related operations in transactions
- Index Optimization: Create appropriate database indexes
- Query Optimization: Use efficient queries and avoid N+1 problems
Troubleshooting
Common Issues
Connection Pool Exhaustion
// Monitor pool statistics
stats := db.Stats()
if stats.WaitCount > 0 {
// Consider increasing MaxOpenConns
}Health Check Failures
// Check individual database health
status := db.Health(ctx)
if !status.Healthy {
log.Printf("Database unhealthy: %s (latency: %v)", status.Message, status.Latency)
}Transaction Deadlocks
// Implement retry logic for deadlocks
func retryTransaction(fn func() error) error {
for i := 0; i < 3; i++ {
err := fn()
if err == nil {
return nil
}
if strings.Contains(err.Error(), "deadlock") {
time.Sleep(time.Duration(i+1) * 100 * time.Millisecond)
continue
}
return err
}
return errors.New("transaction failed after retries")
}For more advanced database operations and patterns, see the Bun ORM documentation for SQL databases and MongoDB Go Driver documentation for MongoDB operations.
Related Extensions
How is this guide?
Last updated on