Consensus

Features

Consensus extension capabilities

Raft Consensus

Full Raft implementation providing strong consistency guarantees across a cluster of nodes. The algorithm ensures that all nodes agree on the same sequence of state transitions, even in the presence of failures.

Core Guarantees

  • Leader election -- at most one leader is elected in any given term.
  • Log replication -- committed entries are durably replicated to a majority of nodes.
  • Safety -- if any node has applied a log entry at a given index, no other node will ever apply a different entry for that index.
  • Consistency -- all reads and writes go through the leader, ensuring linearizable semantics.

Leader Election

Automatic leader election with configurable timing parameters:

consensus.NewExtension(
    consensus.WithHeartbeatInterval(150 * time.Millisecond),
    consensus.WithElectionTimeoutMin(300 * time.Millisecond),
    consensus.WithElectionTimeoutMax(500 * time.Millisecond),
)

Query the current leadership state:

svc := consensus.MustGet(app.Container())

if svc.IsLeader() {
    // This node is the leader
}

leaderID := svc.GetLeader() // Node ID of the current leader
role := svc.GetRole()       // Leader, Follower, or Candidate
term := svc.GetTerm()       // Current Raft term number

Pluggable State Machine

Implement the StateMachine interface to define your application's replicated state:

type StateMachine interface {
    Apply(entry LogEntry) error          // Apply a committed log entry
    Snapshot() (*Snapshot, error)         // Create a state snapshot
    Restore(snapshot *Snapshot) error     // Restore from a snapshot
    Query(query any) (any, error)         // Read from the state machine
}

Example implementation for a replicated key-value store:

type KVStore struct {
    mu   sync.RWMutex
    data map[string]string
}

func (s *KVStore) Apply(entry consensus.LogEntry) error {
    var cmd struct {
        Op    string `json:"op"`
        Key   string `json:"key"`
        Value string `json:"value"`
    }
    if err := json.Unmarshal(entry.Data, &cmd); err != nil {
        return err
    }

    s.mu.Lock()
    defer s.mu.Unlock()

    switch cmd.Op {
    case "set":
        s.data[cmd.Key] = cmd.Value
    case "delete":
        delete(s.data, cmd.Key)
    }
    return nil
}

func (s *KVStore) Query(query any) (any, error) {
    key, ok := query.(string)
    if !ok {
        return nil, fmt.Errorf("unsupported query type %T", query)
    }
    s.mu.RLock()
    defer s.mu.RUnlock()
    return s.data[key], nil
}

func (s *KVStore) Snapshot() (*consensus.Snapshot, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    data, err := json.Marshal(s.data)
    if err != nil {
        return nil, err
    }
    return &consensus.Snapshot{Data: data}, nil
}

func (s *KVStore) Restore(snapshot *consensus.Snapshot) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    return json.Unmarshal(snapshot.Data, &s.data)
}

Apply and Read

Write operations go through the Raft log for consensus; read operations query the state machine directly:

// Write: replicate a command through the Raft log
err := svc.Apply(ctx, consensus.Command{
    Type: "set",
    Data: []byte(`{"op":"set","key":"config","value":"v2"}`),
})
// Returns only after the command is committed to a majority

// Read: query the state machine (consistent read through leader)
result, err := svc.Read(ctx, "config")

Apply blocks until the command is committed to a majority of nodes. If the current node is not the leader, it returns ErrNotLeader.

Snapshots

Periodic log compaction via snapshots prevents unbounded log growth:

consensus.NewExtension(
    consensus.WithSnapshotInterval(5 * time.Minute),
    consensus.WithSnapshotThreshold(1000), // Snapshot after 1000 log entries
)
  • The state machine's Snapshot() is called to capture the current state.
  • Old log entries before the snapshot are discarded.
  • New nodes joining the cluster catch up from the latest snapshot instead of replaying the full log.
  • Manual snapshot trigger: svc.Snapshot(ctx).

Cluster Management

Add and remove nodes dynamically without cluster downtime:

// Add a new node to the cluster
err := svc.AddNode(ctx, "node-4", "10.0.0.4", 7000)

// Remove a node from the cluster
err := svc.RemoveNode(ctx, "node-3")

// Transfer leadership to another node
err := svc.TransferLeadership(ctx, "node-2")

// Current leader voluntarily steps down
err := svc.StepDown(ctx)

// Get cluster information
info := svc.GetClusterInfo()
// info.Nodes, info.LeaderID, info.Term, info.CommitIndex

Role Tracking

Each node reports its current role (Leader, Follower, or Candidate):

role := svc.GetRole()

switch role {
case consensus.RoleLeader:
    // Accept writes, serve consistent reads
case consensus.RoleFollower:
    // Forward writes to leader, serve stale reads
case consensus.RoleCandidate:
    // Election in progress, reject requests
}

Leadership Checker

Guard operations that should only run on the leader:

// Only proceed if this node is the leader
if !svc.IsLeader() {
    return consensus.ErrNotLeader
}

// Check cluster health
info := svc.GetClusterInfo()
if info.HealthyNodes < (info.ClusterSize/2 + 1) {
    return consensus.ErrNoQuorum
}
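The quorum arithmetic in the check above is worth factoring into a helper: a majority of N nodes is floor(N/2) + 1, so a 3-node cluster tolerates one failure and a 5-node cluster tolerates two. A minimal, self-contained version:

```go
package main

import "fmt"

// hasQuorum reports whether the healthy nodes form a majority of the
// cluster: a quorum of N nodes is floor(N/2) + 1.
func hasQuorum(healthy, clusterSize int) bool {
    return healthy >= clusterSize/2+1
}

func main() {
    fmt.Println(hasQuorum(2, 3)) // true: 2 of 3 is a majority
    fmt.Println(hasQuorum(2, 4)) // false: 2 of 4 is only half
    fmt.Println(hasQuorum(3, 5)) // true: tolerates two failures
}
```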

Transport

Inter-node communication via pluggable transports:

type Transport interface {
    Start(ctx context.Context) error
    Stop(ctx context.Context) error
    Send(ctx context.Context, target string, message any) error
    Receive() <-chan Message
    AddPeer(nodeID, address string, port int) error
    RemovePeer(nodeID string) error
    GetAddress() string
}

Available transports:

  • gRPC -- default transport using gRPC for RPC calls, log entries, and vote requests.
  • TCP -- lightweight TCP transport for simpler deployments.

Storage Backends

Persistent storage for Raft logs, metadata, and snapshots:

type Storage interface {
    Start(ctx context.Context) error
    Stop(ctx context.Context) error
    Set(key, value []byte) error
    Get(key []byte) ([]byte, error)
    Delete(key []byte) error
    Batch(ops []BatchOp) error
    GetRange(start, end []byte) ([]KeyValue, error)
    Close() error
}
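A storage backend only needs to satisfy this interface, so an in-memory version is handy for tests. The sketch below implements the key-value subset (Set, Get, Delete, GetRange); treating GetRange as a lexicographic half-open interval [start, end) is an assumption about the contract.

```go
package main

import (
    "fmt"
    "sort"
    "sync"
)

type KeyValue struct {
    Key, Value []byte
}

// memStorage is an in-memory backend for tests. GetRange is assumed
// here to return keys in [start, end), lexicographically sorted.
type memStorage struct {
    mu   sync.RWMutex
    data map[string][]byte
}

func newMemStorage() *memStorage {
    return &memStorage{data: make(map[string][]byte)}
}

func (s *memStorage) Set(key, value []byte) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.data[string(key)] = append([]byte(nil), value...) // defensive copy
    return nil
}

func (s *memStorage) Get(key []byte) ([]byte, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    v, ok := s.data[string(key)]
    if !ok {
        return nil, fmt.Errorf("key %q not found", key)
    }
    return v, nil
}

func (s *memStorage) Delete(key []byte) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    delete(s.data, string(key))
    return nil
}

func (s *memStorage) GetRange(start, end []byte) ([]KeyValue, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    var out []KeyValue
    for k, v := range s.data {
        if k >= string(start) && k < string(end) {
            out = append(out, KeyValue{Key: []byte(k), Value: v})
        }
    }
    sort.Slice(out, func(i, j int) bool { return string(out[i].Key) < string(out[j].Key) })
    return out, nil
}

func main() {
    s := newMemStorage()
    _ = s.Set([]byte("log/1"), []byte("a"))
    _ = s.Set([]byte("log/2"), []byte("b"))
    _ = s.Set([]byte("meta"), []byte("c"))
    kvs, _ := s.GetRange([]byte("log/"), []byte("log0"))
    fmt.Println(len(kvs)) // prints "2"
}
```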

Available backends:

  • BadgerDB -- embedded key-value store, recommended for production.
  • BoltDB -- B+tree based embedded store.
  • Pebble -- LSM-tree based store (RocksDB-compatible).
  • PostgreSQL -- SQL-based storage for environments where embedded DBs aren't suitable.

Discovery Integration

Cluster formation via static configuration or dynamic service discovery:

// Static peers
consensus.WithPeers([]consensus.PeerConfig{
    {ID: "node-1", Address: "10.0.0.1", Port: 7000},
    {ID: "node-2", Address: "10.0.0.2", Port: 7000},
    {ID: "node-3", Address: "10.0.0.3", Port: 7000},
})

// Dynamic discovery
consensus.WithDiscovery("consul")

The discovery interface supports watching for node changes to dynamically adjust the cluster:

type Discovery interface {
    Start(ctx context.Context) error
    Stop(ctx context.Context) error
    Register(ctx context.Context, node NodeInfo) error
    Unregister(ctx context.Context) error
    GetNodes(ctx context.Context) ([]NodeInfo, error)
    Watch(ctx context.Context) (<-chan NodeChangeEvent, error)
}
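A consumer of Watch folds the event stream into its view of the cluster before calling AddNode or RemoveNode. The sketch below is self-contained; the NodeChangeEvent shape (a change type plus the affected node) is an assumption about the real type.

```go
package main

import "fmt"

// Local stand-ins; the event shape is an assumption about NodeChangeEvent.
type NodeInfo struct {
    ID      string
    Address string
    Port    int
}

type NodeChangeEvent struct {
    Type string // "added" or "removed"
    Node NodeInfo
}

// applyEvents folds a stream of discovery events into a peer set --
// the bookkeeping a Watch consumer would do before adjusting cluster
// membership on the consensus service.
func applyEvents(peers map[string]NodeInfo, events <-chan NodeChangeEvent) {
    for ev := range events {
        switch ev.Type {
        case "added":
            peers[ev.Node.ID] = ev.Node
        case "removed":
            delete(peers, ev.Node.ID)
        }
    }
}

func main() {
    events := make(chan NodeChangeEvent, 3)
    events <- NodeChangeEvent{Type: "added", Node: NodeInfo{ID: "node-1", Address: "10.0.0.1", Port: 7000}}
    events <- NodeChangeEvent{Type: "added", Node: NodeInfo{ID: "node-2", Address: "10.0.0.2", Port: 7000}}
    events <- NodeChangeEvent{Type: "removed", Node: NodeInfo{ID: "node-1"}}
    close(events)

    peers := make(map[string]NodeInfo)
    applyEvents(peers, events)
    fmt.Println(len(peers)) // prints "1"
}
```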

Statistics

Monitor consensus performance and health:

stats := svc.GetStats()
// stats.Term, stats.CommitIndex, stats.LastApplied
// stats.LogSize, stats.SnapshotIndex
// stats.AppliedOps, stats.FailedOps
// stats.LeaderElections, stats.AvgApplyLatency

Admin API

HTTP endpoints for cluster management and monitoring (configurable prefix):

Endpoint                       Description
GET /health                    Node health check
GET /health/detailed           Detailed health with cluster info
GET /status                    Current node status (role, term, leader)
GET /metrics                   Consensus metrics
GET /leader                    Current leader info
GET /nodes                     List all cluster nodes
POST /leadership/transfer      Transfer leadership to another node
POST /leadership/stepdown      Current leader steps down
POST /snapshot                 Trigger a manual snapshot
POST /nodes/add                Add a node to the cluster
POST /nodes/remove             Remove a node from the cluster
POST /apply                    Apply a command through Raft
POST /read                     Read from the state machine

Sentinel Errors

Error                    Meaning
ErrNotLeader             Current node is not the leader
ErrNoLeader              No leader has been elected
ErrNotStarted            Consensus has not started
ErrAlreadyStarted        Already running
ErrNoQuorum              Not enough nodes for quorum
ErrElectionTimeout       Election timed out
ErrSnapshotFailed        Snapshot creation failed
ErrLogInconsistent       Log entry mismatch
ErrStaleTerm             Received stale term
ErrNodeNotFound          Node ID not in cluster
ErrPeerExists            Peer already in cluster
ErrInsufficientPeers     Not enough peers to form cluster
ErrStorageUnavailable    Storage backend unavailable

Error Helpers

Function                   Description
IsNotLeaderError(err)      Check if error is not-leader
IsNoLeaderError(err)       Check if no leader has been elected
IsNoQuorumError(err)       Check if quorum has been lost
IsRetryable(err)           Check if error is transient and can be retried
IsFatal(err)               Check if error is unrecoverable
