# Consensus
Distributed consensus with Raft for leader election and replicated state
## Overview
`github.com/xraph/forge/extensions/consensus` provides distributed consensus using the Raft algorithm.
It registers a consensus service in the DI container that manages leader election, log replication,
snapshots, and a pluggable state machine for strongly consistent operations across a cluster.
## What It Registers
| Service | DI Key | Type |
|---|---|---|
| Consensus service | `consensus` | `*Service` |
| Cluster manager | `consensus:cluster` | `ClusterManager` |
| Raft node | `consensus:raft` | `RaftNode` |
| State machine | `consensus:statemachine` | `StateMachine` |
| Leadership checker | `consensus:leadership` | `*LeadershipChecker` |
The service is also available under the legacy alias `consensus:service`.
## Dependencies
- Optional: `events` extension -- when `Events.Enabled` is true, consensus emits leadership and cluster change events.
## Quick Start
```go
package main

import (
	"context"
	"fmt"

	"github.com/xraph/forge"
	"github.com/xraph/forge/extensions/consensus"
)

func main() {
	app := forge.NewApp(forge.AppConfig{Name: "node-1", Version: "1.0.0"})

	app.RegisterExtension(consensus.NewExtension(
		consensus.WithNodeID("node-1"),
		consensus.WithClusterID("my-cluster"),
		consensus.WithBindAddress("0.0.0.0", 7000),
		consensus.WithPeers([]consensus.PeerConfig{
			{ID: "node-1", Address: "10.0.0.1:7000"},
			{ID: "node-2", Address: "10.0.0.2:7000"},
			{ID: "node-3", Address: "10.0.0.3:7000"},
		}),
		consensus.WithStorageType("badger"),
		consensus.WithTransportType("grpc"),
	))

	ctx := context.Background()
	app.Start(ctx)
	defer app.Stop(ctx)

	svc, _ := forge.InjectType[*consensus.Service](app.Container())

	// Check leadership
	if svc.IsLeader() {
		fmt.Println("This node is the leader")
	}
	leader, _ := svc.GetLeader()
	fmt.Printf("Current leader: %s\n", leader)

	// Apply a command (replicated via Raft)
	result, _ := svc.Apply(ctx, consensus.Command{
		Type:    "set",
		Payload: map[string]any{"key": "config:max-retries", "value": "5"},
	})
	fmt.Printf("Applied: %v\n", result)

	// Read from state machine (local, no replication needed)
	value, _ := svc.Read(ctx, consensus.Query{
		Type:    "get",
		Payload: map[string]any{"key": "config:max-retries"},
	})
	fmt.Printf("Read: %v\n", value)
}
```

## Leadership-Guarded Operations
Use the `LeadershipChecker` for middleware-style leader and quorum checks:
```go
checker, _ := forge.InjectType[*consensus.LeadershipChecker](app.Container())

// Only execute on the leader
if err := checker.RequireLeader(); err != nil {
	// Returns consensus.ErrNotLeader
	log.Printf("Not leader: %v", err)
}

// Require a quorum of healthy nodes
if err := checker.RequireQuorum(); err != nil {
	// Returns consensus.ErrNoQuorum
	log.Printf("No quorum: %v", err)
}
```

## Cluster Management
```go
// Add a new node (leader only)
svc.AddNode(ctx, "node-4", "10.0.0.4:7000")

// Remove a node
svc.RemoveNode(ctx, "node-4")

// Transfer leadership
svc.TransferLeadership(ctx, "node-2")

// Step down as leader
svc.StepDown(ctx)

// Get cluster info
info := svc.GetClusterInfo()
fmt.Printf("Nodes: %d, Healthy: %d, Leader: %s\n",
	info.TotalNodes, info.HealthyNodes, info.LeaderID)

// Take a snapshot
svc.Snapshot(ctx)
```

## Key Concepts
- Raft consensus -- implements the Raft algorithm for leader election, log replication, and commitment across a cluster.
- Leader election -- automatic with configurable heartbeat (default 1s) and election timeout range (5-10s).
- State machine -- pluggable `StateMachine` interface for applying committed commands and reading state.
- Apply/Read -- `Apply` writes through the leader and replicates; `Read` serves from the local state machine.
- Snapshots -- periodic snapshots for log compaction and fast catch-up of new nodes.
- Transport -- gRPC or TCP transport for inter-node communication.
- Discovery -- static, DNS, Consul, etcd, or Kubernetes-based peer discovery.
- Storage -- BadgerDB, BoltDB, Pebble, or PostgreSQL for persistent Raft logs and snapshots.
- Leadership checker -- middleware helper for leader-only and quorum-required operations.
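Since the state machine is pluggable, a custom implementation is the usual way to define what `Apply` and `Read` actually do. Below is a minimal in-memory key-value sketch, modeled on the `set`/`get` commands from the Quick Start. The interface shape here (`Apply`, `Read`, `Snapshot`, `Restore`) and the `Command`/`Query` structs are illustrative assumptions, not the extension's exact signatures:

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// Command and Query mirror the shapes used in the Quick Start above;
// field names are illustrative, not the extension's exact types.
type Command struct {
	Type    string
	Payload map[string]any
}

type Query struct {
	Type    string
	Payload map[string]any
}

// StateMachine is an assumed shape for the pluggable interface: Apply
// consumes committed commands, Read serves local queries, and
// Snapshot/Restore support log compaction and node catch-up.
type StateMachine interface {
	Apply(cmd Command) (any, error)
	Read(q Query) (any, error)
	Snapshot() ([]byte, error)
	Restore(data []byte) error
}

// KVStateMachine is a minimal thread-safe key-value store.
type KVStateMachine struct {
	mu   sync.RWMutex
	data map[string]string
}

func NewKVStateMachine() *KVStateMachine {
	return &KVStateMachine{data: make(map[string]string)}
}

func (s *KVStateMachine) Apply(cmd Command) (any, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	switch cmd.Type {
	case "set":
		key, _ := cmd.Payload["key"].(string)
		value, _ := cmd.Payload["value"].(string)
		s.data[key] = value
		return value, nil
	case "delete":
		key, _ := cmd.Payload["key"].(string)
		delete(s.data, key)
		return nil, nil
	default:
		return nil, fmt.Errorf("unknown command type %q", cmd.Type)
	}
}

func (s *KVStateMachine) Read(q Query) (any, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if q.Type != "get" {
		return nil, fmt.Errorf("unknown query type %q", q.Type)
	}
	key, _ := q.Payload["key"].(string)
	return s.data[key], nil
}

// Snapshot serializes the full map; Restore replaces it wholesale.
func (s *KVStateMachine) Snapshot() ([]byte, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return json.Marshal(s.data)
}

func (s *KVStateMachine) Restore(data []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data = make(map[string]string)
	return json.Unmarshal(data, &s.data)
}

func main() {
	var sm StateMachine = NewKVStateMachine()
	sm.Apply(Command{Type: "set", Payload: map[string]any{"key": "config:max-retries", "value": "5"}})
	v, _ := sm.Read(Query{Type: "get", Payload: map[string]any{"key": "config:max-retries"}})
	fmt.Println(v) // prints "5"
}
```

Because Raft replays the log on restart, `Apply` must be deterministic: given the same command sequence, every node's map ends up identical.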
## Important Runtime Notes
- `NodeID` is required and must be unique per node in the cluster.
- The extension binds on `BindAddr:BindPort` (default `0.0.0.0:7000`) for Raft communication.
- `Apply()` returns `ErrNotLeader` if the current node is not the leader.
- A minimum of 3 nodes is recommended for fault tolerance.