Features
Events extension capabilities and architecture
Pub/Sub Event Bus
Publish events to named topics and subscribe handlers. The bus supports multiple concurrent brokers with priority-based selection and per-publish broker targeting:
bus := events.MustGetEventBus(app.Container())
// Subscribe to a topic
bus.Subscribe("order.*", func(ctx context.Context, event *core.Event) error {
fmt.Printf("Received: %s\n", event.Type)
return nil
})
// Publish to default broker
bus.Publish(ctx, event)
// Publish to a specific broker
bus.PublishTo(ctx, "nats", event)
The bus processes subscribed events through a pool of workers with configurable WorkerCount (default: 4) and BufferSize (default: 1000). Workers consume from a buffered channel, providing backpressure when the buffer fills.
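The worker-pool behaviour described above can be sketched in plain Go. This is a minimal illustration, not the extension's implementation: `Pool`, `NewPool`, `Enqueue`, `Close`, `processAll`, and the local `Event` type are hypothetical names chosen for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a local stand-in for core.Event (hypothetical, for illustration only).
type Event struct {
	Type string
}

// Pool is a hypothetical worker pool: a fixed number of goroutines drain one
// buffered channel. A send blocks once the buffer is full, which is the
// backpressure effect.
type Pool struct {
	queue chan Event
	wg    sync.WaitGroup
}

// NewPool starts workerCount goroutines reading from a channel of size bufferSize.
func NewPool(workerCount, bufferSize int, handle func(Event)) *Pool {
	p := &Pool{queue: make(chan Event, bufferSize)}
	for i := 0; i < workerCount; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for ev := range p.queue {
				handle(ev)
			}
		}()
	}
	return p
}

// Enqueue blocks while the buffer is full.
func (p *Pool) Enqueue(ev Event) { p.queue <- ev }

// Close stops accepting events and waits for the workers to drain the queue.
func (p *Pool) Close() {
	close(p.queue)
	p.wg.Wait()
}

// processAll pushes n events through a pool and returns how many were handled.
func processAll(n, workers, buffer int) int {
	var mu sync.Mutex
	handled := 0
	p := NewPool(workers, buffer, func(Event) {
		mu.Lock()
		handled++
		mu.Unlock()
	})
	for i := 0; i < n; i++ {
		p.Enqueue(Event{Type: "order.placed"})
	}
	p.Close()
	return handled
}

func main() {
	fmt.Println(processAll(100, 4, 10))
}
```

With a small buffer, a fast publisher is slowed to the pace of the workers rather than growing memory without bound. A larger buffer absorbs bursts at the cost of more queued events.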
Pluggable Message Brokers
All brokers implement the MessageBroker interface:
| Broker | Implementation | Transport |
|---|---|---|
| memory | MemoryBroker | In-process channels |
| nats | NATSBroker | NATS server |
| redis | RedisBroker | Redis Pub/Sub |
Register additional brokers at runtime:
bus.RegisterBroker(myCustomBroker)
bus.SetDefaultBroker("nats")
The memory broker is always available. NATS and Redis brokers are created from BrokerConfig entries in config and support health checks, connection stats, and graceful shutdown.
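To make the register-then-select flow concrete, here is a rough sketch of a name-keyed broker registry with a default. It assumes nothing about the extension's internals: `Broker`, `namedBroker`, `Registry`, `Resolve`, and the two lowercase error values are hypothetical stand-ins, and the real MessageBroker interface is not reproduced here.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Broker is a hypothetical, minimal stand-in for the MessageBroker interface.
type Broker interface {
	Name() string
}

// namedBroker is a trivial Broker used only in this sketch.
type namedBroker string

func (b namedBroker) Name() string { return string(b) }

// Hypothetical errors for this sketch, not the extension's sentinels.
var (
	errBrokerNotFound          = errors.New("broker not found")
	errBrokerAlreadyRegistered = errors.New("broker already registered")
)

// Registry keeps brokers by name and remembers which one is the default.
type Registry struct {
	mu          sync.RWMutex
	brokers     map[string]Broker
	defaultName string
}

func NewRegistry() *Registry {
	return &Registry{brokers: make(map[string]Broker)}
}

// Register rejects a second broker with the same name.
func (r *Registry) Register(b Broker) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.brokers[b.Name()]; exists {
		return errBrokerAlreadyRegistered
	}
	r.brokers[b.Name()] = b
	return nil
}

// SetDefault only accepts names that are already registered.
func (r *Registry) SetDefault(name string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.brokers[name]; !exists {
		return errBrokerNotFound
	}
	r.defaultName = name
	return nil
}

// Resolve returns the named broker, or the default when name is empty.
func (r *Registry) Resolve(name string) (Broker, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if name == "" {
		name = r.defaultName
	}
	b, exists := r.brokers[name]
	if !exists {
		return nil, errBrokerNotFound
	}
	return b, nil
}

func main() {
	r := NewRegistry()
	_ = r.Register(namedBroker("memory"))
	_ = r.Register(namedBroker("nats"))
	_ = r.SetDefault("nats")
	b, _ := r.Resolve("")
	fmt.Println(b.Name())
	fmt.Println(r.Register(namedBroker("nats")))
}
```

In this sketch, resolving with an empty name plays the role of a default publish, and resolving with an explicit name plays the role of per-publish targeting.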
Append-Only Event Store
Persist domain events with versioning, aggregate grouping, and time-range queries:
store := events.MustGetEventStore(app.Container())
// Save single or batch
store.SaveEvent(ctx, event)
store.SaveEvents(ctx, []*core.Event{event1, event2})
// Query by aggregate
orderEvents, _ := store.GetEventsByAggregate(ctx, "order-123")
// Query by type
placedEvents, _ := store.GetEventsByType(ctx, "order.placed")
// Time-range query
recentEvents, _ := store.GetEventsSince(ctx, time.Now().Add(-24*time.Hour))
rangeEvents, _ := store.GetEventsInRange(ctx, start, end)
// Count events
total, _ := store.GetEventCount(ctx)
placedCount, _ := store.GetEventCountByType(ctx, "order.placed")
Store backends:
| Backend | Implementation | Status |
|---|---|---|
| Memory | MemoryEventStore | Fully implemented, wired in EventService |
| PostgreSQL | PostgresEventStore | Implemented, not yet wired through config |
| MongoDB | MongoEventStore | Implemented, not yet wired through config |
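As a rough picture of what an append-only, in-memory store does, the sketch below keeps one insertion-ordered log and a per-aggregate version counter. It is an assumption-laden illustration, not MemoryEventStore itself: the `Event` fields, `MemoryStore`, `Save`, `ByAggregate`, `Since`, and the `at` helper are all hypothetical, and whether the real store assigns versions on save is not stated in this guide.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Event is a local stand-in for core.Event (hypothetical fields).
type Event struct {
	AggregateID string
	Type        string
	Version     int
	Timestamp   time.Time
}

// MemoryStore is a hypothetical append-only store: events are only ever
// appended to log, never updated or removed.
type MemoryStore struct {
	mu       sync.RWMutex
	log      []Event
	versions map[string]int // latest version per aggregate
}

func NewMemoryStore() *MemoryStore {
	return &MemoryStore{versions: make(map[string]int)}
}

// Save assigns the next version for the event's aggregate and appends it.
func (s *MemoryStore) Save(ev Event) Event {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.versions[ev.AggregateID]++
	ev.Version = s.versions[ev.AggregateID]
	s.log = append(s.log, ev)
	return ev
}

// ByAggregate returns the events of one aggregate in insertion order.
func (s *MemoryStore) ByAggregate(id string) []Event {
	s.mu.RLock()
	defer s.mu.RUnlock()
	var out []Event
	for _, ev := range s.log {
		if ev.AggregateID == id {
			out = append(out, ev)
		}
	}
	return out
}

// Since returns events whose timestamp is at or after t.
func (s *MemoryStore) Since(t time.Time) []Event {
	s.mu.RLock()
	defer s.mu.RUnlock()
	var out []Event
	for _, ev := range s.log {
		if !ev.Timestamp.Before(t) {
			out = append(out, ev)
		}
	}
	return out
}

// at builds a fixed timestamp so the example is deterministic.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

func main() {
	s := NewMemoryStore()
	s.Save(Event{AggregateID: "order-123", Type: "order.placed", Timestamp: at(10)})
	s.Save(Event{AggregateID: "order-456", Type: "order.placed", Timestamp: at(20)})
	s.Save(Event{AggregateID: "order-123", Type: "order.shipped", Timestamp: at(30)})
	fmt.Println(len(s.ByAggregate("order-123")), len(s.Since(at(20))))
}
```

A linear scan is fine for tests and small services. A database-backed store would typically serve the same queries from indexes on aggregate ID and timestamp.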
Snapshots
Create and retrieve aggregate snapshots to avoid replaying full event histories:
// Create a snapshot at version 50
snapshot := core.NewSnapshot("order-123", "order", 50, currentState).
WithMetadata(map[string]any{"reason": "periodic"})
store.CreateSnapshot(ctx, snapshot)
// Retrieve latest snapshot
snap, _ := store.GetSnapshot(ctx, "order-123")
// Then replay only events after snap.Version
Event Streaming and Projections
Subscribe to live event streams with configurable start position, batch size, and poll interval. Define read-model projections that process events and maintain derived state:
// ProjectionManager handles registration, rebuild, and lifecycle
type EventProjection interface {
Name() string
HandleEvent(ctx context.Context, event *core.Event) error
Reset(ctx context.Context) error
}
Transactional Writes
TransactionalEventStore supports atomic event and snapshot operations:
type TransactionalEventStore interface {
EventStore
BeginTransaction(ctx context.Context) (EventStoreTransaction, error)
}
type EventStoreTransaction interface {
SaveEvent(ctx context.Context, event *core.Event) error
SaveEvents(ctx context.Context, events []*core.Event) error
CreateSnapshot(ctx context.Context, snapshot *Snapshot) error
Commit() error
Rollback() error
}
Handler Registry
Register handlers by event type, aggregate, or reflection with middleware chains:
Typed Event Handler
handler := core.NewTypedEventHandler("audit-logger", "user.*",
func(ctx context.Context, event *core.Event) error {
log.Printf("[AUDIT] %s: %v", event.Type, event.Data)
return nil
},
)
Domain Event Handler
handler := core.NewDomainEventHandler("order-handler",
[]string{"order.placed", "order.shipped", "order.delivered"},
handleOrderEvent,
)
Reflection Event Handler
handler := core.NewReflectionEventHandler("payment-handler", paymentService)
// Automatically discovers Handle* methods on paymentService
Middleware Pipeline
Chain middleware on handlers for cross-cutting concerns:
handler.WithMiddleware(core.LoggingMiddleware(logger))
handler.WithMiddleware(core.MetricsMiddleware(metrics))
handler.WithMiddleware(core.ValidationMiddleware(validator))
Built-in middleware:
| Middleware | Purpose |
|---|---|
| LoggingMiddleware | Logs event type, handler name, and processing duration |
| MetricsMiddleware | Records event processing latency and error counters |
| ValidationMiddleware | Validates event payloads against registered schemas |
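Middleware chaining of this kind is usually plain function wrapping. The following sketch shows one common way to do it, assuming the first middleware listed runs outermost. The extension's actual ordering is not documented here, and `HandlerFunc`, `Middleware`, `Chain`, `trace`, and `run` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Event is a local stand-in for core.Event (hypothetical).
type Event struct{ Type string }

// HandlerFunc and Middleware are hypothetical types for this sketch.
type HandlerFunc func(ctx context.Context, ev *Event) error
type Middleware func(next HandlerFunc) HandlerFunc

// Chain wraps h so that the first middleware listed runs outermost.
func Chain(h HandlerFunc, mws ...Middleware) HandlerFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// trace returns a middleware that records when it is entered and left.
func trace(name string, log *[]string) Middleware {
	return func(next HandlerFunc) HandlerFunc {
		return func(ctx context.Context, ev *Event) error {
			*log = append(*log, name+":before")
			err := next(ctx, ev)
			*log = append(*log, name+":after")
			return err
		}
	}
}

// run sends one event through two middlewares and returns the call order.
func run() []string {
	var log []string
	h := Chain(func(ctx context.Context, ev *Event) error {
		log = append(log, "handler:"+ev.Type)
		return nil
	}, trace("logging", &log), trace("metrics", &log))
	_ = h(context.Background(), &Event{Type: "user.created"})
	return log
}

func main() {
	fmt.Println(run())
}
```

Because each middleware receives the next handler as a value, it can run code before and after the call, short-circuit on error, or measure elapsed time without the handler knowing about it.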
Retry Policies
Configure per-handler retry with exponential backoff:
policy := core.NewRetryPolicy(3, 100*time.Millisecond).
WithMaxDelay(5*time.Second).
WithBackoffFactor(2.0).
WithRetryableErrors(ErrTemporary)
handler.WithRetryPolicy(policy)
The policy calculates delay with CalculateDelay() and checks ShouldRetry() to decide whether to retry based on error type and attempt count.
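The exact formula behind CalculateDelay() is not given in this guide. A common form of capped exponential backoff is `delay = initial * factor^(attempt-1)`, limited to the maximum delay, and the sketch below assumes that form. `RetryPolicy`, `Delay`, `ShouldRetry`, `examplePolicy`, and `delayMillis` here are local, hypothetical definitions rather than the core package's types.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// RetryPolicy is a hypothetical local type mirroring the settings shown above.
type RetryPolicy struct {
	MaxRetries    int
	InitialDelay  time.Duration
	MaxDelay      time.Duration
	BackoffFactor float64
}

// Delay returns InitialDelay * BackoffFactor^(attempt-1), capped at MaxDelay.
// attempt is 1-based; values below 1 are treated as 1.
func (p RetryPolicy) Delay(attempt int) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	d := float64(p.InitialDelay) * math.Pow(p.BackoffFactor, float64(attempt-1))
	if d > float64(p.MaxDelay) {
		return p.MaxDelay
	}
	return time.Duration(d)
}

// ShouldRetry allows another attempt only for retryable errors within budget.
func (p RetryPolicy) ShouldRetry(attempt int, retryable bool) bool {
	return retryable && attempt <= p.MaxRetries
}

// examplePolicy mirrors the values used in the snippet above.
func examplePolicy() RetryPolicy {
	return RetryPolicy{
		MaxRetries:    3,
		InitialDelay:  100 * time.Millisecond,
		MaxDelay:      5 * time.Second,
		BackoffFactor: 2.0,
	}
}

// delayMillis is a small helper that reports a delay in milliseconds.
func delayMillis(p RetryPolicy, attempt int) int64 {
	return p.Delay(attempt).Milliseconds()
}

func main() {
	p := examplePolicy()
	for attempt := 1; attempt <= 4; attempt++ {
		fmt.Println(attempt, p.Delay(attempt), p.ShouldRetry(attempt, true))
	}
}
```

Under this assumed formula the first three delays are 100 ms, 200 ms, and 400 ms, and the cap only takes effect from the seventh attempt onward. Production retry loops often add random jitter so that many handlers do not retry in lockstep.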
Rich Event Model
Events carry comprehensive metadata for tracing, versioning, and correlation:
event := core.NewEvent("order.placed", orderData).
WithVersion(1).
WithSource("order-service").
WithCorrelationID("req-abc-123").
WithCausationID("cmd-xyz-789").
WithMetadata("tenant", "acme-corp")
// Access metadata
tenant := event.GetMetadataString("tenant")
// Clone, validate, serialize
clone := event.Clone()
err := event.Validate()
data, _ := event.MarshalJSON()
Event Filtering
Filter events with type-safe predicates:
// Built-in filters
typeFilter := core.TypeFilter("order.*")
aggregateFilter := core.AggregateFilter("order-123")
sourceFilter := core.SourceFilter("order-service")
// Combine filters
combined := core.CombineFilters(typeFilter, sourceFilter)
// Use with EventCollection
collection := core.NewEventCollection(allEvents)
filtered := collection.Filter(combined)
sorted := collection.SortByTimestamp()
Schema Validation
The extension includes an internal schema registry for validating event payloads:
- JSONSchemaValidator -- validate against JSON Schema definitions
- StructValidator -- validate against Go struct tags
- RegexValidator -- validate string fields against regex patterns
- SchemaRegistry -- versioned schema store with evolution tracking, compatibility checks, and caching
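The validator types above are internal, so their signatures are not shown in this guide. To give a feel for regex-based payload validation, here is a self-contained sketch using only the standard library. `FieldRule`, `FieldRegexValidator`, `Validate`, and `orderValidator` are hypothetical names, and the field names and patterns are invented for the example.

```go
package main

import (
	"fmt"
	"regexp"
)

// FieldRule pairs a payload field with the pattern its value must match
// (hypothetical type for this sketch).
type FieldRule struct {
	Field   string
	Pattern *regexp.Regexp
}

// FieldRegexValidator checks string fields of a payload against its rules.
type FieldRegexValidator struct {
	rules []FieldRule
}

// Validate returns an error for the first rule that fails, in rule order.
func (v FieldRegexValidator) Validate(payload map[string]any) error {
	for _, rule := range v.rules {
		raw, ok := payload[rule.Field]
		if !ok {
			return fmt.Errorf("field %q is missing", rule.Field)
		}
		s, ok := raw.(string)
		if !ok {
			return fmt.Errorf("field %q is not a string", rule.Field)
		}
		if !rule.Pattern.MatchString(s) {
			return fmt.Errorf("field %q does not match %s", rule.Field, rule.Pattern)
		}
	}
	return nil
}

// orderValidator builds an example validator with made-up rules.
func orderValidator() FieldRegexValidator {
	return FieldRegexValidator{rules: []FieldRule{
		{Field: "order_id", Pattern: regexp.MustCompile(`^ORD-[0-9]{4}$`)},
		{Field: "currency", Pattern: regexp.MustCompile(`^[A-Z]{3}$`)},
	}}
}

func main() {
	v := orderValidator()
	fmt.Println(v.Validate(map[string]any{"order_id": "ORD-1234", "currency": "USD"}))
	fmt.Println(v.Validate(map[string]any{"order_id": "1234", "currency": "USD"}))
}
```

Anchoring each pattern with `^` and `$` matters: without the anchors, `MatchString` succeeds when the pattern matches any substring of the value.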
Sentinel Errors
| Error | Meaning |
|---|---|
| ErrEventNotFound | Event ID does not exist in the store |
| ErrSnapshotNotFound | Snapshot does not exist |
| ErrInvalidEvent | Event failed validation |
| ErrInvalidSnapshot | Snapshot failed validation |
| ErrBrokerNotFound | Named broker is not registered |
| ErrBrokerAlreadyRegistered | Broker name collision |
| ErrHandlerNotFound | Handler name not in registry |
| ErrEventBusNotStarted | Bus operation before Start() |
| ErrEventStoreNotStarted | Store operation before initialization |