Events
Event-driven architecture with pub/sub, event sourcing, and multiple brokers
Overview
github.com/xraph/forge/extensions/events provides a complete event-driven architecture layer.
It registers an EventService in the DI container that manages an event bus (pub/sub with pluggable
brokers), an event store (append-only log with snapshots), and a handler registry with middleware support.
What It Registers
| Service | DI Key | Type |
|---|---|---|
| Event service | events | *EventService |
| Event bus | eventBus | core.EventBus |
| Event store | eventStore | core.EventStore |
| Handler registry | eventHandlerRegistry | *core.HandlerRegistry |
All services are derived from the EventService singleton managed by Vessel.
Quick Start
package main
import (
	"context"
	"fmt"
	"github.com/xraph/forge"
	"github.com/xraph/forge/extensions/events"
	"github.com/xraph/forge/extensions/events/core"
)
func main() {
	app := forge.NewApp(forge.AppConfig{Name: "my-app", Version: "1.0.0"})
	// Register with defaults (in-memory broker and store)
	app.RegisterExtension(events.NewExtension())
	ctx := context.Background()
	app.Start(ctx)
	defer app.Stop(ctx)
	// Get the event bus
	bus := events.MustGetEventBus(app.Container())
	// Subscribe to events
	bus.Subscribe("user.created", func(ctx context.Context, event *core.Event) error {
		fmt.Printf("User created: %s (aggregate: %s)\n", event.Type, event.AggregateID)
		return nil
	})
	// Publish an event
	event := core.NewEvent("user.created", map[string]any{
		"email": "alice@example.com",
		"name":  "Alice",
	}).WithSource("user-service").WithCorrelationID("req-123")
	bus.Publish(ctx, event)
}
Event Sourcing with the Event Store
store := events.MustGetEventStore(app.Container())
// Save events for an aggregate
event := core.NewEvent("order.placed", map[string]any{
	"items": []string{"item-1", "item-2"},
	"total": 59.99,
}).WithVersion(1)
event.AggregateID = "order-456"
store.SaveEvent(ctx, event)
// Retrieve events for an aggregate (error ignored here for brevity)
history, _ := store.GetEventsByAggregate(ctx, "order-456")
for _, e := range history {
	fmt.Printf(" v%d: %s\n", e.Version, e.Type)
}
// Create a snapshot to avoid replaying full history.
// orderState stands for the aggregate's current state, built elsewhere.
snapshot := core.NewSnapshot("order-456", "order", 10, orderState)
store.CreateSnapshot(ctx, snapshot)
Typed Event Handlers with Middleware
registry := events.MustGetHandlerRegistry(app.Container())
// Create a typed handler
handler := core.NewTypedEventHandler("audit-logger", "user.*",
	func(ctx context.Context, event *core.Event) error {
		fmt.Printf("[AUDIT] %s: %v\n", event.Type, event.Data)
		return nil
	},
)
// Add middleware chain (logger and metrics are assumed to be in scope)
handler.WithMiddleware(core.LoggingMiddleware(logger))
handler.WithMiddleware(core.MetricsMiddleware(metrics))
handler.WithRetryPolicy(core.NewRetryPolicy(3, 100*time.Millisecond))
registry.Register("user.*", handler)
Using Events in Your Services
Inject *events.EventService for automatic DI resolution:
type OrderService struct {
	events *events.EventService
	logger forge.Logger
}
func NewOrderService(es *events.EventService, logger forge.Logger) *OrderService {
	return &OrderService{events: es, logger: logger}
}
func (s *OrderService) PlaceOrder(ctx context.Context, order Order) error {
	// Business logic...
	// Use the checked form of the type assertion so a missing request ID
	// yields an empty correlation ID instead of a panic.
	requestID, _ := ctx.Value("requestID").(string)
	event := core.NewEvent("order.placed", order).
		WithSource("order-service").
		WithCorrelationID(requestID)
	event.AggregateID = order.ID
	// Publish through the bus
	return s.events.GetEventBus().Publish(ctx, event)
}
Register with Vessel:
forge.ProvideConstructor(app.Container(), NewOrderService)
Key Concepts
- Event bus -- publish events to topics and subscribe handlers. The bus supports multiple message brokers (memory, NATS, Redis) with a configurable default and per-publish broker selection.
- Event store -- persist events in an append-only store for event sourcing. Supports snapshots, streaming, projections, and transactional writes. Backends: memory, PostgreSQL, MongoDB.
- Handler registry -- register typed event handlers, domain event handlers, and reflection-based handlers with middleware chains (logging, metrics, validation, retry).
- Domain events -- rich event model with ID, Type, AggregateID, Version, CorrelationID, CausationID, and arbitrary metadata.
- Worker pool -- the event bus processes events through a configurable worker pool with buffered channels.
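The snapshot idea from the event store bullet above can be illustrated with a small standard-library sketch: state is rebuilt by starting from the latest snapshot and replaying only the events that came after it. The `Event`, `Snapshot`, and `Store` types below are hypothetical simplifications, not forge's `core` types, and the aggregate state is reduced to a running total.

```go
package main

import "fmt"

// Event and Snapshot are hypothetical, simplified types for illustration;
// they are not forge's core.Event or core.Snapshot.
type Event struct {
	AggregateID string
	Version     int
	Type        string
	Amount      float64
}

type Snapshot struct {
	AggregateID string
	Version     int     // version of the last event folded into Total
	Total       float64 // aggregate state captured at that version
}

// Store is an append-only in-memory log plus the latest snapshot per aggregate.
type Store struct {
	events    map[string][]Event
	snapshots map[string]Snapshot
}

func NewStore() *Store {
	return &Store{events: map[string][]Event{}, snapshots: map[string]Snapshot{}}
}

// Append adds e to the aggregate's log and rejects out-of-order versions.
func (s *Store) Append(e Event) error {
	log := s.events[e.AggregateID]
	if want := len(log) + 1; e.Version != want {
		return fmt.Errorf("version conflict: got %d, want %d", e.Version, want)
	}
	s.events[e.AggregateID] = append(log, e)
	return nil
}

// SaveSnapshot records the latest snapshot for an aggregate.
func (s *Store) SaveSnapshot(snap Snapshot) {
	s.snapshots[snap.AggregateID] = snap
}

// Load rebuilds the total from the snapshot (if any) plus newer events,
// and reports how many events had to be replayed.
func (s *Store) Load(id string) (total float64, replayed int) {
	snap := s.snapshots[id] // zero value means no snapshot: replay from version 1
	total = snap.Total
	for _, e := range s.events[id] {
		if e.Version > snap.Version {
			total += e.Amount
			replayed++
		}
	}
	return total, replayed
}

func main() {
	s := NewStore()
	for i, amount := range []float64{10, 20, 30} {
		e := Event{AggregateID: "order-456", Version: i + 1, Type: "item.added", Amount: amount}
		if err := s.Append(e); err != nil {
			fmt.Println("append failed:", err)
			return
		}
	}
	// Snapshot after version 2: only version 3 needs replaying afterwards.
	s.SaveSnapshot(Snapshot{AggregateID: "order-456", Version: 2, Total: 30})
	total, replayed := s.Load("order-456")
	fmt.Println(total, replayed) // prints "60 1"
}
```

The version check in `Append` is one common way to keep the log consistent under concurrent writers; whether forge's stores enforce it the same way is not stated here.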
Important Runtime Notes
- Only the memory event store is wired in EventService. PostgreSQL and MongoDB stores exist in the codebase but are not yet connected through config.
- The memory broker is always available. NATS and Redis brokers are created from BrokerConfig entries.
- Event bus health, start, and stop are managed by EventService through the Vessel lifecycle.
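One way to picture the broker note above is a registry that always contains a memory broker and adds one broker per config entry, with an empty name falling back to the configured default. The sketch below is purely illustrative: the `Broker` interface, the `BrokerConfig` fields (`Name`, `Type`), and the `Registry` type are assumptions made for the example and may not match forge's actual definitions.

```go
package main

import (
	"errors"
	"fmt"
)

// Broker is a hypothetical minimal interface; forge's real broker interface may differ.
type Broker interface {
	Name() string
	Kind() string
}

type stubBroker struct{ name, kind string }

func (b stubBroker) Name() string { return b.name }
func (b stubBroker) Kind() string { return b.kind }

// BrokerConfig stands in for a config entry; the field names are assumptions.
type BrokerConfig struct {
	Name string
	Type string // for example "nats" or "redis"
}

// ErrUnknownBroker is returned when a broker name is not registered.
var ErrUnknownBroker = errors.New("unknown broker")

// Registry holds the always-present memory broker plus configured ones.
type Registry struct {
	brokers     map[string]Broker
	defaultName string
}

// NewRegistry registers "memory" unconditionally, then one broker per entry.
func NewRegistry(defaultName string, entries []BrokerConfig) (*Registry, error) {
	r := &Registry{
		brokers:     map[string]Broker{"memory": stubBroker{name: "memory", kind: "memory"}},
		defaultName: defaultName,
	}
	for _, e := range entries {
		r.brokers[e.Name] = stubBroker{name: e.Name, kind: e.Type}
	}
	if _, ok := r.brokers[defaultName]; !ok {
		return nil, fmt.Errorf("default broker %q: %w", defaultName, ErrUnknownBroker)
	}
	return r, nil
}

// Select returns the named broker, or the default when name is empty.
func (r *Registry) Select(name string) (Broker, error) {
	if name == "" {
		name = r.defaultName
	}
	b, ok := r.brokers[name]
	if !ok {
		return nil, fmt.Errorf("broker %q: %w", name, ErrUnknownBroker)
	}
	return b, nil
}

func main() {
	reg, err := NewRegistry("memory", []BrokerConfig{{Name: "events-nats", Type: "nats"}})
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, name := range []string{"", "events-nats", "redis"} {
		b, err := reg.Select(name)
		if err != nil {
			fmt.Println("select failed:", err)
			continue
		}
		fmt.Printf("%q -> %s (%s)\n", name, b.Name(), b.Kind())
	}
}
```

Validating the default at construction time means a misconfigured default surfaces at startup rather than on the first publish.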