Consensus

Distributed consensus with Raft for leader election and replicated state

Overview

github.com/xraph/forge/extensions/consensus provides distributed consensus using the Raft algorithm. It registers a ConsensusService in the DI container that manages leader election, log replication, snapshots, and a pluggable state machine for strongly consistent operations across a cluster.

What It Registers

Service              DI Key                  Type
Consensus service    consensus               *Service
Cluster manager      consensus:cluster       ClusterManager
Raft node            consensus:raft          RaftNode
State machine        consensus:statemachine  StateMachine
Leadership checker   consensus:leadership    *LeadershipChecker

The service is also available under the legacy alias consensus:service.

Dependencies

  • Optional: events extension -- when Events.Enabled is true, consensus emits leadership and cluster change events.

Quick Start

package main

import (
    "context"
    "fmt"

    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/consensus"
)

func main() {
    app := forge.NewApp(forge.AppConfig{Name: "node-1", Version: "1.0.0"})

    app.RegisterExtension(consensus.NewExtension(
        consensus.WithNodeID("node-1"),
        consensus.WithClusterID("my-cluster"),
        consensus.WithBindAddress("0.0.0.0", 7000),
        consensus.WithPeers([]consensus.PeerConfig{
            {ID: "node-1", Address: "10.0.0.1:7000"},
            {ID: "node-2", Address: "10.0.0.2:7000"},
            {ID: "node-3", Address: "10.0.0.3:7000"},
        }),
        consensus.WithStorageType("badger"),
        consensus.WithTransportType("grpc"),
    ))

    ctx := context.Background()
    app.Start(ctx)
    defer app.Stop(ctx)

    svc, _ := forge.InjectType[*consensus.Service](app.Container())

    // Check leadership
    if svc.IsLeader() {
        fmt.Println("This node is the leader")
    }

    leader, _ := svc.GetLeader()
    fmt.Printf("Current leader: %s\n", leader)

    // Apply a command (replicated via Raft)
    result, _ := svc.Apply(ctx, consensus.Command{
        Type:    "set",
        Payload: map[string]any{"key": "config:max-retries", "value": "5"},
    })
    fmt.Printf("Applied: %v\n", result)

    // Read from state machine (local, no replication needed)
    value, _ := svc.Read(ctx, consensus.Query{
        Type:    "get",
        Payload: map[string]any{"key": "config:max-retries"},
    })
    fmt.Printf("Read: %v\n", value)
}

Leadership-Guarded Operations

Use the LeadershipChecker for middleware-style leader and quorum checks:

checker, _ := forge.InjectType[*consensus.LeadershipChecker](app.Container())

// Only execute on the leader
if err := checker.RequireLeader(); err != nil {
    // Returns consensus.ErrNotLeader
    log.Printf("Not leader: %v", err)
}

// Require a quorum of healthy nodes
if err := checker.RequireQuorum(); err != nil {
    // Returns consensus.ErrNoQuorum
    log.Printf("No quorum: %v", err)
}

Cluster Management

// Add a new node (leader only)
svc.AddNode(ctx, "node-4", "10.0.0.4:7000")

// Remove a node
svc.RemoveNode(ctx, "node-4")

// Transfer leadership
svc.TransferLeadership(ctx, "node-2")

// Step down as leader
svc.StepDown(ctx)

// Get cluster info
info := svc.GetClusterInfo()
fmt.Printf("Nodes: %d, Healthy: %d, Leader: %s\n",
    info.TotalNodes, info.HealthyNodes, info.LeaderID)

// Take a snapshot
svc.Snapshot(ctx)

Key Concepts

  • Raft consensus -- implements the Raft algorithm for leader election, log replication, and commitment across a cluster.
  • Leader election -- automatic with configurable heartbeat (default 1s) and election timeout range (5-10s).
  • State machine -- pluggable StateMachine interface for applying committed commands and reading state.
  • Apply/Read -- Apply submits a command through the leader and replicates it before committing; Read serves queries from the local state machine with no replication round-trip.
  • Snapshots -- periodic snapshots for log compaction and fast catch-up of new nodes.
  • Transport -- gRPC or TCP transport for inter-node communication.
  • Discovery -- static, DNS, Consul, etcd, or Kubernetes-based peer discovery.
  • Storage -- BadgerDB, BoltDB, Pebble, or PostgreSQL for persistent Raft logs and snapshots.
  • Leadership checker -- middleware helper for leader-only and quorum-required operations.
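
The state machine is the part you implement. The extension defines the actual StateMachine interface; the sketch below assumes a minimal shape (apply committed commands, serve local reads, snapshot/restore for log compaction) to show how a replicated key-value store might look. The Command and Query structs mirror the Quick Start, but the method names and signatures here are illustrative, not the extension's real API.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// Command and Query mirror the shapes used in the Quick Start;
// the extension's real types may carry more fields.
type Command struct {
	Type    string
	Payload map[string]any
}

type Query struct {
	Type    string
	Payload map[string]any
}

// KVStateMachine is a hypothetical state machine implementation:
// a mutex-guarded map that applies committed "set"/"delete"
// commands and serves "get" queries locally.
type KVStateMachine struct {
	mu   sync.RWMutex
	data map[string]string
}

func NewKVStateMachine() *KVStateMachine {
	return &KVStateMachine{data: make(map[string]string)}
}

// Apply is called once a command is committed by Raft. It must be
// deterministic so every replica converges to the same state.
func (m *KVStateMachine) Apply(cmd Command) (any, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	key, _ := cmd.Payload["key"].(string)
	switch cmd.Type {
	case "set":
		value, _ := cmd.Payload["value"].(string)
		m.data[key] = value
		return value, nil
	case "delete":
		delete(m.data, key)
		return nil, nil
	default:
		return nil, fmt.Errorf("unknown command type %q", cmd.Type)
	}
}

// Read serves queries from local state; no replication round-trip.
func (m *KVStateMachine) Read(q Query) (any, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	key, _ := q.Payload["key"].(string)
	v, ok := m.data[key]
	if !ok {
		return nil, fmt.Errorf("key %q not found", key)
	}
	return v, nil
}

// Snapshot and Restore support log compaction and fast catch-up
// of new nodes: serialize the full map, then rebuild from it.
func (m *KVStateMachine) Snapshot() ([]byte, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return json.Marshal(m.data)
}

func (m *KVStateMachine) Restore(b []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	return json.Unmarshal(b, &m.data)
}

func main() {
	sm := NewKVStateMachine()
	sm.Apply(Command{Type: "set", Payload: map[string]any{"key": "a", "value": "1"}})
	v, _ := sm.Read(Query{Type: "get", Payload: map[string]any{"key": "a"}})
	fmt.Println(v) // prints "1"
}
```

Determinism in Apply is the key constraint: anything non-deterministic (clocks, randomness, local I/O) must be resolved before the command is proposed, or replicas will diverge.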

Important Runtime Notes

  • NodeID is required and must be unique per node in the cluster.
  • The extension binds on BindAddr:BindPort (default 0.0.0.0:7000) for Raft communication.
  • Apply() returns ErrNotLeader if the current node is not the leader.
  • A minimum of 3 nodes is recommended for fault tolerance.
