Events

Features

Events extension capabilities and architecture

Pub/Sub Event Bus

Publish events to named topics and subscribe handlers. The bus supports multiple concurrent brokers with priority-based selection and per-publish broker targeting:

bus := events.MustGetEventBus(app.Container())

// Subscribe to a topic
bus.Subscribe("order.*", func(ctx context.Context, event *core.Event) error {
    fmt.Printf("Received: %s\n", event.Type)
    return nil
})

// Publish to default broker
bus.Publish(ctx, event)

// Publish to a specific broker
bus.PublishTo(ctx, "nats", event)

The bus processes subscribed events through a pool of workers with configurable WorkerCount (default: 4) and BufferSize (default: 1000). Workers consume from a buffered channel, providing backpressure when the buffer fills.

Pluggable Message Brokers

All brokers implement the MessageBroker interface:

BrokerImplementationTransport
memoryMemoryBrokerIn-process channels
natsNATSBrokerNATS server
redisRedisBrokerRedis Pub/Sub

Register additional brokers at runtime:

bus.RegisterBroker(myCustomBroker)
bus.SetDefaultBroker("nats")

The memory broker is always available. NATS and Redis brokers are created from BrokerConfig entries in config and support health checks, connection stats, and graceful shutdown.

Append-Only Event Store

Persist domain events with versioning, aggregate grouping, and time-range queries:

store := events.MustGetEventStore(app.Container())

// Save single or batch
store.SaveEvent(ctx, event)
store.SaveEvents(ctx, []*core.Event{event1, event2})

// Query by aggregate
events, _ := store.GetEventsByAggregate(ctx, "order-123")

// Query by type
events, _ := store.GetEventsByType(ctx, "order.placed")

// Time-range query
events, _ := store.GetEventsSince(ctx, time.Now().Add(-24*time.Hour))
events, _ := store.GetEventsInRange(ctx, start, end)

// Count events
total, _ := store.GetEventCount(ctx)
byType, _ := store.GetEventCountByType(ctx, "order.placed")

Store backends:

BackendImplementationStatus
MemoryMemoryEventStoreFully implemented, wired in EventService
PostgreSQLPostgresEventStoreImplemented, not yet wired through config
MongoDBMongoEventStoreImplemented, not yet wired through config

Snapshots

Create and retrieve aggregate snapshots to avoid replaying full event histories:

// Create a snapshot at version 50
snapshot := core.NewSnapshot("order-123", "order", 50, currentState).
    WithMetadata(map[string]any{"reason": "periodic"})

store.CreateSnapshot(ctx, snapshot)

// Retrieve latest snapshot
snap, _ := store.GetSnapshot(ctx, "order-123")
// Then replay only events after snap.Version

Event Streaming and Projections

Subscribe to live event streams with configurable start position, batch size, and poll interval. Define read-model projections that process events and maintain derived state:

// ProjectionManager handles registration, rebuild, and lifecycle
type EventProjection interface {
    Name() string
    HandleEvent(ctx context.Context, event *core.Event) error
    Reset(ctx context.Context) error
}

Transactional Writes

TransactionalEventStore supports atomic event and snapshot operations:

type TransactionalEventStore interface {
    EventStore
    BeginTransaction(ctx context.Context) (EventStoreTransaction, error)
}

type EventStoreTransaction interface {
    SaveEvent(ctx context.Context, event *core.Event) error
    SaveEvents(ctx context.Context, events []*core.Event) error
    CreateSnapshot(ctx context.Context, snapshot *Snapshot) error
    Commit() error
    Rollback() error
}

Handler Registry

Register handlers by event type, aggregate, or reflection with middleware chains:

Typed Event Handler

handler := core.NewTypedEventHandler("audit-logger", "user.*",
    func(ctx context.Context, event *core.Event) error {
        log.Printf("[AUDIT] %s: %v", event.Type, event.Data)
        return nil
    },
)

Domain Event Handler

handler := core.NewDomainEventHandler("order-handler",
    []string{"order.placed", "order.shipped", "order.delivered"},
    handleOrderEvent,
)

Reflection Event Handler

handler := core.NewReflectionEventHandler("payment-handler", paymentService)
// Automatically discovers Handle* methods on paymentService

Middleware Pipeline

Chain middleware on handlers for cross-cutting concerns:

handler.WithMiddleware(core.LoggingMiddleware(logger))
handler.WithMiddleware(core.MetricsMiddleware(metrics))
handler.WithMiddleware(core.ValidationMiddleware(validator))

Built-in middleware:

MiddlewarePurpose
LoggingMiddlewareLogs event type, handler name, and processing duration
MetricsMiddlewareRecords event processing latency and error counters
ValidationMiddlewareValidates event payloads against registered schemas

Retry Policies

Configure per-handler retry with exponential backoff:

policy := core.NewRetryPolicy(3, 100*time.Millisecond).
    WithMaxDelay(5*time.Second).
    WithBackoffFactor(2.0).
    WithRetryableErrors(ErrTemporary)

handler.WithRetryPolicy(policy)

The policy calculates delay with CalculateDelay() and checks ShouldRetry() to decide whether to retry based on error type and attempt count.

Rich Event Model

Events carry comprehensive metadata for tracing, versioning, and correlation:

event := core.NewEvent("order.placed", orderData).
    WithVersion(1).
    WithSource("order-service").
    WithCorrelationID("req-abc-123").
    WithCausationID("cmd-xyz-789").
    WithMetadata("tenant", "acme-corp")

// Access metadata
tenant := event.GetMetadataString("tenant")

// Clone, validate, serialize
clone := event.Clone()
err := event.Validate()
data, _ := event.MarshalJSON()

Event Filtering

Filter events with type-safe predicates:

// Built-in filters
typeFilter := core.TypeFilter("order.*")
aggregateFilter := core.AggregateFilter("order-123")
sourceFilter := core.SourceFilter("order-service")

// Combine filters
combined := core.CombineFilters(typeFilter, sourceFilter)

// Use with EventCollection
collection := core.NewEventCollection(allEvents)
filtered := collection.Filter(combined)
sorted := collection.SortByTimestamp()

Schema Validation

The extension includes an internal schema registry for validating event payloads:

  • JSONSchemaValidator -- validate against JSON Schema definitions
  • StructValidator -- validate against Go struct tags
  • RegexValidator -- validate string fields against regex patterns
  • SchemaRegistry -- versioned schema store with evolution tracking, compatibility checks, and caching

Sentinel Errors

ErrorMeaning
ErrEventNotFoundEvent ID does not exist in the store
ErrSnapshotNotFoundSnapshot does not exist
ErrInvalidEventEvent failed validation
ErrInvalidSnapshotSnapshot failed validation
ErrBrokerNotFoundNamed broker is not registered
ErrBrokerAlreadyRegisteredBroker name collision
ErrHandlerNotFoundHandler name not in registry
ErrEventBusNotStartedBus operation before Start()
ErrEventStoreNotStartedStore operation before initialization

How is this guide?

On this page