Storage Backend

Low-level key-value storage abstraction for registry implementations

The storage layer provides a low-level key-value abstraction that registry implementations build upon. This separates the FARP protocol logic from the underlying storage mechanism.

Storage Interface

type StorageBackend interface {
    Put(ctx context.Context, key string, value []byte) error
    Get(ctx context.Context, key string) ([]byte, error)
    Delete(ctx context.Context, key string) error
    List(ctx context.Context, prefix string) ([]string, error)
    Watch(ctx context.Context, prefix string) (<-chan StorageEvent, error)
    Close() error
}

Each method maps to a common KV store operation. Implementations translate these into backend-specific calls:

Backend      Put                Get                Watch
Consul       kv.Put()           kv.Get()           kv.WatchTree()
etcd         client.Put()       client.Get()       client.Watch()
Redis        SET                GET                SUBSCRIBE
Kubernetes   ConfigMap/Secret   ConfigMap/Secret   Watch API
In-memory    map[string][]byte  map[string][]byte  Channels
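
To make the mapping concrete, here is a minimal sketch of how the in-memory row could look for Put, Get, Delete, and List. It is not the library's implementation: memoryBackend, newMemoryBackend, errNotFound, and demo are hypothetical names, and Watch and Close are left out to keep it short.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "sort"
    "strings"
    "sync"
)

// errNotFound is a local stand-in for the library's not-found error (assumption).
var errNotFound = errors.New("key not found")

// memoryBackend keeps all data in a mutex-guarded map (hypothetical type).
type memoryBackend struct {
    mu   sync.RWMutex
    data map[string][]byte
}

func newMemoryBackend() *memoryBackend {
    return &memoryBackend{data: make(map[string][]byte)}
}

func (m *memoryBackend) Put(ctx context.Context, key string, value []byte) error {
    m.mu.Lock()
    defer m.mu.Unlock()
    // Copy the value so later mutation by the caller cannot change stored data.
    m.data[key] = append([]byte(nil), value...)
    return nil
}

func (m *memoryBackend) Get(ctx context.Context, key string) ([]byte, error) {
    m.mu.RLock()
    defer m.mu.RUnlock()
    v, ok := m.data[key]
    if !ok {
        return nil, errNotFound
    }
    return append([]byte(nil), v...), nil
}

func (m *memoryBackend) Delete(ctx context.Context, key string) error {
    m.mu.Lock()
    defer m.mu.Unlock()
    delete(m.data, key) // deleting a missing key is a no-op in this sketch
    return nil
}

func (m *memoryBackend) List(ctx context.Context, prefix string) ([]string, error) {
    m.mu.RLock()
    defer m.mu.RUnlock()
    var keys []string
    for k := range m.data {
        if strings.HasPrefix(k, prefix) {
            keys = append(keys, k)
        }
    }
    sort.Strings(keys) // map iteration order is random; sort for stable output
    return keys, nil
}

// demo stores three keys, deletes one, and lists what remains under "farp/".
func demo() ([]string, error) {
    ctx := context.Background()
    b := newMemoryBackend()
    _ = b.Put(ctx, "farp/a", []byte("1"))
    _ = b.Put(ctx, "farp/b", []byte("2"))
    _ = b.Put(ctx, "other/c", []byte("3"))
    _ = b.Delete(ctx, "farp/b")
    return b.List(ctx, "farp/")
}

func main() {
    keys, err := demo()
    fmt.Println(keys, err) // prints "[farp/a] <nil>"
}
```

Copying values on Put and Get is a design choice of this sketch: it keeps callers from mutating stored data through a shared slice.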

Storage Events

type StorageEvent struct {
    Type  EventType // "added", "updated", "removed"
    Key   string    // Key that changed
    Value []byte    // Value (nil for deletes)
}
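
A consumer of Watch typically folds these events into a local view of the data. The sketch below shows one way to do that. The types are redeclared locally so the example runs on its own, and the constant names EventAdded, EventUpdated, and EventRemoved are assumptions; only the string values come from the struct comment above.

```go
package main

import "fmt"

// EventType and StorageEvent mirror the definitions above.
type EventType string

// Constant names are hypothetical; the values follow the documented strings.
const (
    EventAdded   EventType = "added"
    EventUpdated EventType = "updated"
    EventRemoved EventType = "removed"
)

type StorageEvent struct {
    Type  EventType
    Key   string
    Value []byte
}

// applyEvents drains the channel until it is closed and returns the
// key-value view that results from applying every event in order.
func applyEvents(events <-chan StorageEvent) map[string]string {
    cache := make(map[string]string)
    for ev := range events {
        switch ev.Type {
        case EventAdded, EventUpdated:
            cache[ev.Key] = string(ev.Value)
        case EventRemoved:
            delete(cache, ev.Key) // Value is nil for deletes
        }
    }
    return cache
}

// demo feeds a fixed sequence of events through applyEvents.
func demo() map[string]string {
    ch := make(chan StorageEvent, 4)
    ch <- StorageEvent{Type: EventAdded, Key: "a", Value: []byte("1")}
    ch <- StorageEvent{Type: EventAdded, Key: "b", Value: []byte("1")}
    ch <- StorageEvent{Type: EventUpdated, Key: "a", Value: []byte("2")}
    ch <- StorageEvent{Type: EventRemoved, Key: "b"}
    close(ch)
    return applyEvents(ch)
}

func main() {
    fmt.Println(demo()) // prints "map[a:2]"
}
```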

Storage Helper

The StorageHelper wraps a StorageBackend with JSON serialization, compression, and size limits:

helper := farp.NewStorageHelper(backend, compressionThreshold, maxSize)

JSON Operations

// Store a JSON-serializable value
err := helper.PutJSON(ctx, "service/user/manifest", manifest)

// Retrieve and deserialize
var manifest farp.SchemaManifest
err := helper.GetJSON(ctx, "service/user/manifest", &manifest)

The helper automatically:

  • Serializes/deserializes JSON
  • Checks size limits (returns ErrSchemaToLarge if exceeded)
  • Compresses data with gzip when above the compression threshold
  • Tries to read compressed (.gz) version first, then falls back to uncompressed
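
The serialization and size-check steps can be sketched as follows. This is an illustration under assumptions, not the helper's actual code: jsonStore, putJSON, getJSON, errTooLarge, and the manifest fields are hypothetical, a plain map stands in for the backend, and the limit is checked against the uncompressed JSON (the real helper may check at a different point).

```go
package main

import (
    "encoding/json"
    "errors"
    "fmt"
    "strings"
)

// errTooLarge is a local stand-in for the library's size-limit error.
var errTooLarge = errors.New("value exceeds size limit")

// jsonStore uses a map in place of a real StorageBackend (assumption).
type jsonStore struct {
    data    map[string][]byte
    maxSize int
}

// putJSON serializes v and rejects it when the encoding exceeds maxSize.
func (s *jsonStore) putJSON(key string, v any) error {
    raw, err := json.Marshal(v)
    if err != nil {
        return fmt.Errorf("marshal %q: %w", key, err)
    }
    if len(raw) > s.maxSize {
        return errTooLarge
    }
    s.data[key] = raw
    return nil
}

// getJSON loads the stored bytes and decodes them into out.
func (s *jsonStore) getJSON(key string, out any) error {
    raw, ok := s.data[key]
    if !ok {
        return errors.New("key not found")
    }
    return json.Unmarshal(raw, out)
}

// manifest is a tiny hypothetical payload, not the real SchemaManifest.
type manifest struct {
    ServiceName string `json:"service_name"`
    InstanceID  string `json:"instance_id"`
}

// demo round-trips a small value and reports whether an oversized one is rejected.
func demo() (manifest, bool, error) {
    s := &jsonStore{data: map[string][]byte{}, maxSize: 64}
    in := manifest{ServiceName: "user-service", InstanceID: "abc123"}
    if err := s.putJSON("service/user/manifest", in); err != nil {
        return manifest{}, false, err
    }
    var out manifest
    if err := s.getJSON("service/user/manifest", &out); err != nil {
        return manifest{}, false, err
    }
    big := manifest{ServiceName: strings.Repeat("x", 100)}
    rejected := errors.Is(s.putJSON("big", big), errTooLarge)
    return out, rejected, nil
}

func main() {
    out, rejected, err := demo()
    fmt.Println(out, rejected, err) // prints "{user-service abc123} true <nil>"
}
```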

Manifest Storage

ManifestStorage provides high-level operations for storing and retrieving manifests. It combines the storage helper with a consistent key naming scheme:

store := farp.NewManifestStorage(backend, "farp", 100*1024, 1024*1024)

Key Structure

Keys follow a hierarchical pattern:

{namespace}/services/{service_name}/instances/{instance_id}/manifest

For example:

farp/services/user-service/instances/abc123/manifest
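
A small sketch of how such keys could be assembled. The function names manifestKey and servicePrefix are hypothetical; only the key pattern comes from the text above.

```go
package main

import "fmt"

// manifestKey builds the key for one instance's manifest using the
// {namespace}/services/{service_name}/instances/{instance_id}/manifest pattern.
func manifestKey(namespace, service, instance string) string {
    return fmt.Sprintf("%s/services/%s/instances/%s/manifest", namespace, service, instance)
}

// servicePrefix returns the prefix shared by every instance key of a service,
// which is the natural argument for a prefix-based List or Watch call.
func servicePrefix(namespace, service string) string {
    return fmt.Sprintf("%s/services/%s/instances/", namespace, service)
}

func main() {
    fmt.Println(manifestKey("farp", "user-service", "abc123"))
    // prints "farp/services/user-service/instances/abc123/manifest"
    fmt.Println(servicePrefix("farp", "user-service"))
    // prints "farp/services/user-service/instances/"
}
```

Because the hierarchy is encoded in the key itself, listing all manifests for a service reduces to a single prefix scan on the backend.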

Operations

// Store a manifest
err := store.Put(ctx, manifest)

// Retrieve a manifest
manifest, err := store.Get(ctx, "user-service", "abc123")

// Delete a manifest
err := store.Delete(ctx, "user-service", "abc123")

// List all manifests for a service
manifests, err := store.List(ctx, "user-service")

Schema Storage

// Store a schema
err := store.PutSchema(ctx, "/schemas/user-service/v1/openapi", openapiSpec)

// Retrieve a schema
schema, err := store.GetSchema(ctx, "/schemas/user-service/v1/openapi")

// Delete a schema
err := store.DeleteSchema(ctx, "/schemas/user-service/v1/openapi")

Compression

Data is automatically compressed with gzip when it exceeds the compression threshold:

  • Compressed keys have a .gz suffix appended
  • On read, the helper tries .gz first, then falls back to uncompressed
  • Compression is transparent to the caller

// Create storage with 100KB compression threshold and 1MB max size
store := farp.NewManifestStorage(
    backend,
    "farp",
    100*1024,   // Compress schemas > 100KB
    1024*1024,  // Max schema size: 1MB
)

Compression is especially useful for large OpenAPI or AsyncAPI schemas that can easily exceed 100KB.
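
The write and read paths described above can be sketched with the standard compress/gzip package. This is an assumption-laden illustration rather than the library's code: store, put, and get are hypothetical names, a map stands in for the backend, and removing a stale copy under the other key on write is a choice made for this sketch.

```go
package main

import (
    "bytes"
    "compress/gzip"
    "errors"
    "fmt"
    "io"
    "sort"
)

// store uses a map in place of a real StorageBackend (assumption).
type store struct {
    data      map[string][]byte
    threshold int
}

// put writes small values as-is and gzips larger ones under key+".gz".
func (s *store) put(key string, value []byte) error {
    if len(value) <= s.threshold {
        delete(s.data, key+".gz") // drop a stale compressed copy
        s.data[key] = value
        return nil
    }
    var buf bytes.Buffer
    zw := gzip.NewWriter(&buf)
    if _, err := zw.Write(value); err != nil {
        return err
    }
    if err := zw.Close(); err != nil { // Close flushes the gzip trailer
        return err
    }
    delete(s.data, key) // drop a stale uncompressed copy
    s.data[key+".gz"] = buf.Bytes()
    return nil
}

// get tries the ".gz" key first, then falls back to the plain key.
func (s *store) get(key string) ([]byte, error) {
    if packed, ok := s.data[key+".gz"]; ok {
        zr, err := gzip.NewReader(bytes.NewReader(packed))
        if err != nil {
            return nil, err
        }
        defer zr.Close()
        return io.ReadAll(zr)
    }
    if raw, ok := s.data[key]; ok {
        return raw, nil
    }
    return nil, errors.New("key not found")
}

// demo stores one value below and one above the threshold, reads both back,
// and returns the sorted stored keys plus whether the round trip was lossless.
func demo() ([]string, bool, error) {
    s := &store{data: map[string][]byte{}, threshold: 32}
    small, large := []byte("hi"), bytes.Repeat([]byte("a"), 1000)
    if err := s.put("small", small); err != nil {
        return nil, false, err
    }
    if err := s.put("large", large); err != nil {
        return nil, false, err
    }
    gotSmall, err := s.get("small")
    if err != nil {
        return nil, false, err
    }
    gotLarge, err := s.get("large")
    if err != nil {
        return nil, false, err
    }
    var keys []string
    for k := range s.data {
        keys = append(keys, k)
    }
    sort.Strings(keys)
    return keys, bytes.Equal(gotSmall, small) && bytes.Equal(gotLarge, large), nil
}

func main() {
    keys, ok, err := demo()
    fmt.Println(keys, ok, err) // prints "[large.gz small] true <nil>"
}
```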


Implementing a Custom Backend

To implement a custom storage backend, define a type that provides every method of the StorageBackend interface:

type MyBackend struct {
    // Your storage client
}

func (b *MyBackend) Put(ctx context.Context, key string, value []byte) error {
    // Store the value at the given key
    return nil
}

func (b *MyBackend) Get(ctx context.Context, key string) ([]byte, error) {
    // Retrieve the value
    // Return farp.ErrSchemaNotFound if key doesn't exist
    return nil, farp.ErrSchemaNotFound
}

func (b *MyBackend) Delete(ctx context.Context, key string) error {
    // Remove the value stored at the given key
    return nil
}

func (b *MyBackend) List(ctx context.Context, prefix string) ([]string, error) {
    // Return all keys matching the prefix
    return nil, nil
}

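func (b *MyBackend) Watch(ctx context.Context, prefix string) (<-chan farp.StorageEvent, error) {
    ch := make(chan farp.StorageEvent)
    // Start watching for changes under the prefix and send them on ch.
    // Stop and close ch once ctx is cancelled so receivers are not left blocked.
    return ch, nil
}

func (b *MyBackend) Close() error {
    // Release connections or other resources held by the storage client
    return nil
}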
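
Watch is usually the trickiest method to get right, because the returned channel has to be cleaned up when the caller's context ends. The sketch below shows one possible pattern for an in-process backend. The names notifier, watcher, and publish are hypothetical, StorageEvent is redeclared locally so the example runs on its own, and dropping events for slow watchers is a simplification chosen for this sketch.

```go
package main

import (
    "context"
    "fmt"
    "strings"
    "sync"
)

// EventType and StorageEvent mirror the library types described earlier.
type EventType string

type StorageEvent struct {
    Type  EventType
    Key   string
    Value []byte
}

type watcher struct {
    prefix string
    ch     chan StorageEvent
}

// notifier tracks active watchers; a backend's Put/Delete would call publish.
type notifier struct {
    mu       sync.Mutex
    watchers map[*watcher]struct{}
}

func newNotifier() *notifier {
    return &notifier{watchers: make(map[*watcher]struct{})}
}

// Watch registers a subscriber and unregisters it when ctx is cancelled.
func (n *notifier) Watch(ctx context.Context, prefix string) (<-chan StorageEvent, error) {
    w := &watcher{prefix: prefix, ch: make(chan StorageEvent, 16)}
    n.mu.Lock()
    n.watchers[w] = struct{}{}
    n.mu.Unlock()

    go func() {
        <-ctx.Done()
        n.mu.Lock()
        delete(n.watchers, w)
        close(w.ch) // closed under the lock, so publish never sends on a closed channel
        n.mu.Unlock()
    }()
    return w.ch, nil
}

// publish delivers an event to every watcher whose prefix matches the key.
func (n *notifier) publish(ev StorageEvent) {
    n.mu.Lock()
    defer n.mu.Unlock()
    for w := range n.watchers {
        if !strings.HasPrefix(ev.Key, w.prefix) {
            continue
        }
        select {
        case w.ch <- ev:
        default: // buffer full: drop the event rather than block the writer
        }
    }
}

// demo publishes three events, cancels the watch, and collects what arrived.
func demo() []string {
    ctx, cancel := context.WithCancel(context.Background())
    n := newNotifier()
    ch, _ := n.Watch(ctx, "farp/")
    n.publish(StorageEvent{Type: "added", Key: "farp/a", Value: []byte("1")})
    n.publish(StorageEvent{Type: "added", Key: "other/b"}) // filtered by prefix
    n.publish(StorageEvent{Type: "removed", Key: "farp/a"})
    cancel()

    var seen []string
    for ev := range ch { // buffered events drain, then the loop ends on close
        seen = append(seen, string(ev.Type)+" "+ev.Key)
    }
    return seen
}

func main() {
    fmt.Println(demo()) // prints "[added farp/a removed farp/a]"
}
```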
