Schema Registry
Interfaces for storing, retrieving, and watching manifests and schemas
The Schema Registry is the central interface for managing schema manifests and schemas across distributed services. FARP defines the interface — concrete implementations use backends like Consul, etcd, Kubernetes, Redis, or in-memory stores.
Registry Interface
type SchemaRegistry interface {
// Manifest operations
RegisterManifest(ctx context.Context, manifest *SchemaManifest) error
GetManifest(ctx context.Context, instanceID string) (*SchemaManifest, error)
UpdateManifest(ctx context.Context, manifest *SchemaManifest) error
DeleteManifest(ctx context.Context, instanceID string) error
ListManifests(ctx context.Context, serviceName string) ([]*SchemaManifest, error)
// Schema operations
PublishSchema(ctx context.Context, path string, schema any) error
FetchSchema(ctx context.Context, path string) (any, error)
DeleteSchema(ctx context.Context, path string) error
// Watch operations
WatchManifests(ctx context.Context, serviceName string, onChange ManifestChangeHandler) error
WatchSchemas(ctx context.Context, path string, onChange SchemaChangeHandler) error
// Lifecycle
Close() error
Health(ctx context.Context) error
}FARP provides the interface, not the backend implementation. Your service framework (e.g., Forge) or infrastructure layer provides the concrete SchemaRegistry.
Manifest Operations
Register
Store a new manifest when a service starts:
manifest := farp.NewManifest("user-service", "v1.0.0", "instance-abc123")
manifest.Endpoints.Health = "/health"
// ... configure manifest ...
err := registry.RegisterManifest(ctx, manifest)Get
Retrieve a manifest by instance ID:
manifest, err := registry.GetManifest(ctx, "instance-abc123")
if err != nil {
// Handle farp.ErrManifestNotFound
}Update
Update an existing manifest (e.g., after schema changes):
manifest.AddSchema(newSchemaDescriptor)
manifest.UpdateChecksum()
err := registry.UpdateManifest(ctx, manifest)List
List all manifests for a service (or all services):
// All instances of a specific service
manifests, err := registry.ListManifests(ctx, "user-service")
// All services (pass empty string)
allManifests, err := registry.ListManifests(ctx, "")Delete
Remove a manifest when an instance shuts down:
err := registry.DeleteManifest(ctx, "instance-abc123")Schema Operations
Store and retrieve full schema documents:
// Publish a schema
err := registry.PublishSchema(ctx, "/schemas/user-service/v1/openapi", openapiSpec)
// Fetch a schema
schema, err := registry.FetchSchema(ctx, "/schemas/user-service/v1/openapi")
// Delete a schema
err := registry.DeleteSchema(ctx, "/schemas/user-service/v1/openapi")Watch Operations
Subscribe to real-time changes for reactive gateway configuration:
Watch Manifests
err := registry.WatchManifests(ctx, "user-service", func(event *farp.ManifestEvent) {
switch event.Type {
case farp.EventTypeAdded:
fmt.Printf("New instance: %s\n", event.Manifest.InstanceID)
case farp.EventTypeUpdated:
fmt.Printf("Updated: %s\n", event.Manifest.InstanceID)
case farp.EventTypeRemoved:
fmt.Printf("Removed: %s\n", event.Manifest.InstanceID)
}
})Watch Schemas
err := registry.WatchSchemas(ctx, "/schemas/user-service/v1/openapi", func(event *farp.SchemaEvent) {
switch event.Type {
case farp.EventTypeUpdated:
fmt.Println("Schema updated, reconfiguring routes...")
}
})Event Types
| Event | Description |
|---|---|
added | A new resource was registered |
updated | An existing resource was modified |
removed | A resource was deregistered |
Registry Configuration
type RegistryConfig struct {
Backend string `json:"backend"` // consul, etcd, kubernetes, etc.
Namespace string `json:"namespace"` // Key prefix
BackendConfig map[string]any `json:"backend_config"`
MaxSchemaSize int64 `json:"max_schema_size"` // Default: 1MB
CompressionThreshold int64 `json:"compression_threshold"` // Default: 100KB
TTL int64 `json:"ttl"` // 0 = no expiry
}Default Configuration
config := farp.DefaultRegistryConfig()
// Backend: "memory"
// Namespace: "farp"
// MaxSchemaSize: 1MB
// CompressionThreshold: 100KB
// TTL: 0 (no expiry)Schema Cache
An optional caching layer for frequently accessed schemas:
type SchemaCache interface {
Get(hash string) (any, bool)
Set(hash string, schema any) error
Delete(hash string) error
Clear() error
Size() int
}Fetch and Publish Options
// Fetch with caching and validation
opts := farp.FetchOptions{
UseCache: true,
ValidateChecksum: true,
ExpectedHash: "abc123...",
Timeout: 30,
}
// Publish with compression
pubOpts := farp.PublishOptions{
Compress: true,
TTL: 3600, // 1 hour
OverwriteExisting: true,
}How is this guide?