Schema Registry

Interfaces for storing, retrieving, and watching manifests and schemas

The Schema Registry is the central interface for managing schema manifests and schemas across distributed services. FARP defines only the interface; concrete implementations use backends such as Consul, etcd, Kubernetes, Redis, or an in-memory store.

Registry Interface

type SchemaRegistry interface {
    // Manifest operations
    RegisterManifest(ctx context.Context, manifest *SchemaManifest) error
    GetManifest(ctx context.Context, instanceID string) (*SchemaManifest, error)
    UpdateManifest(ctx context.Context, manifest *SchemaManifest) error
    DeleteManifest(ctx context.Context, instanceID string) error
    ListManifests(ctx context.Context, serviceName string) ([]*SchemaManifest, error)

    // Schema operations
    PublishSchema(ctx context.Context, path string, schema any) error
    FetchSchema(ctx context.Context, path string) (any, error)
    DeleteSchema(ctx context.Context, path string) error

    // Watch operations
    WatchManifests(ctx context.Context, serviceName string, onChange ManifestChangeHandler) error
    WatchSchemas(ctx context.Context, path string, onChange SchemaChangeHandler) error

    // Lifecycle
    Close() error
    Health(ctx context.Context) error
}

FARP provides the interface, not the backend implementation. Your service framework (e.g., Forge) or infrastructure layer provides the concrete SchemaRegistry.

Manifest Operations

Register

Store a new manifest when a service starts:

manifest := farp.NewManifest("user-service", "v1.0.0", "instance-abc123")
manifest.Endpoints.Health = "/health"
// ... configure manifest ...

err := registry.RegisterManifest(ctx, manifest)

Get

Retrieve a manifest by instance ID:

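manifest, err := registry.GetManifest(ctx, "instance-abc123")
if errors.Is(err, farp.ErrManifestNotFound) {
    // No manifest is registered for this instance ID
} else if err != nil {
    // Handle other registry errors
}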

Update

Update an existing manifest (e.g., after schema changes):

manifest.AddSchema(newSchemaDescriptor)
manifest.UpdateChecksum()
err := registry.UpdateManifest(ctx, manifest)

List

List all manifests for a service (or all services):

// All instances of a specific service
manifests, err := registry.ListManifests(ctx, "user-service")

// All services (pass empty string)
allManifests, err := registry.ListManifests(ctx, "")

Delete

Remove a manifest when an instance shuts down:

err := registry.DeleteManifest(ctx, "instance-abc123")

Schema Operations

Store and retrieve full schema documents:

// Publish a schema
err := registry.PublishSchema(ctx, "/schemas/user-service/v1/openapi", openapiSpec)

// Fetch a schema
schema, err := registry.FetchSchema(ctx, "/schemas/user-service/v1/openapi")

// Delete a schema (err is already declared above, so assign with =)
err = registry.DeleteSchema(ctx, "/schemas/user-service/v1/openapi")

Watch Operations

Subscribe to real-time changes to drive reactive gateway configuration.

Watch Manifests

err := registry.WatchManifests(ctx, "user-service", func(event *farp.ManifestEvent) {
    switch event.Type {
    case farp.EventTypeAdded:
        fmt.Printf("New instance: %s\n", event.Manifest.InstanceID)
    case farp.EventTypeUpdated:
        fmt.Printf("Updated: %s\n", event.Manifest.InstanceID)
    case farp.EventTypeRemoved:
        fmt.Printf("Removed: %s\n", event.Manifest.InstanceID)
    }
})

Watch Schemas

err := registry.WatchSchemas(ctx, "/schemas/user-service/v1/openapi", func(event *farp.SchemaEvent) {
    switch event.Type {
    case farp.EventTypeUpdated:
        fmt.Println("Schema updated, reconfiguring routes...")
    }
})

Event Types

Event      Description
added      A new resource was registered
updated    An existing resource was modified
removed    A resource was deregistered
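
The sketch below shows one way a backend could derive these three event types from writes and deletes and fan them out to handlers. It is an illustration under assumptions, not FARP's code: watchHub, Put, and Remove are hypothetical names, the event struct is simplified to carry only an instance ID, and the string values of the constants are taken from the table above.

```go
package main

import (
	"fmt"
	"sync"
)

// EventType mirrors the three event kinds; the string values are assumed.
type EventType string

const (
	EventTypeAdded   EventType = "added"
	EventTypeUpdated EventType = "updated"
	EventTypeRemoved EventType = "removed"
)

// ManifestEvent is a simplified stand-in for farp.ManifestEvent.
type ManifestEvent struct {
	Type       EventType
	InstanceID string
}

type ManifestChangeHandler func(event *ManifestEvent)

// watchHub (hypothetical) tracks known instances and notifies handlers.
type watchHub struct {
	mu       sync.Mutex
	known    map[string]bool
	handlers []ManifestChangeHandler
}

func newWatchHub() *watchHub {
	return &watchHub{known: make(map[string]bool)}
}

// Watch registers a handler for all subsequent events.
func (h *watchHub) Watch(fn ManifestChangeHandler) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.handlers = append(h.handlers, fn)
}

// Put records a write: the first write for an ID is "added",
// later writes are "updated".
func (h *watchHub) Put(instanceID string) {
	h.mu.Lock()
	eventType := EventTypeAdded
	if h.known[instanceID] {
		eventType = EventTypeUpdated
	}
	h.known[instanceID] = true
	h.mu.Unlock()
	h.notify(&ManifestEvent{Type: eventType, InstanceID: instanceID})
}

// Remove emits "removed" only if the instance was actually known.
func (h *watchHub) Remove(instanceID string) {
	h.mu.Lock()
	existed := h.known[instanceID]
	delete(h.known, instanceID)
	h.mu.Unlock()
	if existed {
		h.notify(&ManifestEvent{Type: EventTypeRemoved, InstanceID: instanceID})
	}
}

// notify copies the handler list under the lock, then calls handlers
// without holding it so a handler can safely call back into the hub.
func (h *watchHub) notify(event *ManifestEvent) {
	h.mu.Lock()
	handlers := append([]ManifestChangeHandler(nil), h.handlers...)
	h.mu.Unlock()
	for _, fn := range handlers {
		fn(event)
	}
}

func main() {
	hub := newWatchHub()
	hub.Watch(func(e *ManifestEvent) {
		fmt.Printf("%s: %s\n", e.Type, e.InstanceID)
	})
	hub.Put("instance-abc123")
	hub.Put("instance-abc123")
	hub.Remove("instance-abc123")
}
```

Handlers run synchronously here; a networked backend would more likely deliver events from a background goroutine tied to the watch context.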

Registry Configuration

type RegistryConfig struct {
    Backend              string         `json:"backend"`    // consul, etcd, kubernetes, etc.
    Namespace            string         `json:"namespace"`  // Key prefix
    BackendConfig        map[string]any `json:"backend_config"`
    MaxSchemaSize        int64          `json:"max_schema_size"`        // Default: 1MB
    CompressionThreshold int64          `json:"compression_threshold"` // Default: 100KB
    TTL                  int64          `json:"ttl"`                   // 0 = no expiry
}

Default Configuration

config := farp.DefaultRegistryConfig()
// Backend: "memory"
// Namespace: "farp"
// MaxSchemaSize: 1MB
// CompressionThreshold: 100KB
// TTL: 0 (no expiry)
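
Because the struct carries JSON tags, a configuration file can be layered over the defaults. The sketch below reproduces the struct locally and assumes that 1MB means 1<<20 bytes and 100KB means 100<<10 bytes; defaultConfig and loadConfig are hypothetical helpers, and the "address" key inside backend_config is made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RegistryConfig is copied from the definition above so the sketch
// compiles on its own.
type RegistryConfig struct {
	Backend              string         `json:"backend"`
	Namespace            string         `json:"namespace"`
	BackendConfig        map[string]any `json:"backend_config"`
	MaxSchemaSize        int64          `json:"max_schema_size"`
	CompressionThreshold int64          `json:"compression_threshold"`
	TTL                  int64          `json:"ttl"`
}

// defaultConfig mirrors the documented defaults. The exact byte values
// for "1MB" and "100KB" are assumptions.
func defaultConfig() RegistryConfig {
	return RegistryConfig{
		Backend:              "memory",
		Namespace:            "farp",
		MaxSchemaSize:        1 << 20,
		CompressionThreshold: 100 << 10,
		TTL:                  0,
	}
}

// loadConfig starts from the defaults and overrides only the fields
// present in the JSON document.
func loadConfig(data []byte) (RegistryConfig, error) {
	cfg := defaultConfig()
	if err := json.Unmarshal(data, &cfg); err != nil {
		return RegistryConfig{}, fmt.Errorf("parse registry config: %w", err)
	}
	return cfg, nil
}

func main() {
	raw := []byte(`{"backend": "consul", "backend_config": {"address": "localhost:8500"}}`)
	cfg, err := loadConfig(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```

Unmarshalling into a pre-populated struct leaves absent fields untouched, which is what makes the defaults survive a partial file.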

Schema Cache

An optional caching layer for frequently accessed schemas:

type SchemaCache interface {
    Get(hash string) (any, bool)
    Set(hash string, schema any) error
    Delete(hash string) error
    Clear() error
    Size() int
}
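
A minimal way to satisfy this interface is a mutex-guarded map. The sketch below is one possible implementation, not the one FARP ships: memoryCache is a hypothetical name, and it has no eviction or size limit, which a production cache would likely need.

```go
package main

import (
	"fmt"
	"sync"
)

// SchemaCache is copied from the interface above.
type SchemaCache interface {
	Get(hash string) (any, bool)
	Set(hash string, schema any) error
	Delete(hash string) error
	Clear() error
	Size() int
}

// memoryCache (hypothetical) stores schemas in a map keyed by hash.
type memoryCache struct {
	mu      sync.RWMutex
	entries map[string]any
}

// Compile-time check that memoryCache implements SchemaCache.
var _ SchemaCache = (*memoryCache)(nil)

func newMemoryCache() *memoryCache {
	return &memoryCache{entries: make(map[string]any)}
}

func (c *memoryCache) Get(hash string) (any, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	schema, ok := c.entries[hash]
	return schema, ok
}

func (c *memoryCache) Set(hash string, schema any) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[hash] = schema
	return nil
}

// Delete removes an entry; deleting a missing hash is a no-op here.
func (c *memoryCache) Delete(hash string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.entries, hash)
	return nil
}

func (c *memoryCache) Clear() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries = make(map[string]any)
	return nil
}

func (c *memoryCache) Size() int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return len(c.entries)
}

func main() {
	cache := newMemoryCache()
	_ = cache.Set("abc123", map[string]any{"openapi": "3.0.0"})
	_, hit := cache.Get("abc123")
	fmt.Println("hit:", hit, "size:", cache.Size())
}
```

Keying by content hash means an unchanged schema published by many instances occupies a single cache entry.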

Fetch and Publish Options

// Fetch with caching and validation
opts := farp.FetchOptions{
    UseCache:         true,
    ValidateChecksum: true,
    ExpectedHash:     "abc123...",
    Timeout:          30,
}

// Publish with compression
pubOpts := farp.PublishOptions{
    Compress:          true,
    TTL:               3600, // 1 hour
    OverwriteExisting: true,
}
