Features
Consensus extension capabilities
Raft Consensus
Full Raft implementation providing strong consistency guarantees across a cluster of nodes. The algorithm ensures that all nodes agree on the same sequence of state transitions, even in the presence of failures.
Core Guarantees
- Leader election -- at most one leader is elected per term.
- Log replication -- committed entries are durably replicated to a majority of nodes.
- Safety -- if any node has applied a log entry at a given index, no other node will ever apply a different entry for that index.
- Consistency -- all reads and writes go through the leader, ensuring linearizable semantics.
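The "majority" in the replication guarantee is plain integer arithmetic; a minimal sketch (the helper name is ours, not the extension's):

```go
package main

import "fmt"

// majority returns the minimum number of nodes that must acknowledge
// an entry before it counts as committed.
func majority(clusterSize int) int {
	return clusterSize/2 + 1
}

func main() {
	for _, n := range []int{3, 4, 5} {
		fmt.Printf("cluster of %d: quorum is %d\n", n, majority(n))
	}
}
```

A 5-node cluster therefore keeps committing with up to two nodes down, which is why odd cluster sizes are the usual choice.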
Leader Election
Automatic leader election with configurable timing parameters:
```go
consensus.NewExtension(
    consensus.WithHeartbeatInterval(150 * time.Millisecond),
    consensus.WithElectionTimeoutMin(300 * time.Millisecond),
    consensus.WithElectionTimeoutMax(500 * time.Millisecond),
)
```

Query the current leadership state:
```go
svc := consensus.MustGet(app.Container())

if svc.IsLeader() {
    // This node is the leader
}

leaderID := svc.GetLeader() // Node ID of the current leader
role := svc.GetRole()       // Leader, Follower, or Candidate
term := svc.GetTerm()       // Current Raft term number
```

Pluggable State Machine
Implement the StateMachine interface to define your application's replicated state:
```go
type StateMachine interface {
    Apply(entry LogEntry) error       // Apply a committed log entry
    Snapshot() (*Snapshot, error)     // Create a state snapshot
    Restore(snapshot *Snapshot) error // Restore from a snapshot
    Query(query any) (any, error)     // Read from the state machine
}
```

Example implementation for a replicated key-value store:
```go
type KVStore struct {
    mu   sync.RWMutex
    data map[string]string
}

func NewKVStore() *KVStore {
    return &KVStore{data: make(map[string]string)}
}

func (s *KVStore) Apply(entry consensus.LogEntry) error {
    var cmd struct {
        Op    string `json:"op"`
        Key   string `json:"key"`
        Value string `json:"value"`
    }
    if err := json.Unmarshal(entry.Data, &cmd); err != nil {
        return err
    }
    s.mu.Lock()
    defer s.mu.Unlock()
    switch cmd.Op {
    case "set":
        s.data[cmd.Key] = cmd.Value
    case "delete":
        delete(s.data, cmd.Key)
    }
    return nil
}

func (s *KVStore) Query(query any) (any, error) {
    key, ok := query.(string)
    if !ok {
        return nil, fmt.Errorf("unsupported query type %T", query)
    }
    s.mu.RLock()
    defer s.mu.RUnlock()
    return s.data[key], nil
}

func (s *KVStore) Snapshot() (*consensus.Snapshot, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    data, err := json.Marshal(s.data)
    if err != nil {
        return nil, err
    }
    return &consensus.Snapshot{Data: data}, nil
}

func (s *KVStore) Restore(snapshot *consensus.Snapshot) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    return json.Unmarshal(snapshot.Data, &s.data)
}
```

Apply and Read
Write operations go through the Raft log for consensus; read operations query the state machine directly:
```go
// Write: replicate a command through the Raft log.
// Returns only after the command is committed to a majority.
err := svc.Apply(ctx, consensus.Command{
    Type: "set",
    Data: []byte(`{"op":"set","key":"config","value":"v2"}`),
})

// Read: query the state machine (consistent read through the leader)
result, err := svc.Read(ctx, "config")
```

Apply blocks until the command is committed to a majority of nodes. If the current node is not the leader, it returns ErrNotLeader.
Snapshots
Periodic log compaction via snapshots prevents unbounded log growth:
```go
consensus.NewExtension(
    consensus.WithSnapshotInterval(5 * time.Minute),
    consensus.WithSnapshotThreshold(1000), // Snapshot after 1000 log entries
)
```

- The state machine's Snapshot() is called to capture the current state.
- Old log entries before the snapshot are discarded.
- New nodes joining the cluster catch up from the latest snapshot instead of replaying the full log.
- Manual snapshot trigger: svc.Snapshot(ctx).
Cluster Management
Add and remove nodes dynamically without cluster downtime:
```go
// Add a new node to the cluster
err := svc.AddNode(ctx, "node-4", "10.0.0.4", 7000)

// Remove a node from the cluster
err := svc.RemoveNode(ctx, "node-3")

// Transfer leadership to another node
err := svc.TransferLeadership(ctx, "node-2")

// Current leader voluntarily steps down
err := svc.StepDown(ctx)

// Get cluster information
info := svc.GetClusterInfo()
// info.Nodes, info.LeaderID, info.Term, info.CommitIndex
```

Role Tracking
Each node reports its current role: Leader, Follower, or Candidate:
```go
role := svc.GetRole()
switch role {
case consensus.RoleLeader:
    // Accept writes, serve consistent reads
case consensus.RoleFollower:
    // Forward writes to leader, serve stale reads
case consensus.RoleCandidate:
    // Election in progress, reject requests
}
```

Leadership Checker
Guard operations that should only run on the leader:
```go
// Only proceed if this node is the leader
if !svc.IsLeader() {
    return consensus.ErrNotLeader
}

// Check cluster health
info := svc.GetClusterInfo()
if info.HealthyNodes < (info.ClusterSize/2 + 1) {
    return consensus.ErrNoQuorum
}
```

Transport
Inter-node communication via pluggable transports:
```go
type Transport interface {
    Start(ctx context.Context) error
    Stop(ctx context.Context) error
    Send(ctx context.Context, target string, message any) error
    Receive() <-chan Message
    AddPeer(nodeID, address string, port int) error
    RemovePeer(nodeID string) error
    GetAddress() string
}
```

Available transports:
- gRPC -- default transport using gRPC for RPC calls, log entries, and vote requests.
- TCP -- lightweight TCP transport for simpler deployments.
Storage Backends
Persistent storage for Raft logs, metadata, and snapshots:
```go
type Storage interface {
    Start(ctx context.Context) error
    Stop(ctx context.Context) error
    Set(key, value []byte) error
    Get(key []byte) ([]byte, error)
    Delete(key []byte) error
    Batch(ops []BatchOp) error
    GetRange(start, end []byte) ([]KeyValue, error)
    Close() error
}
```

Available backends:
- BadgerDB -- embedded key-value store, recommended for production.
- BoltDB -- B+tree based embedded store.
- Pebble -- LSM-tree based store (RocksDB-compatible).
- PostgreSQL -- SQL-based storage for environments where embedded DBs aren't suitable.
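For unit tests, the Storage interface can also be backed by a plain map. A trimmed sketch covering the core methods (the type names are ours; real deployments should use one of the embedded backends above):

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// KeyValue mirrors the pair shape returned by range scans.
type KeyValue struct {
	Key, Value []byte
}

// memStorage is a map-backed stand-in for the Storage interface.
type memStorage struct {
	mu   sync.RWMutex
	data map[string][]byte
}

func newMemStorage() *memStorage { return &memStorage{data: map[string][]byte{}} }

func (s *memStorage) Set(key, value []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[string(key)] = append([]byte(nil), value...)
	return nil
}

func (s *memStorage) Get(key []byte) ([]byte, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[string(key)]
	if !ok {
		return nil, fmt.Errorf("key %q not found", key)
	}
	return v, nil
}

func (s *memStorage) Delete(key []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.data, string(key))
	return nil
}

// GetRange returns keys in [start, end), sorted -- the semantics a
// log scan typically needs.
func (s *memStorage) GetRange(start, end []byte) ([]KeyValue, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	var out []KeyValue
	for k, v := range s.data {
		if k >= string(start) && k < string(end) {
			out = append(out, KeyValue{Key: []byte(k), Value: v})
		}
	}
	sort.Slice(out, func(i, j int) bool { return string(out[i].Key) < string(out[j].Key) })
	return out, nil
}

func main() {
	s := newMemStorage()
	_ = s.Set([]byte("log/1"), []byte("a"))
	_ = s.Set([]byte("log/2"), []byte("b"))
	_ = s.Set([]byte("log/3"), []byte("c"))
	kvs, _ := s.GetRange([]byte("log/1"), []byte("log/3"))
	fmt.Println(len(kvs)) // 2
}
```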
Discovery Integration
Cluster formation via static configuration or dynamic service discovery:
```go
// Static peers
consensus.WithPeers([]consensus.PeerConfig{
    {ID: "node-1", Address: "10.0.0.1", Port: 7000},
    {ID: "node-2", Address: "10.0.0.2", Port: 7000},
    {ID: "node-3", Address: "10.0.0.3", Port: 7000},
})

// Dynamic discovery
consensus.WithDiscovery("consul")
```

The discovery interface supports watching for node changes to dynamically adjust the cluster:
```go
type Discovery interface {
    Start(ctx context.Context) error
    Stop(ctx context.Context) error
    Register(ctx context.Context, node NodeInfo) error
    Unregister(ctx context.Context) error
    GetNodes(ctx context.Context) ([]NodeInfo, error)
    Watch(ctx context.Context) (<-chan NodeChangeEvent, error)
}
```

Statistics
Monitor consensus performance and health:
```go
stats := svc.GetStats()
// stats.Term, stats.CommitIndex, stats.LastApplied
// stats.LogSize, stats.SnapshotIndex
// stats.AppliedOps, stats.FailedOps
// stats.LeaderElections, stats.AvgApplyLatency
```

Admin API
HTTP endpoints for cluster management and monitoring (configurable prefix):
| Endpoint | Description |
|---|---|
| `GET /health` | Node health check |
| `GET /health/detailed` | Detailed health with cluster info |
| `GET /status` | Current node status (role, term, leader) |
| `GET /metrics` | Consensus metrics |
| `GET /leader` | Current leader info |
| `GET /nodes` | List all cluster nodes |
| `POST /leadership/transfer` | Transfer leadership to another node |
| `POST /leadership/stepdown` | Current leader steps down |
| `POST /snapshot` | Trigger a manual snapshot |
| `POST /nodes/add` | Add a node to the cluster |
| `POST /nodes/remove` | Remove a node from the cluster |
| `POST /apply` | Apply a command through Raft |
| `POST /read` | Read from the state machine |
Sentinel Errors
| Error | Meaning |
|---|---|
| `ErrNotLeader` | Current node is not the leader |
| `ErrNoLeader` | No leader has been elected |
| `ErrNotStarted` | Consensus has not started |
| `ErrAlreadyStarted` | Already running |
| `ErrNoQuorum` | Not enough nodes for quorum |
| `ErrElectionTimeout` | Election timed out |
| `ErrSnapshotFailed` | Snapshot creation failed |
| `ErrLogInconsistent` | Log entry mismatch |
| `ErrStaleTerm` | Received stale term |
| `ErrNodeNotFound` | Node ID not in cluster |
| `ErrPeerExists` | Peer already in cluster |
| `ErrInsufficientPeers` | Not enough peers to form cluster |
| `ErrStorageUnavailable` | Storage backend unavailable |
Error Helpers
| Function | Description |
|---|---|
| `IsNotLeaderError(err)` | Check if error is not-leader |
| `IsNoLeaderError(err)` | Check if no leader has been elected |
| `IsNoQuorumError(err)` | Check if quorum has been lost |
| `IsRetryable(err)` | Check if error is transient and can be retried |
| `IsFatal(err)` | Check if error is unrecoverable |