Storage Backend
Low-level key-value storage abstraction for registry implementations
The storage layer provides a low-level key-value abstraction that registry implementations build upon. This separates the FARP protocol logic from the underlying storage mechanism.
Storage Interface
type StorageBackend interface {
	Put(ctx context.Context, key string, value []byte) error
	Get(ctx context.Context, key string) ([]byte, error)
	Delete(ctx context.Context, key string) error
	List(ctx context.Context, prefix string) ([]string, error)
	Watch(ctx context.Context, prefix string) (<-chan StorageEvent, error)
	Close() error
}
Each method maps to a common KV store operation. Implementations translate these into backend-specific calls:
| Backend | Put | Get | Watch |
|---|---|---|---|
| Consul | kv.Put() | kv.Get() | kv.WatchTree() |
| etcd | client.Put() | client.Get() | client.Watch() |
| Redis | SET | GET | SUBSCRIBE |
| Kubernetes | ConfigMap/Secret | ConfigMap/Secret | Watch API |
| In-memory | map[string][]byte | map[string][]byte | Channels |
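As an illustration of the in-memory row, here is a minimal sketch of a map-based store covering Put, Get, Delete, and List. It is not the library's implementation: memoryBackend and errNotFound are hypothetical names, and Watch and Close are left out to keep the example short.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strings"
	"sync"
)

// errNotFound is a hypothetical stand-in for the library's not-found error.
var errNotFound = errors.New("key not found")

// memoryBackend is a hypothetical map-based store guarded by a mutex.
type memoryBackend struct {
	mu   sync.RWMutex
	data map[string][]byte
}

func (m *memoryBackend) Put(ctx context.Context, key string, value []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	// Copy so later changes to the caller's slice do not alter stored data.
	m.data[key] = append([]byte(nil), value...)
	return nil
}

func (m *memoryBackend) Get(ctx context.Context, key string) ([]byte, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	v, ok := m.data[key]
	if !ok {
		return nil, errNotFound
	}
	return append([]byte(nil), v...), nil
}

func (m *memoryBackend) Delete(ctx context.Context, key string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.data, key)
	return nil
}

func (m *memoryBackend) List(ctx context.Context, prefix string) ([]string, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	keys := make([]string, 0, len(m.data))
	for k := range m.data {
		if strings.HasPrefix(k, prefix) {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys) // map iteration order is random; sort for stable output
	return keys, nil
}

// demo stores four keys, deletes one, and lists the "farp/" prefix.
func demo() ([]string, error) {
	ctx := context.Background()
	b := &memoryBackend{data: make(map[string][]byte)}
	for _, k := range []string{"farp/b", "farp/a", "farp/c", "other/x"} {
		if err := b.Put(ctx, k, []byte("v")); err != nil {
			return nil, err
		}
	}
	if err := b.Delete(ctx, "farp/c"); err != nil {
		return nil, err
	}
	if _, err := b.Get(ctx, "farp/c"); !errors.Is(err, errNotFound) {
		return nil, fmt.Errorf("expected not-found, got %v", err)
	}
	return b.List(ctx, "farp/")
}

func main() {
	keys, err := demo()
	fmt.Println(keys, err) // prints [farp/a farp/b] <nil>
}
```

The context parameters are accepted but unused here because map operations never block; a networked backend would pass them through to its client calls.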
Storage Events
type StorageEvent struct {
	Type  EventType // "added", "updated", "removed"
	Key   string    // Key that changed
	Value []byte    // Value (nil for deletes)
}
Storage Helper
The StorageHelper wraps a StorageBackend with JSON serialization, compression, and size limits:
helper := farp.NewStorageHelper(backend, compressionThreshold, maxSize)
JSON Operations
// Store a JSON-serializable value
err := helper.PutJSON(ctx, "service/user/manifest", manifest)
// Retrieve and deserialize
var manifest farp.SchemaManifest
err := helper.GetJSON(ctx, "service/user/manifest", &manifest)
The helper automatically:
- Serializes/deserializes JSON
- Checks size limits (returns ErrSchemaToLarge if exceeded)
- Compresses data with gzip when above the compression threshold
- Tries to read compressed (.gz) version first, then falls back to uncompressed
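The four behaviors listed above can be sketched end to end over a plain map. This is an illustration under stated assumptions, not the helper's actual code: jsonStore, errTooLarge, and errNotFound are hypothetical names, the size limit is assumed to apply to the serialized JSON before compression, and removing a stale copy under the other key is a choice made for this sketch.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"errors"
	"fmt"
	"io"
)

// Hypothetical stand-ins for the library's errors.
var (
	errTooLarge = errors.New("value exceeds size limit")
	errNotFound = errors.New("key not found")
)

// jsonStore is a hypothetical helper over a map; the real helper wraps a StorageBackend.
type jsonStore struct {
	data      map[string][]byte
	threshold int // compress payloads larger than this many bytes
	maxSize   int // reject payloads larger than this many bytes
}

func (s *jsonStore) putJSON(key string, v any) error {
	raw, err := json.Marshal(v)
	if err != nil {
		return err
	}
	// Assumption: the limit is checked on the JSON before compression.
	if len(raw) > s.maxSize {
		return errTooLarge
	}
	if len(raw) <= s.threshold {
		delete(s.data, key+".gz") // drop a stale compressed copy, if any
		s.data[key] = raw
		return nil
	}
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(raw); err != nil {
		return err
	}
	if err := zw.Close(); err != nil { // Close flushes the gzip trailer
		return err
	}
	delete(s.data, key) // drop a stale uncompressed copy, if any
	s.data[key+".gz"] = buf.Bytes()
	return nil
}

func (s *jsonStore) getJSON(key string, out any) error {
	// Try the compressed key first, then fall back to the plain key.
	if packed, ok := s.data[key+".gz"]; ok {
		zr, err := gzip.NewReader(bytes.NewReader(packed))
		if err != nil {
			return err
		}
		defer zr.Close()
		raw, err := io.ReadAll(zr)
		if err != nil {
			return err
		}
		return json.Unmarshal(raw, out)
	}
	raw, ok := s.data[key]
	if !ok {
		return errNotFound
	}
	return json.Unmarshal(raw, out)
}

func main() {
	s := &jsonStore{data: map[string][]byte{}, threshold: 32, maxSize: 4096}
	_ = s.putJSON("service/user/manifest", map[string]string{"name": "user-service"})
	_ = s.putJSON("service/user/schema", make([]int, 200))

	var out map[string]string
	err := s.getJSON("service/user/manifest", &out)
	fmt.Println(out["name"], err) // prints user-service <nil>

	_, compressed := s.data["service/user/schema.gz"]
	fmt.Println(compressed) // prints true
}
```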
Manifest Storage
ManifestStorage provides high-level operations for storing and retrieving manifests by combining the storage helper with a consistent key naming scheme:
store := farp.NewManifestStorage(backend, "farp", 100*1024, 1024*1024)
Key Structure
Keys follow a hierarchical pattern:
{namespace}/services/{service_name}/instances/{instance_id}/manifest
For example:
farp/services/user-service/instances/abc123/manifest
Operations
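Each manifest operation presumably resolves a service name and instance ID to a key in the hierarchical pattern. A minimal sketch of that mapping follows; manifestKey and servicePrefix are hypothetical helper names, and listing by a per-service prefix is an assumption about how List could be built on the backend.

```go
package main

import (
	"fmt"
	"strings"
)

// manifestKey builds {namespace}/services/{service}/instances/{instance}/manifest.
// Hypothetical helper, not part of the documented API.
func manifestKey(namespace, service, instance string) string {
	parts := []string{namespace, "services", service, "instances", instance, "manifest"}
	return strings.Join(parts, "/")
}

// servicePrefix returns the prefix shared by all instances of one service.
// Ending at "/instances/" keeps "user" from also matching "user-service".
func servicePrefix(namespace, service string) string {
	return namespace + "/services/" + service + "/instances/"
}

func main() {
	fmt.Println(manifestKey("farp", "user-service", "abc123"))
	// prints farp/services/user-service/instances/abc123/manifest
	fmt.Println(servicePrefix("farp", "user-service"))
	// prints farp/services/user-service/instances/
}
```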
// Store a manifest
err := store.Put(ctx, manifest)
// Retrieve a manifest
manifest, err := store.Get(ctx, "user-service", "abc123")
// Delete a manifest
err := store.Delete(ctx, "user-service", "abc123")
// List all manifests for a service
manifests, err := store.List(ctx, "user-service")
Schema Storage
// Store a schema
err := store.PutSchema(ctx, "/schemas/user-service/v1/openapi", openapiSpec)
// Retrieve a schema
schema, err := store.GetSchema(ctx, "/schemas/user-service/v1/openapi")
// Delete a schema
err := store.DeleteSchema(ctx, "/schemas/user-service/v1/openapi")
Compression
Data is automatically compressed with gzip when it exceeds the compression threshold:
- Compressed keys have a .gz suffix appended
- On read, the helper tries .gz first, then falls back to uncompressed
- Compression is transparent to the caller
// Create storage with 100KB compression threshold and 1MB max size
store := farp.NewManifestStorage(
	backend,
	"farp",
	100*1024,  // Compress schemas > 100KB
	1024*1024, // Max schema size: 1MB
)
Compression is especially useful for large OpenAPI or AsyncAPI schemas that can easily exceed 100KB.
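To get a rough feel for the effect, the sketch below gzips a synthetic, highly repetitive OpenAPI-like document and prints both sizes. The document is generated test data rather than a real schema, and syntheticSchema and gzipSize are hypothetical helpers, so treat the numbers it prints as indicative only.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
)

// gzipSize returns the length of data after gzip compression.
func gzipSize(data []byte) (int, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(data); err != nil {
		return 0, err
	}
	if err := zw.Close(); err != nil { // Close flushes remaining output
		return 0, err
	}
	return buf.Len(), nil
}

// syntheticSchema builds a repetitive OpenAPI-like document with n paths.
func syntheticSchema(n int) ([]byte, error) {
	paths := make(map[string]any, n)
	for i := 0; i < n; i++ {
		paths[fmt.Sprintf("/users/%d/orders", i)] = map[string]any{
			"get": map[string]any{
				"summary":   "List orders for a user",
				"responses": map[string]any{"200": map[string]any{"description": "OK"}},
			},
		}
	}
	return json.Marshal(map[string]any{"openapi": "3.0.0", "paths": paths})
}

func main() {
	raw, err := syntheticSchema(2000)
	if err != nil {
		panic(err)
	}
	packed, err := gzipSize(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("raw=%d bytes, gzip=%d bytes\n", len(raw), packed)
}
```

Real schemas are less uniform than this generated one, so their compression ratio will usually be lower.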
Implementing a Custom Backend
To implement a custom storage backend:
type MyBackend struct {
	// Your storage client
}
func (b *MyBackend) Put(ctx context.Context, key string, value []byte) error {
	// Store the value at the given key
	return nil
}
func (b *MyBackend) Get(ctx context.Context, key string) ([]byte, error) {
	// Retrieve the value
	// Return farp.ErrSchemaNotFound if key doesn't exist
	return nil, farp.ErrSchemaNotFound
}
func (b *MyBackend) Delete(ctx context.Context, key string) error {
	return nil
}
func (b *MyBackend) List(ctx context.Context, prefix string) ([]string, error) {
	// Return all keys matching the prefix
	return nil, nil
}
func (b *MyBackend) Watch(ctx context.Context, prefix string) (<-chan farp.StorageEvent, error) {
	ch := make(chan farp.StorageEvent)
	// Start watching for changes
	return ch, nil
}
func (b *MyBackend) Close() error {
	return nil
}
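The Watch stub above returns a channel that nothing writes to or closes. One possible way to wire it is sketched here with an in-memory backend that fans events out from Put and Delete and closes the channel when the context is cancelled. The types mirror the StorageEvent definition on this page, but watchBackend, the string-based EventType constants, the buffer size of 16, and the drop-on-full policy are all assumptions made for this sketch.

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
)

// EventType as a string is an assumption; StorageEvent mirrors the page's definition.
type EventType string

const (
	EventAdded   EventType = "added"
	EventUpdated EventType = "updated"
	EventRemoved EventType = "removed"
)

type StorageEvent struct {
	Type  EventType
	Key   string
	Value []byte
}

// watchBackend is a hypothetical backend; only Put, Delete and Watch are shown.
type watchBackend struct {
	mu       sync.Mutex
	data     map[string][]byte
	watchers map[chan StorageEvent]string // channel -> watched prefix
}

func (b *watchBackend) Watch(ctx context.Context, prefix string) (<-chan StorageEvent, error) {
	ch := make(chan StorageEvent, 16)
	b.mu.Lock()
	b.watchers[ch] = prefix
	b.mu.Unlock()
	go func() {
		<-ctx.Done()
		b.mu.Lock()
		defer b.mu.Unlock()
		delete(b.watchers, ch)
		close(ch) // closed under the lock, so notify never sends on a closed channel
	}()
	return ch, nil
}

// notify delivers ev to matching watchers. The caller must hold b.mu.
func (b *watchBackend) notify(ev StorageEvent) {
	for ch, prefix := range b.watchers {
		if strings.HasPrefix(ev.Key, prefix) {
			select {
			case ch <- ev:
			default: // buffer full: drop the event instead of blocking writers
			}
		}
	}
}

func (b *watchBackend) Put(ctx context.Context, key string, value []byte) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	typ := EventAdded
	if _, exists := b.data[key]; exists {
		typ = EventUpdated
	}
	b.data[key] = value
	b.notify(StorageEvent{Type: typ, Key: key, Value: value})
	return nil
}

func (b *watchBackend) Delete(ctx context.Context, key string) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if _, exists := b.data[key]; exists {
		delete(b.data, key)
		b.notify(StorageEvent{Type: EventRemoved, Key: key}) // Value stays nil
	}
	return nil
}

// demo returns the event types seen by a watcher on the "farp/" prefix.
func demo() []EventType {
	ctx, cancel := context.WithCancel(context.Background())
	b := &watchBackend{data: map[string][]byte{}, watchers: map[chan StorageEvent]string{}}
	events, _ := b.Watch(ctx, "farp/")
	b.Put(ctx, "farp/a", []byte("1"))
	b.Put(ctx, "farp/a", []byte("2"))
	b.Put(ctx, "other/x", []byte("3")) // outside the prefix, not delivered
	b.Delete(ctx, "farp/a")
	cancel() // ends the watch; buffered events remain readable until drained
	var seen []EventType
	for ev := range events {
		seen = append(seen, ev.Type)
	}
	return seen
}

func main() { fmt.Println(demo()) } // prints [added updated removed]
```

Dropping events when a watcher's buffer is full keeps writers from stalling but means slow consumers can miss changes; a production backend might instead block, grow the buffer, or signal the consumer to resync. A full implementation would also stop all watchers in Close.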