Events

Event-driven architecture with pub/sub, event sourcing, and multiple brokers

Overview

github.com/xraph/forge/extensions/events provides a complete event-driven architecture layer. It registers an EventService in the DI container that manages an event bus (pub/sub with pluggable brokers), an event store (append-only log with snapshots), and a handler registry with middleware support.

What It Registers

Service            DI Key                 Type
Event service      events                 *EventService
Event bus          eventBus               core.EventBus
Event store        eventStore             core.EventStore
Handler registry   eventHandlerRegistry   *core.HandlerRegistry

All services are derived from the EventService singleton managed by Vessel.

Quick Start

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/events"
    "github.com/xraph/forge/extensions/events/core"
)

func main() {
    app := forge.NewApp(forge.AppConfig{Name: "my-app", Version: "1.0.0"})

    // Register with defaults (in-memory broker and store)
    app.RegisterExtension(events.NewExtension())

    ctx := context.Background()
    if err := app.Start(ctx); err != nil {
        log.Fatalf("start app: %v", err)
    }
    defer app.Stop(ctx)

    // Get the event bus
    bus := events.MustGetEventBus(app.Container())

    // Subscribe to events
    bus.Subscribe("user.created", func(ctx context.Context, event *core.Event) error {
        fmt.Printf("User created: %s (aggregate: %s)\n", event.Type, event.AggregateID)
        return nil
    })

    // Publish an event
    event := core.NewEvent("user.created", map[string]any{
        "email": "alice@example.com",
        "name":  "Alice",
    }).WithSource("user-service").WithCorrelationID("req-123")

    if err := bus.Publish(ctx, event); err != nil {
        log.Printf("publish user.created: %v", err)
    }
}

Event Sourcing with the Event Store

store := events.MustGetEventStore(app.Container())

// Save events for an aggregate
event := core.NewEvent("order.placed", map[string]any{
    "items": []string{"item-1", "item-2"},
    "total": 59.99,
}).WithVersion(1)
event.AggregateID = "order-456"

store.SaveEvent(ctx, event)

// Retrieve events for an aggregate
history, err := store.GetEventsByAggregate(ctx, "order-456")
if err != nil {
    return err
}
for _, e := range history {
    fmt.Printf("  v%d: %s\n", e.Version, e.Type)
}

// Create a snapshot to avoid replaying full history.
// orderState is the aggregate's current state, built elsewhere by your application.
snapshot := core.NewSnapshot("order-456", "order", 10, orderState)
store.CreateSnapshot(ctx, snapshot)
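
To illustrate why a snapshot avoids replaying the full history, here is a minimal, self-contained sketch of rebuilding aggregate state from a snapshot plus the events recorded after it. The Event and Snapshot types, the Rehydrate function, and the order.item_added / order.item_removed event names are hypothetical stand-ins for illustration only; they are not the forge core types or store API.

```go
package main

import "fmt"

// Event is a simplified, hypothetical event (not core.Event).
type Event struct {
    Type    string
    Version int
    Cents   int // amount carried by the event, in cents
}

// Snapshot is a simplified, hypothetical snapshot (not the core snapshot type).
type Snapshot struct {
    Version int // version of the last event already folded into Total
    Total   int // order total captured at that version, in cents
}

// Rehydrate starts from the snapshot and applies only the events that are
// newer than the snapshot's version.
func Rehydrate(snap Snapshot, history []Event) (total, version int) {
    total, version = snap.Total, snap.Version
    for _, e := range history {
        if e.Version <= snap.Version {
            continue // already reflected in the snapshot
        }
        switch e.Type {
        case "order.item_added":
            total += e.Cents
        case "order.item_removed":
            total -= e.Cents
        }
        version = e.Version
    }
    return total, version
}

func main() {
    history := []Event{
        {Type: "order.item_added", Version: 1, Cents: 2000},
        {Type: "order.item_added", Version: 2, Cents: 3000},
        {Type: "order.item_removed", Version: 3, Cents: 2000},
        {Type: "order.item_added", Version: 4, Cents: 500},
    }
    snap := Snapshot{Version: 2, Total: 5000}

    total, version := Rehydrate(snap, history)
    fmt.Printf("total=%d version=%d\n", total, version) // prints "total=3500 version=4"
}
```

With a real store you would typically load the latest snapshot first and then fetch only the newer events, rather than filtering the whole history in memory as this sketch does.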

Typed Event Handlers with Middleware

registry := events.MustGetHandlerRegistry(app.Container())

// Create a typed handler
handler := core.NewTypedEventHandler("audit-logger", "user.*",
    func(ctx context.Context, event *core.Event) error {
        fmt.Printf("[AUDIT] %s: %v\n", event.Type, event.Data)
        return nil
    },
)

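// Add a middleware chain and a retry policy.
// logger and metrics are assumed to be in scope (for example, resolved from the container).
handler.WithMiddleware(core.LoggingMiddleware(logger))
handler.WithMiddleware(core.MetricsMiddleware(metrics))
handler.WithRetryPolicy(core.NewRetryPolicy(3, 100*time.Millisecond))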

registry.Register("user.*", handler)

Using Events in Your Services

Inject *events.EventService for automatic DI resolution:

type OrderService struct {
    events *events.EventService
    logger forge.Logger
}

func NewOrderService(es *events.EventService, logger forge.Logger) *OrderService {
    return &OrderService{events: es, logger: logger}
}

func (s *OrderService) PlaceOrder(ctx context.Context, order Order) error {
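    // Business logic...

    // Use a checked type assertion so a missing or non-string request ID
    // yields an empty correlation ID instead of a panic.
    requestID, _ := ctx.Value("requestID").(string)

    event := core.NewEvent("order.placed", order).
        WithSource("order-service").
        WithCorrelationID(requestID)
    event.AggregateID = order.ID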

    // Publish through the bus
    return s.events.GetEventBus().Publish(ctx, event)
}

Register with Vessel:

forge.ProvideConstructor(app.Container(), NewOrderService)

Key Concepts

  • Event bus -- publish events to topics and subscribe handlers. The bus supports multiple message brokers (memory, NATS, Redis) with a configurable default and per-publish broker selection.
  • Event store -- persist events in an append-only store for event sourcing. Supports snapshots, streaming, projections, and transactional writes. Backends: memory, PostgreSQL, MongoDB.
  • Handler registry -- register typed event handlers, domain event handlers, and reflection-based handlers with middleware chains (logging, metrics, validation, retry).
  • Domain events -- rich event model with ID, Type, AggregateID, Version, CorrelationID, CausationID, and arbitrary metadata.
  • Worker pool -- the event bus processes events through a configurable worker pool with buffered channels.

Important Runtime Notes

  • Only the memory event store is wired in EventService. PostgreSQL and MongoDB stores exist in the codebase but are not yet connected through config.
  • The memory broker is always available. NATS and Redis brokers are created from BrokerConfig entries.
  • Event bus health, start, and stop are managed by EventService through Vessel lifecycle.
