Events Extension
Event-driven architecture with event sourcing and message brokers
Events Extension
The Events extension provides a comprehensive event-driven architecture with event sourcing capabilities, supporting multiple message brokers and event stores for building scalable, decoupled applications.
Features
Event-Driven Architecture
- Event Bus: Central hub for publishing and subscribing to events
- Event Handlers: Type-safe event handling with automatic registration
- Message Brokers: Support for multiple broker backends (Memory, NATS, Redis)
- Event Routing: Intelligent routing based on event types and patterns
Event Sourcing
- Event Store: Persistent storage for event streams
- Aggregate Reconstruction: Rebuild application state from events
- Event Versioning: Schema evolution support for events
- Snapshots: Performance optimization for large event streams
Message Brokers
- Memory Broker: In-memory messaging for development and testing
- NATS Broker: High-performance, cloud-native messaging
- Redis Broker: Redis-based pub/sub messaging
- Multi-Broker: Support for multiple brokers with priority routing
Event Store Backends
- Memory Store: In-memory storage for development
- PostgreSQL Store: Production-ready relational storage
- MongoDB Store: Document-based event storage
Advanced Features
- Correlation IDs: Request tracking across services
- Event Metadata: Rich context and tracing information
- Retry Logic: Configurable retry mechanisms for failed events
- Circuit Breaker: Fault tolerance for event processing
- Metrics & Monitoring: Comprehensive observability
Installation
Go Module
go get github.com/xraph/forge/extensions/eventsDocker
FROM xraph/forge:latest
# Events extension is includedPackage Manager
# Using Forge CLI
forge extension add events
# Using package manager
npm install @xraph/forge-eventsConfiguration
YAML Configuration
extensions:
events:
bus:
default_broker: "nats"
max_retries: 3
retry_delay: "5s"
enable_metrics: true
enable_tracing: true
buffer_size: 1000
worker_count: 10
processing_timeout: "30s"
store:
type: "postgres"
database: "main"
table: "events"
brokers:
- name: "memory"
type: "memory"
enabled: true
priority: 1
config: {}
- name: "nats"
type: "nats"
enabled: true
priority: 2
config:
url: "nats://localhost:4222"
cluster_id: "forge-cluster"
client_id: "forge-events"
- name: "redis"
type: "redis"
enabled: false
priority: 3
config:
addr: "localhost:6379"
password: ""
db: 0
metrics:
enabled: true
publish_interval: "30s"
enable_per_type: true
enable_per_handler: trueEnvironment Variables
# Event Bus Configuration
FORGE_EVENTS_BUS_DEFAULT_BROKER=nats
FORGE_EVENTS_BUS_MAX_RETRIES=3
FORGE_EVENTS_BUS_RETRY_DELAY=5s
FORGE_EVENTS_BUS_ENABLE_METRICS=true
FORGE_EVENTS_BUS_BUFFER_SIZE=1000
FORGE_EVENTS_BUS_WORKER_COUNT=10
# Event Store Configuration
FORGE_EVENTS_STORE_TYPE=postgres
FORGE_EVENTS_STORE_DATABASE=main
FORGE_EVENTS_STORE_TABLE=events
# NATS Broker Configuration
FORGE_EVENTS_NATS_URL=nats://localhost:4222
FORGE_EVENTS_NATS_CLUSTER_ID=forge-cluster
FORGE_EVENTS_NATS_CLIENT_ID=forge-events
# Redis Broker Configuration
FORGE_EVENTS_REDIS_ADDR=localhost:6379
FORGE_EVENTS_REDIS_PASSWORD=""
FORGE_EVENTS_REDIS_DB=0
# Metrics Configuration
FORGE_EVENTS_METRICS_ENABLED=true
FORGE_EVENTS_METRICS_PUBLISH_INTERVAL=30sProgrammatic Configuration
package main
import (
"time"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/events"
)
func main() {
app := forge.NewApp(forge.AppConfig{
Name: "my-app",
Version: "1.0.0",
})
// Configure events extension
config := events.Config{
Bus: events.BusConfig{
DefaultBroker: "nats",
MaxRetries: 3,
RetryDelay: time.Second * 5,
EnableMetrics: true,
EnableTracing: true,
BufferSize: 1000,
WorkerCount: 10,
ProcessingTimeout: time.Second * 30,
},
Store: events.StoreConfig{
Type: "postgres",
Database: "main",
Table: "events",
},
Brokers: []events.BrokerConfig{
{
Name: "nats",
Type: "nats",
Enabled: true,
Priority: 1,
Config: map[string]interface{}{
"url": "nats://localhost:4222",
"cluster_id": "forge-cluster",
"client_id": "forge-events",
},
},
},
Metrics: events.MetricsConfig{
Enabled: true,
PublishInterval: time.Second * 30,
EnablePerType: true,
EnablePerHandler: true,
},
}
// Register extension
ext := events.NewExtensionWithConfig(config)
app.RegisterExtension(ext)
app.Start()
}Usage Examples
Basic Event Publishing
package main
import (
"context"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/events/core"
)
func publishUserEvent(app forge.App) error {
// Get event bus from DI container
eventBus := forge.Must[core.EventBus](app.Container(), "eventBus")
// Create event
event := core.NewEvent("user.created", "user-123", map[string]interface{}{
"name": "John Doe",
"email": "john@example.com",
"role": "user",
}).WithCorrelationID("req-456")
// Publish event
ctx := context.Background()
return eventBus.Publish(ctx, event)
}Event Subscription & Handling
package main
import (
"context"
"log"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/events/core"
)
func setupEventHandlers(app forge.App) error {
eventBus := forge.Must[core.EventBus](app.Container(), "eventBus")
// Simple handler function
userHandler := core.EventHandlerFunc(func(ctx context.Context, event *core.Event) error {
log.Printf("User event received: %s for %s", event.Type, event.AggregateID)
// Process the event
switch event.Type {
case "user.created":
return handleUserCreated(ctx, event)
case "user.updated":
return handleUserUpdated(ctx, event)
case "user.deleted":
return handleUserDeleted(ctx, event)
}
return nil
})
// Subscribe to user events
if err := eventBus.Subscribe("user.created", userHandler); err != nil {
return err
}
if err := eventBus.Subscribe("user.updated", userHandler); err != nil {
return err
}
if err := eventBus.Subscribe("user.deleted", userHandler); err != nil {
return err
}
return nil
}
func handleUserCreated(ctx context.Context, event *core.Event) error {
// Send welcome email
// Update analytics
// Create user profile
return nil
}
func handleUserUpdated(ctx context.Context, event *core.Event) error {
// Update search index
// Invalidate cache
return nil
}
func handleUserDeleted(ctx context.Context, event *core.Event) error {
// Clean up user data
// Send goodbye email
return nil
}Typed Event Handlers
package main
import (
"context"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/events/core"
)
func setupTypedHandlers(app forge.App) error {
eventBus := forge.Must[core.EventBus](app.Container(), "eventBus")
// Create typed handler for order events
orderHandler := core.NewTypedEventHandler(
"order-processor",
[]string{"order.created", "order.paid", "order.shipped", "order.delivered"},
func(ctx context.Context, event *core.Event) error {
switch event.Type {
case "order.created":
return processNewOrder(ctx, event)
case "order.paid":
return processPayment(ctx, event)
case "order.shipped":
return updateShipping(ctx, event)
case "order.delivered":
return completeOrder(ctx, event)
}
return nil
},
)
// Subscribe to all order events
for _, eventType := range []string{"order.created", "order.paid", "order.shipped", "order.delivered"} {
if err := eventBus.Subscribe(eventType, orderHandler); err != nil {
return err
}
}
return nil
}Event Sourcing
package main
import (
"context"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/events/core"
)
// Order aggregate
type Order struct {
ID string
Status string
Total float64
Items []OrderItem
Version int
}
type OrderItem struct {
ProductID string
Quantity int
Price float64
}
func createOrderWithEvents(app forge.App, orderID string, items []OrderItem) error {
eventStore := forge.Must[core.EventStore](app.Container(), "eventStore")
eventBus := forge.Must[core.EventBus](app.Container(), "eventBus")
ctx := context.Background()
// Calculate total
var total float64
for _, item := range items {
total += item.Price * float64(item.Quantity)
}
// Create events sequence
events := []*core.Event{
core.NewEvent("order.created", orderID, map[string]interface{}{
"status": "pending",
"total": total,
"items": items,
}).WithVersion(1),
core.NewEvent("order.validated", orderID, map[string]interface{}{
"validation_result": "passed",
}).WithVersion(2),
}
// Save events to store
for _, event := range events {
if err := eventStore.SaveEvent(ctx, event); err != nil {
return err
}
// Publish event to bus
if err := eventBus.Publish(ctx, event); err != nil {
return err
}
}
return nil
}
func reconstructOrderFromEvents(app forge.App, orderID string) (*Order, error) {
eventStore := forge.Must[core.EventStore](app.Container(), "eventStore")
ctx := context.Background()
// Get all events for the order
events, err := eventStore.GetEventsByAggregate(ctx, orderID, 0)
if err != nil {
return nil, err
}
// Reconstruct order state
order := &Order{ID: orderID}
for _, event := range events {
switch event.Type {
case "order.created":
data := event.Data.(map[string]interface{})
order.Status = data["status"].(string)
order.Total = data["total"].(float64)
// Reconstruct items...
case "order.paid":
order.Status = "paid"
case "order.shipped":
order.Status = "shipped"
case "order.delivered":
order.Status = "delivered"
}
order.Version = event.Version
}
return order, nil
}Multi-Broker Configuration
package main
import (
"context"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/events/core"
)
func publishToSpecificBroker(app forge.App) error {
eventBus := forge.Must[core.EventBus](app.Container(), "eventBus")
ctx := context.Background()
// Create high-priority event
criticalEvent := core.NewEvent("system.alert", "sys-001", map[string]interface{}{
"level": "critical",
"message": "Database connection lost",
})
// Publish to specific broker (e.g., NATS for reliability)
if err := eventBus.PublishTo(ctx, "nats", criticalEvent); err != nil {
return err
}
// Create regular event
regularEvent := core.NewEvent("user.login", "user-123", map[string]interface{}{
"timestamp": "2024-01-15T10:30:00Z",
"ip": "192.168.1.100",
})
// Publish to default broker
return eventBus.Publish(ctx, regularEvent)
}Advanced Features
Event Correlation & Tracing
package main
import (
"context"
"github.com/xraph/forge/extensions/events/core"
)
func publishWithCorrelation(eventBus core.EventBus, correlationID string) error {
ctx := context.Background()
// Create event with correlation ID for request tracing
event := core.NewEvent("payment.processed", "payment-123", map[string]interface{}{
"amount": 100.00,
"currency": "USD",
"method": "credit_card",
}).WithCorrelationID(correlationID).WithSource("payment-service")
return eventBus.Publish(ctx, event)
}Event Filtering & Routing
package main
import (
"context"
"strings"
"github.com/xraph/forge/extensions/events/core"
)
func setupConditionalHandlers(eventBus core.EventBus) error {
// Handler that only processes high-value orders
highValueHandler := core.EventHandlerFunc(func(ctx context.Context, event *core.Event) error {
if event.Type == "order.created" {
data := event.Data.(map[string]interface{})
if total, ok := data["total"].(float64); ok && total > 1000.00 {
// Process high-value order
return processHighValueOrder(ctx, event)
}
}
return nil
})
// Handler for specific regions
regionHandler := core.EventHandlerFunc(func(ctx context.Context, event *core.Event) error {
if strings.HasPrefix(event.Type, "user.") {
data := event.Data.(map[string]interface{})
if region, ok := data["region"].(string); ok && region == "EU" {
// Process EU-specific logic
return processEUUser(ctx, event)
}
}
return nil
})
// Subscribe handlers
eventBus.Subscribe("order.created", highValueHandler)
eventBus.Subscribe("user.created", regionHandler)
eventBus.Subscribe("user.updated", regionHandler)
return nil
}Event Store Queries
package main
import (
"context"
"time"
"github.com/xraph/forge/extensions/events/core"
)
func queryEventStore(eventStore core.EventStore) error {
ctx := context.Background()
// Get events by type
userEvents, err := eventStore.GetEventsByType(ctx, "user.created", 100, 0)
if err != nil {
return err
}
// Get events since timestamp
since := time.Now().Add(-24 * time.Hour)
recentEvents, err := eventStore.GetEventsSince(ctx, since, 50, 0)
if err != nil {
return err
}
// Get events in time range
start := time.Now().Add(-7 * 24 * time.Hour)
end := time.Now()
weekEvents, err := eventStore.GetEventsInRange(ctx, start, end, 200, 0)
if err != nil {
return err
}
// Process events...
processEvents(userEvents, recentEvents, weekEvents)
return nil
}Metrics & Monitoring
package main
import (
"context"
"log"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/events/core"
)
func monitorEventBus(app forge.App) {
eventBus := forge.Must[core.EventBus](app.Container(), "eventBus")
// Get event bus statistics
stats := eventBus.GetStats()
log.Printf("Event Bus Stats:")
log.Printf(" Name: %v", stats["name"])
log.Printf(" Started: %v", stats["started"])
log.Printf(" Brokers: %v", stats["brokers_count"])
log.Printf(" Published: %v", stats["events_published"])
log.Printf(" Processed: %v", stats["events_processed"])
log.Printf(" Failed: %v", stats["events_failed"])
// Get broker-specific stats
brokers := eventBus.GetBrokers()
for name, broker := range brokers {
brokerStats := broker.GetStats()
log.Printf("Broker %s Stats: %+v", name, brokerStats)
}
}Best Practices
Event Design
- Use descriptive event names:
user.created,order.shipped,payment.failed - Include aggregate ID: Always specify which entity the event relates to
- Add correlation IDs: Enable request tracing across services
- Version your events: Support schema evolution with version numbers
- Keep events immutable: Never modify events after creation
Handler Implementation
- Idempotent handlers: Ensure handlers can be safely retried
- Error handling: Implement proper error handling and logging
- Timeout management: Set appropriate timeouts for long-running operations
- Resource cleanup: Always clean up resources in handlers
Performance Optimization
- Batch processing: Process multiple events together when possible
- Async processing: Use async handlers for non-critical operations
- Connection pooling: Configure appropriate connection pools
- Buffer sizing: Tune buffer sizes based on event volume
Security Considerations
- Event validation: Validate event data before processing
- Access control: Implement proper authorization for event publishing
- Sensitive data: Avoid including sensitive data in events
- Audit logging: Log all event operations for security auditing
Troubleshooting
Common Issues
Events Not Being Delivered
# Check event bus status
curl http://localhost:8080/health/events
# Verify broker connections
curl http://localhost:8080/metrics/events/brokers
# Check event store connectivity
curl http://localhost:8080/health/events/storeHigh Event Processing Latency
# Increase worker count
extensions:
events:
bus:
worker_count: 20
buffer_size: 2000
processing_timeout: "60s"Event Store Performance Issues
# Optimize event store configuration
extensions:
events:
store:
type: "postgres"
database: "events_db"
# Use dedicated database for eventsMemory Usage Issues
# Tune memory settings
extensions:
events:
bus:
buffer_size: 500 # Reduce buffer size
worker_count: 5 # Reduce workers
brokers:
- name: "memory"
config:
max_events: 10000 # Limit memory brokerDebugging
Enable Debug Logging
logging:
level: debug
loggers:
events: debug
events.bus: debug
events.store: debugMonitor Event Flow
// Add event middleware for debugging
handler := core.EventHandlerFunc(func(ctx context.Context, event *core.Event) error {
log.Printf("Processing event: %s [%s] at %v",
event.Type, event.ID, event.Timestamp)
start := time.Now()
err := originalHandler(ctx, event)
duration := time.Since(start)
if err != nil {
log.Printf("Event processing failed: %v (took %v)", err, duration)
} else {
log.Printf("Event processed successfully (took %v)", duration)
}
return err
})Next Steps
- Explore GraphQL Extension for API integration
- Learn about gRPC Extension for service communication
- Check out Streaming Extension for real-time data
- Review WebRTC Extension for peer-to-peer communication
- See Database Extension for event store backends
- Visit Metrics Extension for monitoring integration
How is this guide?
Last updated on