Events Extension

Event-driven architecture with event sourcing and message brokers

Events Extension

The Events extension provides a comprehensive event-driven architecture with event sourcing capabilities, supporting multiple message brokers and event stores for building scalable, decoupled applications.

Features

Event-Driven Architecture

  • Event Bus: Central hub for publishing and subscribing to events
  • Event Handlers: Type-safe event handling with automatic registration
  • Message Brokers: Support for multiple broker backends (Memory, NATS, Redis)
  • Event Routing: Intelligent routing based on event types and patterns

Event Sourcing

  • Event Store: Persistent storage for event streams
  • Aggregate Reconstruction: Rebuild application state from events
  • Event Versioning: Schema evolution support for events
  • Snapshots: Performance optimization for large event streams

Message Brokers

  • Memory Broker: In-memory messaging for development and testing
  • NATS Broker: High-performance, cloud-native messaging
  • Redis Broker: Redis-based pub/sub messaging
  • Multi-Broker: Support for multiple brokers with priority routing

Event Store Backends

  • Memory Store: In-memory storage for development
  • PostgreSQL Store: Production-ready relational storage
  • MongoDB Store: Document-based event storage

Advanced Features

  • Correlation IDs: Request tracking across services
  • Event Metadata: Rich context and tracing information
  • Retry Logic: Configurable retry mechanisms for failed events
  • Circuit Breaker: Fault tolerance for event processing
  • Metrics & Monitoring: Comprehensive observability

Installation

Go Module

go get github.com/xraph/forge/extensions/events

Docker

FROM xraph/forge:latest
# Events extension is included

Package Manager

# Using Forge CLI
forge extension add events

# Using package manager
npm install @xraph/forge-events

Configuration

YAML Configuration

extensions:
  events:
    bus:
      default_broker: "nats"
      max_retries: 3
      retry_delay: "5s"
      enable_metrics: true
      enable_tracing: true
      buffer_size: 1000
      worker_count: 10
      processing_timeout: "30s"
    
    store:
      type: "postgres"
      database: "main"
      table: "events"
    
    brokers:
      - name: "memory"
        type: "memory"
        enabled: true
        priority: 1
        config: {}
      
      - name: "nats"
        type: "nats"
        enabled: true
        priority: 2
        config:
          url: "nats://localhost:4222"
          cluster_id: "forge-cluster"
          client_id: "forge-events"
      
      - name: "redis"
        type: "redis"
        enabled: false
        priority: 3
        config:
          addr: "localhost:6379"
          password: ""
          db: 0
    
    metrics:
      enabled: true
      publish_interval: "30s"
      enable_per_type: true
      enable_per_handler: true

Environment Variables

# Event Bus Configuration
FORGE_EVENTS_BUS_DEFAULT_BROKER=nats
FORGE_EVENTS_BUS_MAX_RETRIES=3
FORGE_EVENTS_BUS_RETRY_DELAY=5s
FORGE_EVENTS_BUS_ENABLE_METRICS=true
FORGE_EVENTS_BUS_BUFFER_SIZE=1000
FORGE_EVENTS_BUS_WORKER_COUNT=10

# Event Store Configuration
FORGE_EVENTS_STORE_TYPE=postgres
FORGE_EVENTS_STORE_DATABASE=main
FORGE_EVENTS_STORE_TABLE=events

# NATS Broker Configuration
FORGE_EVENTS_NATS_URL=nats://localhost:4222
FORGE_EVENTS_NATS_CLUSTER_ID=forge-cluster
FORGE_EVENTS_NATS_CLIENT_ID=forge-events

# Redis Broker Configuration
FORGE_EVENTS_REDIS_ADDR=localhost:6379
FORGE_EVENTS_REDIS_PASSWORD=""
FORGE_EVENTS_REDIS_DB=0

# Metrics Configuration
FORGE_EVENTS_METRICS_ENABLED=true
FORGE_EVENTS_METRICS_PUBLISH_INTERVAL=30s

Programmatic Configuration

package main

import (
    "time"
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/events"
)

func main() {
    app := forge.NewApp(forge.AppConfig{
        Name:    "my-app",
        Version: "1.0.0",
    })

    // Configure events extension
    config := events.Config{
        Bus: events.BusConfig{
            DefaultBroker:     "nats",
            MaxRetries:        3,
            RetryDelay:        time.Second * 5,
            EnableMetrics:     true,
            EnableTracing:     true,
            BufferSize:        1000,
            WorkerCount:       10,
            ProcessingTimeout: time.Second * 30,
        },
        Store: events.StoreConfig{
            Type:     "postgres",
            Database: "main",
            Table:    "events",
        },
        Brokers: []events.BrokerConfig{
            {
                Name:     "nats",
                Type:     "nats",
                Enabled:  true,
                Priority: 1,
                Config: map[string]interface{}{
                    "url":        "nats://localhost:4222",
                    "cluster_id": "forge-cluster",
                    "client_id":  "forge-events",
                },
            },
        },
        Metrics: events.MetricsConfig{
            Enabled:          true,
            PublishInterval:  time.Second * 30,
            EnablePerType:    true,
            EnablePerHandler: true,
        },
    }

    // Register extension
    ext := events.NewExtensionWithConfig(config)
    app.RegisterExtension(ext)

    app.Start()
}

Usage Examples

Basic Event Publishing

package main

import (
    "context"
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/events/core"
)

func publishUserEvent(app forge.App) error {
    // Get event bus from DI container
    eventBus := forge.Must[core.EventBus](app.Container(), "eventBus")
    
    // Create event
    event := core.NewEvent("user.created", "user-123", map[string]interface{}{
        "name":  "John Doe",
        "email": "john@example.com",
        "role":  "user",
    }).WithCorrelationID("req-456")
    
    // Publish event
    ctx := context.Background()
    return eventBus.Publish(ctx, event)
}

Event Subscription & Handling

package main

import (
    "context"
    "log"
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/events/core"
)

func setupEventHandlers(app forge.App) error {
    eventBus := forge.Must[core.EventBus](app.Container(), "eventBus")
    
    // Simple handler function
    userHandler := core.EventHandlerFunc(func(ctx context.Context, event *core.Event) error {
        log.Printf("User event received: %s for %s", event.Type, event.AggregateID)
        
        // Process the event
        switch event.Type {
        case "user.created":
            return handleUserCreated(ctx, event)
        case "user.updated":
            return handleUserUpdated(ctx, event)
        case "user.deleted":
            return handleUserDeleted(ctx, event)
        }
        
        return nil
    })
    
    // Subscribe to user events
    if err := eventBus.Subscribe("user.created", userHandler); err != nil {
        return err
    }
    if err := eventBus.Subscribe("user.updated", userHandler); err != nil {
        return err
    }
    if err := eventBus.Subscribe("user.deleted", userHandler); err != nil {
        return err
    }
    
    return nil
}

func handleUserCreated(ctx context.Context, event *core.Event) error {
    // Send welcome email
    // Update analytics
    // Create user profile
    return nil
}

func handleUserUpdated(ctx context.Context, event *core.Event) error {
    // Update search index
    // Invalidate cache
    return nil
}

func handleUserDeleted(ctx context.Context, event *core.Event) error {
    // Clean up user data
    // Send goodbye email
    return nil
}

Typed Event Handlers

package main

import (
    "context"
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/events/core"
)

func setupTypedHandlers(app forge.App) error {
    eventBus := forge.Must[core.EventBus](app.Container(), "eventBus")
    
    // Create typed handler for order events
    orderHandler := core.NewTypedEventHandler(
        "order-processor",
        []string{"order.created", "order.paid", "order.shipped", "order.delivered"},
        func(ctx context.Context, event *core.Event) error {
            switch event.Type {
            case "order.created":
                return processNewOrder(ctx, event)
            case "order.paid":
                return processPayment(ctx, event)
            case "order.shipped":
                return updateShipping(ctx, event)
            case "order.delivered":
                return completeOrder(ctx, event)
            }
            return nil
        },
    )
    
    // Subscribe to all order events
    for _, eventType := range []string{"order.created", "order.paid", "order.shipped", "order.delivered"} {
        if err := eventBus.Subscribe(eventType, orderHandler); err != nil {
            return err
        }
    }
    
    return nil
}

Event Sourcing

package main

import (
    "context"
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/events/core"
)

// Order aggregate
type Order struct {
    ID       string
    Status   string
    Total    float64
    Items    []OrderItem
    Version  int
}

type OrderItem struct {
    ProductID string
    Quantity  int
    Price     float64
}

func createOrderWithEvents(app forge.App, orderID string, items []OrderItem) error {
    eventStore := forge.Must[core.EventStore](app.Container(), "eventStore")
    eventBus := forge.Must[core.EventBus](app.Container(), "eventBus")
    
    ctx := context.Background()
    
    // Calculate total
    var total float64
    for _, item := range items {
        total += item.Price * float64(item.Quantity)
    }
    
    // Create events sequence
    events := []*core.Event{
        core.NewEvent("order.created", orderID, map[string]interface{}{
            "status": "pending",
            "total":  total,
            "items":  items,
        }).WithVersion(1),
        
        core.NewEvent("order.validated", orderID, map[string]interface{}{
            "validation_result": "passed",
        }).WithVersion(2),
    }
    
    // Save events to store
    for _, event := range events {
        if err := eventStore.SaveEvent(ctx, event); err != nil {
            return err
        }
        
        // Publish event to bus
        if err := eventBus.Publish(ctx, event); err != nil {
            return err
        }
    }
    
    return nil
}

func reconstructOrderFromEvents(app forge.App, orderID string) (*Order, error) {
    eventStore := forge.Must[core.EventStore](app.Container(), "eventStore")
    
    ctx := context.Background()
    
    // Get all events for the order
    events, err := eventStore.GetEventsByAggregate(ctx, orderID, 0)
    if err != nil {
        return nil, err
    }
    
    // Reconstruct order state
    order := &Order{ID: orderID}
    
    for _, event := range events {
        switch event.Type {
        case "order.created":
            data := event.Data.(map[string]interface{})
            order.Status = data["status"].(string)
            order.Total = data["total"].(float64)
            // Reconstruct items...
            
        case "order.paid":
            order.Status = "paid"
            
        case "order.shipped":
            order.Status = "shipped"
            
        case "order.delivered":
            order.Status = "delivered"
        }
        
        order.Version = event.Version
    }
    
    return order, nil
}

Multi-Broker Configuration

package main

import (
    "context"
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/events/core"
)

func publishToSpecificBroker(app forge.App) error {
    eventBus := forge.Must[core.EventBus](app.Container(), "eventBus")
    
    ctx := context.Background()
    
    // Create high-priority event
    criticalEvent := core.NewEvent("system.alert", "sys-001", map[string]interface{}{
        "level":   "critical",
        "message": "Database connection lost",
    })
    
    // Publish to specific broker (e.g., NATS for reliability)
    if err := eventBus.PublishTo(ctx, "nats", criticalEvent); err != nil {
        return err
    }
    
    // Create regular event
    regularEvent := core.NewEvent("user.login", "user-123", map[string]interface{}{
        "timestamp": "2024-01-15T10:30:00Z",
        "ip":        "192.168.1.100",
    })
    
    // Publish to default broker
    return eventBus.Publish(ctx, regularEvent)
}

Advanced Features

Event Correlation & Tracing

package main

import (
    "context"
    "github.com/xraph/forge/extensions/events/core"
)

func publishWithCorrelation(eventBus core.EventBus, correlationID string) error {
    ctx := context.Background()
    
    // Create event with correlation ID for request tracing
    event := core.NewEvent("payment.processed", "payment-123", map[string]interface{}{
        "amount":   100.00,
        "currency": "USD",
        "method":   "credit_card",
    }).WithCorrelationID(correlationID).WithSource("payment-service")
    
    return eventBus.Publish(ctx, event)
}

Event Filtering & Routing

package main

import (
    "context"
    "strings"
    "github.com/xraph/forge/extensions/events/core"
)

func setupConditionalHandlers(eventBus core.EventBus) error {
    // Handler that only processes high-value orders
    highValueHandler := core.EventHandlerFunc(func(ctx context.Context, event *core.Event) error {
        if event.Type == "order.created" {
            data := event.Data.(map[string]interface{})
            if total, ok := data["total"].(float64); ok && total > 1000.00 {
                // Process high-value order
                return processHighValueOrder(ctx, event)
            }
        }
        return nil
    })
    
    // Handler for specific regions
    regionHandler := core.EventHandlerFunc(func(ctx context.Context, event *core.Event) error {
        if strings.HasPrefix(event.Type, "user.") {
            data := event.Data.(map[string]interface{})
            if region, ok := data["region"].(string); ok && region == "EU" {
                // Process EU-specific logic
                return processEUUser(ctx, event)
            }
        }
        return nil
    })
    
    // Subscribe handlers
    eventBus.Subscribe("order.created", highValueHandler)
    eventBus.Subscribe("user.created", regionHandler)
    eventBus.Subscribe("user.updated", regionHandler)
    
    return nil
}

Event Store Queries

package main

import (
    "context"
    "time"
    "github.com/xraph/forge/extensions/events/core"
)

func queryEventStore(eventStore core.EventStore) error {
    ctx := context.Background()
    
    // Get events by type
    userEvents, err := eventStore.GetEventsByType(ctx, "user.created", 100, 0)
    if err != nil {
        return err
    }
    
    // Get events since timestamp
    since := time.Now().Add(-24 * time.Hour)
    recentEvents, err := eventStore.GetEventsSince(ctx, since, 50, 0)
    if err != nil {
        return err
    }
    
    // Get events in time range
    start := time.Now().Add(-7 * 24 * time.Hour)
    end := time.Now()
    weekEvents, err := eventStore.GetEventsInRange(ctx, start, end, 200, 0)
    if err != nil {
        return err
    }
    
    // Process events...
    processEvents(userEvents, recentEvents, weekEvents)
    
    return nil
}

Metrics & Monitoring

package main

import (
    "context"
    "log"
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/events/core"
)

func monitorEventBus(app forge.App) {
    eventBus := forge.Must[core.EventBus](app.Container(), "eventBus")
    
    // Get event bus statistics
    stats := eventBus.GetStats()
    
    log.Printf("Event Bus Stats:")
    log.Printf("  Name: %v", stats["name"])
    log.Printf("  Started: %v", stats["started"])
    log.Printf("  Brokers: %v", stats["brokers_count"])
    log.Printf("  Published: %v", stats["events_published"])
    log.Printf("  Processed: %v", stats["events_processed"])
    log.Printf("  Failed: %v", stats["events_failed"])
    
    // Get broker-specific stats
    brokers := eventBus.GetBrokers()
    for name, broker := range brokers {
        brokerStats := broker.GetStats()
        log.Printf("Broker %s Stats: %+v", name, brokerStats)
    }
}

Best Practices

Event Design

  • Use descriptive event names: user.created, order.shipped, payment.failed
  • Include aggregate ID: Always specify which entity the event relates to
  • Add correlation IDs: Enable request tracing across services
  • Version your events: Support schema evolution with version numbers
  • Keep events immutable: Never modify events after creation

Handler Implementation

  • Idempotent handlers: Ensure handlers can be safely retried
  • Error handling: Implement proper error handling and logging
  • Timeout management: Set appropriate timeouts for long-running operations
  • Resource cleanup: Always clean up resources in handlers

Performance Optimization

  • Batch processing: Process multiple events together when possible
  • Async processing: Use async handlers for non-critical operations
  • Connection pooling: Configure appropriate connection pools
  • Buffer sizing: Tune buffer sizes based on event volume

Security Considerations

  • Event validation: Validate event data before processing
  • Access control: Implement proper authorization for event publishing
  • Sensitive data: Avoid including sensitive data in events
  • Audit logging: Log all event operations for security auditing

Troubleshooting

Common Issues

Events Not Being Delivered

# Check event bus status
curl http://localhost:8080/health/events

# Verify broker connections
curl http://localhost:8080/metrics/events/brokers

# Check event store connectivity
curl http://localhost:8080/health/events/store

High Event Processing Latency

# Increase worker count
extensions:
  events:
    bus:
      worker_count: 20
      buffer_size: 2000
      processing_timeout: "60s"

Event Store Performance Issues

# Optimize event store configuration
extensions:
  events:
    store:
      type: "postgres"
      database: "events_db"
      # Use dedicated database for events

Memory Usage Issues

# Tune memory settings
extensions:
  events:
    bus:
      buffer_size: 500  # Reduce buffer size
      worker_count: 5   # Reduce workers
    brokers:
      - name: "memory"
        config:
          max_events: 10000  # Limit memory broker

Debugging

Enable Debug Logging

logging:
  level: debug
  loggers:
    events: debug
    events.bus: debug
    events.store: debug

Monitor Event Flow

// Add event middleware for debugging
handler := core.EventHandlerFunc(func(ctx context.Context, event *core.Event) error {
    log.Printf("Processing event: %s [%s] at %v", 
        event.Type, event.ID, event.Timestamp)
    
    start := time.Now()
    err := originalHandler(ctx, event)
    duration := time.Since(start)
    
    if err != nil {
        log.Printf("Event processing failed: %v (took %v)", err, duration)
    } else {
        log.Printf("Event processed successfully (took %v)", duration)
    }
    
    return err
})

Next Steps

How is this guide?

Last updated on