Consensus Extension

The Consensus extension provides distributed consensus capabilities using the Raft algorithm, enabling high availability, leader election, and data replication across multiple nodes in a cluster.

Features

Core Consensus

  • Raft Algorithm: Industry-standard consensus protocol for distributed systems
  • Leader Election: Automatic leader selection with configurable timeouts
  • Log Replication: Consistent data replication across cluster nodes
  • Snapshotting: Automatic log compaction and state snapshots
  • Dynamic Membership: Add/remove nodes without downtime

Transport & Discovery

  • Multiple Transports: gRPC and TCP with TLS/mTLS support
  • Service Discovery: Static, DNS, Consul, Kubernetes, etcd integration
  • Connection Management: Keep-alive, multiplexing, compression
  • Network Resilience: Automatic reconnection and failure detection

Storage Backends

  • BadgerDB: High-performance embedded key-value store
  • BoltDB: ACID-compliant embedded database
  • Pebble: RocksDB-inspired storage engine
  • PostgreSQL: External database backend support

Observability

  • Prometheus Metrics: Comprehensive cluster and node metrics
  • Health Checks: Configurable health monitoring
  • Structured Logging: Detailed operational logs
  • Distributed Tracing: Optional tracing support

Security

  • TLS/mTLS: Encrypted communication between nodes
  • Authentication: Configurable node authentication
  • Data Encryption: Optional at-rest encryption

Installation

Go Module

go get github.com/xraph/forge/extensions/consensus

Docker

docker pull xraph/forge-consensus:latest
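
A single-node container can then be started with the environment variables listed under Configuration. The port mapping and the in-container data path below are assumptions, not documented image defaults, so adjust them to your setup:

# Illustrative only: the in-container data path and exposed port are assumptions
docker run -d \
  --name forge-consensus \
  -p 7000:7000 \
  -e CONSENSUS_NODE_ID=node-1 \
  -e CONSENSUS_CLUSTER_ID=my-cluster \
  -e CONSENSUS_BIND_PORT=7000 \
  -e CONSENSUS_STORAGE_PATH=/data/consensus \
  -v "$(pwd)/data/consensus:/data/consensus" \
  xraph/forge-consensus:latest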

Package Manager

# Using the Forge CLI
forge extension add consensus

Configuration

YAML Configuration

consensus:
  node_id: "node-1"
  cluster_id: "my-cluster"
  bind_addr: "0.0.0.0"
  bind_port: 7000
  
  # Initial cluster peers
  peers:
    - id: "node-1"
      address: "localhost"
      port: 7000
    - id: "node-2"
      address: "localhost"
      port: 7001
    - id: "node-3"
      address: "localhost"
      port: 7002
  
  # Raft configuration
  raft:
    heartbeat_interval: "1s"
    election_timeout_min: "5s"
    election_timeout_max: "10s"
    snapshot_interval: "30m"
    snapshot_threshold: 10000
    log_cache_size: 1024
    max_append_entries: 64
    trailing_logs: 10000
    pre_vote: true
    check_quorum: true
  
  # Transport configuration
  transport:
    type: "grpc"  # grpc, tcp
    max_message_size: 4194304  # 4MB
    timeout: "10s"
    keep_alive: true
    keep_alive_interval: "30s"
    enable_compression: true
    enable_multiplexing: true
  
  # Discovery configuration
  discovery:
    type: "static"  # static, dns, consul, etcd, kubernetes
    refresh_interval: "30s"
    enable_watch: true
  
  # Storage configuration
  storage:
    type: "badger"  # badger, boltdb, pebble, postgres
    path: "./data/consensus"
    sync_writes: true
    max_batch_size: 1000
    max_batch_delay: "10ms"
    
    # BadgerDB options
    badger_options:
      value_log_max_entries: 1000000
      mem_table_size: 67108864  # 64MB
      num_mem_tables: 5
      num_compactors: 4
  
  # Security configuration
  security:
    enable_tls: false
    enable_mtls: false
    cert_file: ""
    key_file: ""
    ca_file: ""
    enable_encryption: false
  
  # Health checks
  health:
    enabled: true
    check_interval: "10s"
    timeout: "5s"
    unhealthy_threshold: 3
    healthy_threshold: 2
  
  # Observability
  observability:
    metrics:
      enabled: true
      collection_interval: "15s"
      namespace: "forge_consensus"
      enable_detailed_metrics: true
    tracing:
      enabled: false
      service_name: "forge-consensus"
      sample_rate: 0.1
    logging:
      level: "info"
      enable_structured: true
      log_raft_details: false
  
  # Admin API
  admin_api:
    enabled: true
    path_prefix: "/consensus"
    enable_auth: false
  
  # Advanced settings
  advanced:
    enable_auto_snapshot: true
    enable_auto_compaction: true
    compaction_interval: "1h"
    max_memory_usage: 1073741824  # 1GB
    gc_interval: "5m"
    enable_read_index: true
    enable_leased_reads: true

Environment Variables

CONSENSUS_NODE_ID=node-1
CONSENSUS_CLUSTER_ID=my-cluster
CONSENSUS_BIND_ADDR=0.0.0.0
CONSENSUS_BIND_PORT=7000
CONSENSUS_STORAGE_PATH=./data/consensus
CONSENSUS_TRANSPORT_TYPE=grpc
CONSENSUS_DISCOVERY_TYPE=static
CONSENSUS_ENABLE_TLS=false
CONSENSUS_ENABLE_METRICS=true
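
Whether the extension reads these variables automatically is not shown here; as a rough sketch, they can also be wired in manually through the programmatic options from the next section (getenv is an illustrative helper, not part of the extension):

import "os"

// getenv returns the value of key, or def when the variable is unset
// (illustrative helper).
func getenv(key, def string) string {
    if v := os.Getenv(key); v != "" {
        return v
    }
    return def
}

// Register the extension from environment variables, falling back to defaults.
app.RegisterExtension(consensus.NewExtension(
    consensus.WithNodeID(getenv("CONSENSUS_NODE_ID", "node-1")),
    consensus.WithClusterID(getenv("CONSENSUS_CLUSTER_ID", "my-cluster")),
    consensus.WithStoragePath(getenv("CONSENSUS_STORAGE_PATH", "./data/consensus")),
    consensus.WithTransportType(getenv("CONSENSUS_TRANSPORT_TYPE", "grpc")),
))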

Programmatic Configuration

package main

import (
    "context"

    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/consensus"
)

func main() {
    app := forge.NewApp(forge.AppConfig{
        Name: "consensus-app",
    })
    
    // Register consensus extension
    err := app.RegisterExtension(consensus.NewExtension(
        consensus.WithNodeID("node-1"),
        consensus.WithClusterID("my-cluster"),
        consensus.WithBindAddress("0.0.0.0", 7000),
        consensus.WithPeers([]consensus.PeerConfig{
            {ID: "node-1", Address: "localhost", Port: 7000},
            {ID: "node-2", Address: "localhost", Port: 7001},
            {ID: "node-3", Address: "localhost", Port: 7002},
        }),
        consensus.WithStoragePath("./data/node-1"),
        consensus.WithTransportType("grpc"),
        consensus.WithTLS("cert.pem", "key.pem"),
    ))
    if err != nil {
        panic(err)
    }
    
    app.Start(context.Background())
}
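
To shut the node down cleanly, a common pattern is to tie the context passed to app.Start to OS signals. This is a sketch and assumes the application honors context cancellation on shutdown:

import (
    "context"
    "os/signal"
    "syscall"
)

// Cancel the context on SIGINT/SIGTERM so the node can step down and close
// its storage cleanly (assumes app.Start observes the context).
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

app.Start(ctx)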

Usage Examples

Basic Consensus Service

package main

import (
    "context"
    "log"
    
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/consensus"
)

func main() {
    app := forge.NewApp(forge.AppConfig{Name: "consensus-example"})
    
    // Register consensus extension
    app.RegisterExtension(consensus.NewExtension(
        consensus.WithNodeID("node-1"),
        consensus.WithClusterID("my-cluster"),
        consensus.WithBindAddress("0.0.0.0", 7000),
        consensus.WithStoragePath("./data/node-1"),
    ))
    
    ctx := context.Background()
    app.Start(ctx)
    
    // Get consensus service
    service, _ := forge.Resolve[*consensus.Service](app.Container(), "consensus")
    
    // Check if this node is the leader
    if service.IsLeader() {
        log.Println("This node is the leader")
        
        // Apply a command
        cmd := consensus.Command{
            Type: "set",
            Payload: map[string]interface{}{
                "key":   "user:123",
                "value": "John Doe",
            },
        }
        
        err := service.Apply(ctx, cmd)
        if err != nil {
            log.Printf("Failed to apply command: %v", err)
        }
    }
    
    // Read data (can be done on any node)
    result, err := service.Read(ctx, map[string]interface{}{
        "key": "user:123",
    })
    if err != nil {
        log.Printf("Failed to read: %v", err)
    } else {
        log.Printf("Read result: %v", result)
    }
}
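
Note that IsLeader can return false on every node for a short window right after startup while the first election completes. A small wait helper, sketched using only the calls shown above:

import (
    "context"
    "time"

    "github.com/xraph/forge/extensions/consensus"
)

// waitForLeader polls until the cluster reports a leader or ctx expires.
func waitForLeader(ctx context.Context, service *consensus.Service) error {
    ticker := time.NewTicker(200 * time.Millisecond)
    defer ticker.Stop()
    for {
        if service.IsLeader() || service.GetClusterInfo().Leader != "" {
            return nil
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
        }
    }
}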

Cluster Management

// Add a new node to the cluster
err := service.AddNode(ctx, "node-4", "192.168.1.4", 7000)
if err != nil {
    log.Printf("Failed to add node: %v", err)
}

// Remove a node from the cluster
err = service.RemoveNode(ctx, "node-4")
if err != nil {
    log.Printf("Failed to remove node: %v", err)
}

// Transfer leadership to another node
err = service.TransferLeadership(ctx, "node-2")
if err != nil {
    log.Printf("Failed to transfer leadership: %v", err)
}

// Get cluster information
clusterInfo := service.GetClusterInfo()
log.Printf("Cluster has %d nodes, leader: %s", 
    clusterInfo.TotalNodes, clusterInfo.Leader)
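
Raft applies membership changes through the leader; depending on whether the extension forwards such requests internally, you may want to guard them explicitly (a sketch reusing the calls above):

// Only issue membership changes from the leader; otherwise report who is.
if !service.IsLeader() {
    info := service.GetClusterInfo()
    log.Printf("not the leader; membership changes should go to %s", info.Leader)
} else if err := service.AddNode(ctx, "node-4", "192.168.1.4", 7000); err != nil {
    log.Printf("Failed to add node: %v", err)
}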

Health Monitoring

// Check service health
err := service.HealthCheck(ctx)
if err != nil {
    log.Printf("Health check failed: %v", err)
}

// Get detailed health status
healthStatus := service.GetHealthStatus(ctx)
log.Printf("Health: %t, Status: %s, Has Quorum: %t", 
    healthStatus.Healthy, healthStatus.Status, healthStatus.HasQuorum)

// Get consensus statistics
stats := service.GetStats()
log.Printf("Node: %s, Role: %s, Term: %d, Operations: %d", 
    stats.NodeID, stats.Role, stats.Term, stats.OperationsTotal)
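
For continuous monitoring rather than one-off checks, the same calls can be run on a ticker. A minimal sketch:

import (
    "context"
    "log"
    "time"

    "github.com/xraph/forge/extensions/consensus"
)

// monitor runs HealthCheck on an interval and logs quorum loss until ctx is done.
func monitor(ctx context.Context, service *consensus.Service, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := service.HealthCheck(ctx); err != nil {
                log.Printf("health check failed: %v", err)
            }
            if status := service.GetHealthStatus(ctx); !status.HasQuorum {
                log.Printf("quorum lost: status=%s", status.Status)
            }
        }
    }
}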

Custom State Machine

import (
    "encoding/json"
    "sync"
    "time"

    "github.com/xraph/forge/extensions/consensus"
)

// CustomStateMachine is a minimal in-memory key-value state machine.
// The data map must be initialized (e.g. with make) before the first Apply.
type CustomStateMachine struct {
    data map[string]interface{}
    mu   sync.RWMutex
}

func (sm *CustomStateMachine) Apply(entry consensus.LogEntry) error {
    sm.mu.Lock()
    defer sm.mu.Unlock()
    
    var cmd consensus.Command
    if err := json.Unmarshal(entry.Data, &cmd); err != nil {
        return err
    }
    
    switch cmd.Type {
    case "set":
        key := cmd.Payload["key"].(string)
        value := cmd.Payload["value"]
        sm.data[key] = value
    case "delete":
        key := cmd.Payload["key"].(string)
        delete(sm.data, key)
    }
    
    return nil
}

func (sm *CustomStateMachine) Query(query interface{}) (interface{}, error) {
    sm.mu.RLock()
    defer sm.mu.RUnlock()
    
    q := query.(map[string]interface{})
    key := q["key"].(string)
    
    return sm.data[key], nil
}

func (sm *CustomStateMachine) Snapshot() (*consensus.Snapshot, error) {
    sm.mu.RLock()
    defer sm.mu.RUnlock()
    
    data, err := json.Marshal(sm.data)
    if err != nil {
        return nil, err
    }
    
    return &consensus.Snapshot{
        Data:    data,
        Created: time.Now(),
    }, nil
}

func (sm *CustomStateMachine) Restore(snapshot *consensus.Snapshot) error {
    sm.mu.Lock()
    defer sm.mu.Unlock()
    
    return json.Unmarshal(snapshot.Data, &sm.data)
}
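
The state machine can be exercised directly, outside a cluster, to verify Apply and Query; remember to initialize the data map first. The Command and LogEntry shapes follow the examples in this guide, and Command is assumed to round-trip through encoding/json:

// Drive the state machine directly with a "set" command, then query it back.
sm := &CustomStateMachine{data: make(map[string]interface{})}

cmd := consensus.Command{
    Type: "set",
    Payload: map[string]interface{}{
        "key":   "user:123",
        "value": "John Doe",
    },
}
data, _ := json.Marshal(cmd)

if err := sm.Apply(consensus.LogEntry{Data: data}); err != nil {
    log.Fatalf("apply failed: %v", err)
}

value, _ := sm.Query(map[string]interface{}{"key": "user:123"})
log.Printf("value: %v", value) // "John Doe"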

Advanced Features

Distributed Deployment

# Node 1 configuration
consensus:
  node_id: "node-1"
  cluster_id: "prod-cluster"
  bind_addr: "10.0.1.10"
  bind_port: 7000
  
  peers:
    - id: "node-1"
      address: "10.0.1.10"
      port: 7000
    - id: "node-2"
      address: "10.0.1.11"
      port: 7000
    - id: "node-3"
      address: "10.0.1.12"
      port: 7000
  
  discovery:
    type: "consul"
    endpoints: ["consul.service.consul:8500"]
    service_name: "forge-consensus"
    namespace: "production"
  
  security:
    enable_mtls: true
    cert_file: "/etc/certs/node.crt"
    key_file: "/etc/certs/node.key"
    ca_file: "/etc/certs/ca.crt"

High Availability Setup

// Configure for high availability
config := consensus.Config{
    Raft: consensus.RaftConfig{
        HeartbeatInterval:  time.Second,
        ElectionTimeoutMin: 5 * time.Second,
        ElectionTimeoutMax: 10 * time.Second,
        PreVote:           true,
        CheckQuorum:       true,
    },
    Health: consensus.HealthConfig{
        Enabled:            true,
        CheckInterval:      10 * time.Second,
        UnhealthyThreshold: 3,
        HealthyThreshold:   2,
    },
    Resilience: consensus.ResilienceConfig{
        EnableRetry:             true,
        MaxRetries:              3,
        RetryDelay:              100 * time.Millisecond,
        EnableCircuitBreaker:    true,
        CircuitBreakerThreshold: 5,
        CircuitBreakerTimeout:   30 * time.Second,
    },
}

Performance Optimization

consensus:
  raft:
    log_cache_size: 2048
    max_append_entries: 128
    replication_batch_size: 200
    disable_pipeline: false
  
  transport:
    max_message_size: 8388608  # 8MB
    enable_compression: true
    compression_level: 6
    enable_multiplexing: true
    max_connections: 200
  
  storage:
    max_batch_size: 2000
    max_batch_delay: "5ms"
    badger_options:
      mem_table_size: 134217728  # 128MB
      num_mem_tables: 8
      num_compactors: 8
  
  advanced:
    max_memory_usage: 2147483648  # 2GB
    gc_interval: "2m"
    enable_read_index: true
    enable_leased_reads: true

API Endpoints

Health & Status

# Basic health check
GET /consensus/health

# Detailed health status
GET /consensus/health/detailed

# Node status and statistics
GET /consensus/status

# Prometheus metrics
GET /consensus/metrics

Cluster Management

# List all nodes
GET /consensus/cluster/nodes

# Get specific node
GET /consensus/cluster/nodes/{node_id}

# Add node to cluster
POST /consensus/cluster/nodes
{
  "node_id": "node-4",
  "address": "10.0.1.13",
  "port": 7000
}

# Remove node from cluster
DELETE /consensus/cluster/nodes/{node_id}

# Get cluster status
GET /consensus/cluster/status

# Get quorum information
GET /consensus/cluster/quorum

Leadership

# Get current leader
GET /consensus/leader

# Transfer leadership
POST /consensus/leader/transfer
{
  "target_node_id": "node-2"
}

# Step down as leader
POST /consensus/leader/stepdown

Operations

# Apply command (write operation)
POST /consensus/apply
{
  "type": "set",
  "payload": {
    "key": "user:123",
    "value": "John Doe"
  }
}

# Read query
POST /consensus/read
{
  "key": "user:123"
}

# Create snapshot
POST /consensus/snapshot
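
These endpoints can be called from any HTTP client. A sketch in Go, assuming the admin API is served on localhost:8080 as in the troubleshooting examples later in this guide:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

func main() {
    // Build the apply request body documented above.
    body, _ := json.Marshal(map[string]interface{}{
        "type": "set",
        "payload": map[string]interface{}{
            "key":   "user:123",
            "value": "John Doe",
        },
    })

    // POST the command to the write endpoint; the host and port are assumptions.
    resp, err := http.Post("http://localhost:8080/consensus/apply",
        "application/json", bytes.NewReader(body))
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    fmt.Println("status:", resp.Status)
}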

Best Practices

Cluster Design

  • Odd Number of Nodes: Use 3, 5, or 7 nodes for optimal fault tolerance
  • Geographic Distribution: Spread nodes across availability zones
  • Resource Planning: Ensure adequate CPU, memory, and network bandwidth
  • Network Latency: Keep inter-node latency under 10ms for best performance

Configuration

  • Election Timeouts: Set based on network latency (5-10x round-trip time)
  • Heartbeat Interval: Use 1/10th of the election timeout (see the worked example after this list)
  • Snapshot Frequency: Balance between log size and recovery time
  • Batch Sizes: Tune based on workload characteristics
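
A worked example of the two timing rules, using the values from the YAML configuration earlier in this guide:

package main

import (
    "fmt"
    "time"
)

func main() {
    // Election timeouts sized from observed latency (well above 5-10x the
    // round-trip time on a LAN), as used in the sample configuration.
    electionMin := 5 * time.Second
    electionMax := 10 * time.Second

    // Heartbeat at roughly 1/10th of the election timeout.
    heartbeat := electionMax / 10 // 1s, matching heartbeat_interval above

    fmt.Println("election_timeout_min:", electionMin)
    fmt.Println("election_timeout_max:", electionMax)
    fmt.Println("heartbeat_interval:", heartbeat)
}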

Monitoring

  • Key Metrics: Monitor leader elections, log replication lag, and error rates
  • Health Checks: Implement comprehensive health monitoring
  • Alerting: Set up alerts for leadership changes and node failures
  • Log Analysis: Monitor Raft logs for performance insights

Security

  • TLS/mTLS: Always use encrypted communication in production
  • Authentication: Implement proper node authentication
  • Network Isolation: Use firewalls and network segmentation
  • Certificate Rotation: Regularly rotate TLS certificates

Performance

  • Storage Tuning: Optimize storage backend configuration
  • Memory Management: Monitor and tune memory usage
  • Network Optimization: Use compression and multiplexing
  • Batch Operations: Group operations for better throughput

Troubleshooting

Common Issues

Split Brain Prevention

# Check quorum status
curl http://localhost:8080/consensus/cluster/quorum

# Verify cluster membership
curl http://localhost:8080/consensus/cluster/nodes

Leadership Issues

# Check current leader
curl http://localhost:8080/consensus/leader

# Monitor election frequency
curl http://localhost:8080/consensus/metrics | grep elections

Performance Problems

# Check replication lag
curl http://localhost:8080/consensus/status

# Monitor operation latency
curl http://localhost:8080/consensus/metrics | grep latency

Debugging

Enable Debug Logging

consensus:
  observability:
    logging:
      level: "debug"
      log_raft_details: true

Health Check Diagnostics

// Get detailed health information
healthStatus := service.GetHealthStatus(ctx)
for _, check := range healthStatus.Details {
    if !check.Healthy {
        log.Printf("Health check failed: %s - %s", check.Name, check.Error)
    }
}

Metrics Collection

// Collect and analyze metrics
stats := service.GetStats()
log.Printf("Error rate: %.2f%%, Avg latency: %.2fms", 
    stats.ErrorRate*100, stats.AverageLatencyMs)

if stats.ErrorRate > 0.05 { // 5% error rate threshold
    log.Println("High error rate detected")
}

Recovery Procedures

Node Recovery

# Stop the failed node
systemctl stop forge-consensus

# Clear corrupted data (if necessary)
rm -rf /data/consensus/node-1

# Restart with clean state
systemctl start forge-consensus

Cluster Recovery

# If majority of nodes are lost, restore from snapshot
# 1. Stop all nodes
# 2. Restore snapshot on one node
# 3. Start that node as single-node cluster
# 4. Add other nodes back one by one
