Consensus Extension
The Consensus extension provides distributed consensus capabilities using the Raft algorithm, enabling high availability, leader election, and data replication across multiple nodes in a cluster.
Features
Core Consensus
- Raft Algorithm: Industry-standard consensus protocol for distributed systems
- Leader Election: Automatic leader selection with configurable timeouts
- Log Replication: Consistent data replication across cluster nodes
- Snapshotting: Automatic log compaction and state snapshots
- Dynamic Membership: Add/remove nodes without downtime
Transport & Discovery
- Multiple Transports: gRPC and TCP with TLS/mTLS support
- Service Discovery: Static, DNS, Consul, Kubernetes, etcd integration
- Connection Management: Keep-alive, multiplexing, compression
- Network Resilience: Automatic reconnection and failure detection
Storage Backends
- BadgerDB: High-performance embedded key-value store
- BoltDB: ACID-compliant embedded database
- Pebble: RocksDB-inspired storage engine
- PostgreSQL: External database backend support
Observability
- Prometheus Metrics: Comprehensive cluster and node metrics
- Health Checks: Configurable health monitoring
- Structured Logging: Detailed operational logs
- Distributed Tracing: Optional tracing support
Security
- TLS/mTLS: Encrypted communication between nodes
- Authentication: Configurable node authentication
- Data Encryption: Optional at-rest encryption
Installation
Go Module
go get github.com/xraph/forge/extensions/consensus
Docker
docker pull xraph/forge-consensus:latest
Package Manager
# Using the Forge CLI
forge extension add consensus
Configuration
YAML Configuration
consensus:
node_id: "node-1"
cluster_id: "my-cluster"
bind_addr: "0.0.0.0"
bind_port: 7000
# Initial cluster peers
peers:
- id: "node-1"
address: "localhost"
port: 7000
- id: "node-2"
address: "localhost"
port: 7001
- id: "node-3"
address: "localhost"
port: 7002
# Raft configuration
raft:
heartbeat_interval: "1s"
election_timeout_min: "5s"
election_timeout_max: "10s"
snapshot_interval: "30m"
snapshot_threshold: 10000
log_cache_size: 1024
max_append_entries: 64
trailing_logs: 10000
pre_vote: true
check_quorum: true
# Transport configuration
transport:
type: "grpc" # grpc, tcp
max_message_size: 4194304 # 4MB
timeout: "10s"
keep_alive: true
keep_alive_interval: "30s"
enable_compression: true
enable_multiplexing: true
# Discovery configuration
discovery:
type: "static" # static, dns, consul, etcd, kubernetes
refresh_interval: "30s"
enable_watch: true
# Storage configuration
storage:
type: "badger" # badger, boltdb, pebble, postgres
path: "./data/consensus"
sync_writes: true
max_batch_size: 1000
max_batch_delay: "10ms"
# BadgerDB options
badger_options:
value_log_max_entries: 1000000
mem_table_size: 67108864 # 64MB
num_mem_tables: 5
num_compactors: 4
# Security configuration
security:
enable_tls: false
enable_mtls: false
cert_file: ""
key_file: ""
ca_file: ""
enable_encryption: false
# Health checks
health:
enabled: true
check_interval: "10s"
timeout: "5s"
unhealthy_threshold: 3
healthy_threshold: 2
# Observability
observability:
metrics:
enabled: true
collection_interval: "15s"
namespace: "forge_consensus"
enable_detailed_metrics: true
tracing:
enabled: false
service_name: "forge-consensus"
sample_rate: 0.1
logging:
level: "info"
enable_structured: true
log_raft_details: false
# Admin API
admin_api:
enabled: true
path_prefix: "/consensus"
enable_auth: false
# Advanced settings
advanced:
enable_auto_snapshot: true
enable_auto_compaction: true
compaction_interval: "1h"
max_memory_usage: 1073741824 # 1GB
gc_interval: "5m"
enable_read_index: true
enable_leased_reads: true
Environment Variables
CONSENSUS_NODE_ID=node-1
CONSENSUS_CLUSTER_ID=my-cluster
CONSENSUS_BIND_ADDR=0.0.0.0
CONSENSUS_BIND_PORT=7000
CONSENSUS_STORAGE_PATH=./data/consensus
CONSENSUS_TRANSPORT_TYPE=grpc
CONSENSUS_DISCOVERY_TYPE=static
CONSENSUS_ENABLE_TLS=false
CONSENSUS_ENABLE_METRICS=true
Programmatic Configuration
package main
import (
	"context"

	"github.com/xraph/forge"
	"github.com/xraph/forge/extensions/consensus"
)
func main() {
app := forge.NewApp(forge.AppConfig{
Name: "consensus-app",
})
// Register consensus extension
err := app.RegisterExtension(consensus.NewExtension(
consensus.WithNodeID("node-1"),
consensus.WithClusterID("my-cluster"),
consensus.WithBindAddress("0.0.0.0", 7000),
consensus.WithPeers([]consensus.PeerConfig{
{ID: "node-1", Address: "localhost", Port: 7000},
{ID: "node-2", Address: "localhost", Port: 7001},
{ID: "node-3", Address: "localhost", Port: 7002},
}),
consensus.WithStoragePath("./data/node-1"),
consensus.WithTransportType("grpc"),
consensus.WithTLS("cert.pem", "key.pem"),
))
if err != nil {
panic(err)
}
app.Start(context.Background())
}
Usage Examples
Basic Consensus Service
package main
import (
"context"
"log"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/consensus"
)
func main() {
app := forge.NewApp(forge.AppConfig{Name: "consensus-example"})
// Register consensus extension
app.RegisterExtension(consensus.NewExtension(
consensus.WithNodeID("node-1"),
consensus.WithClusterID("my-cluster"),
consensus.WithBindAddress("0.0.0.0", 7000),
consensus.WithStoragePath("./data/node-1"),
))
ctx := context.Background()
app.Start(ctx)
// Get consensus service
service, _ := forge.Resolve[*consensus.Service](app.Container(), "consensus")
// Check if this node is the leader
if service.IsLeader() {
log.Println("This node is the leader")
// Apply a command
cmd := consensus.Command{
Type: "set",
Payload: map[string]interface{}{
"key": "user:123",
"value": "John Doe",
},
}
err := service.Apply(ctx, cmd)
if err != nil {
log.Printf("Failed to apply command: %v", err)
}
}
// Read data (can be done on any node)
result, err := service.Read(ctx, map[string]interface{}{
"key": "user:123",
})
if err != nil {
log.Printf("Failed to read: %v", err)
} else {
log.Printf("Read result: %v", result)
}
}
Cluster Management
// Add a new node to the cluster
err := service.AddNode(ctx, "node-4", "192.168.1.4", 7000)
if err != nil {
log.Printf("Failed to add node: %v", err)
}
// Remove a node from the cluster
err = service.RemoveNode(ctx, "node-4")
if err != nil {
log.Printf("Failed to remove node: %v", err)
}
// Transfer leadership to another node
err = service.TransferLeadership(ctx, "node-2")
if err != nil {
log.Printf("Failed to transfer leadership: %v", err)
}
// Get cluster information
clusterInfo := service.GetClusterInfo()
log.Printf("Cluster has %d nodes, leader: %s",
clusterInfo.TotalNodes, clusterInfo.Leader)
Health Monitoring
// Check service health
err := service.HealthCheck(ctx)
if err != nil {
log.Printf("Health check failed: %v", err)
}
// Get detailed health status
healthStatus := service.GetHealthStatus(ctx)
log.Printf("Health: %t, Status: %s, Has Quorum: %t",
healthStatus.Healthy, healthStatus.Status, healthStatus.HasQuorum)
// Get consensus statistics
stats := service.GetStats()
log.Printf("Node: %s, Role: %s, Term: %d, Operations: %d",
stats.NodeID, stats.Role, stats.Term, stats.OperationsTotal)
Custom State Machine
type CustomStateMachine struct {
	data map[string]interface{}
	mu   sync.RWMutex
}

// NewCustomStateMachine initializes the state map so Apply can write to it.
func NewCustomStateMachine() *CustomStateMachine {
	return &CustomStateMachine{data: make(map[string]interface{})}
}
func (sm *CustomStateMachine) Apply(entry consensus.LogEntry) error {
sm.mu.Lock()
defer sm.mu.Unlock()
var cmd consensus.Command
if err := json.Unmarshal(entry.Data, &cmd); err != nil {
return err
}
switch cmd.Type {
case "set":
key := cmd.Payload["key"].(string)
value := cmd.Payload["value"]
sm.data[key] = value
case "delete":
key := cmd.Payload["key"].(string)
delete(sm.data, key)
}
return nil
}
func (sm *CustomStateMachine) Query(query interface{}) (interface{}, error) {
sm.mu.RLock()
defer sm.mu.RUnlock()
q := query.(map[string]interface{})
key := q["key"].(string)
return sm.data[key], nil
}
func (sm *CustomStateMachine) Snapshot() (*consensus.Snapshot, error) {
sm.mu.RLock()
defer sm.mu.RUnlock()
data, err := json.Marshal(sm.data)
if err != nil {
return nil, err
}
return &consensus.Snapshot{
Data: data,
Created: time.Now(),
}, nil
}
func (sm *CustomStateMachine) Restore(snapshot *consensus.Snapshot) error {
sm.mu.Lock()
defer sm.mu.Unlock()
return json.Unmarshal(snapshot.Data, &sm.data)
}
Advanced Features
Distributed Deployment
# Node 1 configuration
consensus:
node_id: "node-1"
cluster_id: "prod-cluster"
bind_addr: "10.0.1.10"
bind_port: 7000
peers:
- id: "node-1"
address: "10.0.1.10"
port: 7000
- id: "node-2"
address: "10.0.1.11"
port: 7000
- id: "node-3"
address: "10.0.1.12"
port: 7000
discovery:
type: "consul"
endpoints: ["consul.service.consul:8500"]
service_name: "forge-consensus"
namespace: "production"
security:
enable_mtls: true
cert_file: "/etc/certs/node.crt"
key_file: "/etc/certs/node.key"
ca_file: "/etc/certs/ca.crt"
High Availability Setup
// Configure for high availability
config := consensus.Config{
Raft: consensus.RaftConfig{
HeartbeatInterval: time.Second,
ElectionTimeoutMin: 5 * time.Second,
ElectionTimeoutMax: 10 * time.Second,
PreVote: true,
CheckQuorum: true,
},
Health: consensus.HealthConfig{
Enabled: true,
CheckInterval: 10 * time.Second,
UnhealthyThreshold: 3,
HealthyThreshold: 2,
},
Resilience: consensus.ResilienceConfig{
EnableRetry: true,
MaxRetries: 3,
RetryDelay: 100 * time.Millisecond,
EnableCircuitBreaker: true,
CircuitBreakerThreshold: 5,
CircuitBreakerTimeout: 30 * time.Second,
},
}
Performance Optimization
consensus:
raft:
log_cache_size: 2048
max_append_entries: 128
replication_batch_size: 200
disable_pipeline: false
transport:
max_message_size: 8388608 # 8MB
enable_compression: true
compression_level: 6
enable_multiplexing: true
max_connections: 200
storage:
max_batch_size: 2000
max_batch_delay: "5ms"
badger_options:
mem_table_size: 134217728 # 128MB
num_mem_tables: 8
num_compactors: 8
advanced:
max_memory_usage: 2147483648 # 2GB
gc_interval: "2m"
enable_read_index: true
enable_leased_reads: true
API Endpoints
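All endpoints below are served by the Admin API under the configured path_prefix (shown as "/consensus" in the example configuration above). As a quick orientation, here is a minimal sketch of calling the apply endpoint from Go with the standard net/http client; it assumes the admin API is reachable on the application's HTTP server at localhost:8080, as in the troubleshooting examples later in this guide.
package main

import (
	"io"
	"log"
	"net/http"
	"strings"
)

func main() {
	// POST /consensus/apply with the same payload shown in the Operations section below.
	resp, err := http.Post(
		"http://localhost:8080/consensus/apply",
		"application/json",
		strings.NewReader(`{"type":"set","payload":{"key":"user:123","value":"John Doe"}}`),
	)
	if err != nil {
		log.Fatalf("apply request failed: %v", err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	log.Printf("status=%s body=%s", resp.Status, body)
}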
Health & Status
# Basic health check
GET /consensus/health
# Detailed health status
GET /consensus/health/detailed
# Node status and statistics
GET /consensus/status
# Prometheus metrics
GET /consensus/metrics
Cluster Management
# List all nodes
GET /consensus/cluster/nodes
# Get specific node
GET /consensus/cluster/nodes/{node_id}
# Add node to cluster
POST /consensus/cluster/nodes
{
"node_id": "node-4",
"address": "10.0.1.13",
"port": 7000
}
# Remove node from cluster
DELETE /consensus/cluster/nodes/{node_id}
# Get cluster status
GET /consensus/cluster/status
# Get quorum information
GET /consensus/cluster/quorum
Leadership
# Get current leader
GET /consensus/leader
# Transfer leadership
POST /consensus/leader/transfer
{
"target_node_id": "node-2"
}
# Step down as leader
POST /consensus/leader/stepdown
Operations
# Apply command (write operation)
POST /consensus/apply
{
"type": "set",
"payload": {
"key": "user:123",
"value": "John Doe"
}
}
# Read query
POST /consensus/read
{
"key": "user:123"
}
# Create snapshot
POST /consensus/snapshot
Best Practices
Cluster Design
- Odd Number of Nodes: Use 3, 5, or 7 nodes for optimal fault tolerance (see the quorum arithmetic after this list)
- Geographic Distribution: Spread nodes across availability zones
- Resource Planning: Ensure adequate CPU, memory, and network bandwidth
- Network Latency: Keep inter-node latency under 10ms for best performance
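The odd-node-count rule follows from Raft's quorum arithmetic: a cluster of n voting nodes needs a majority of n/2 + 1 to elect a leader or commit a log entry, so it tolerates (n - 1)/2 node failures. A short illustration of that arithmetic (the node counts are only examples):
package main

import "fmt"

func main() {
	// Majority quorum: a fourth node does not improve fault tolerance over three,
	// which is why 3, 5, or 7 voters are the usual choices.
	for _, n := range []int{3, 4, 5, 7} {
		quorum := n/2 + 1
		tolerated := (n - 1) / 2
		fmt.Printf("%d nodes: quorum=%d, tolerates %d failure(s)\n", n, quorum, tolerated)
	}
}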
Configuration
- Election Timeouts: Set based on network latency (5-10x round-trip time; see the worked example after this list)
- Heartbeat Interval: Use 1/10th of election timeout
- Snapshot Frequency: Balance between log size and recovery time
- Batch Sizes: Tune based on workload characteristics
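As a worked example of the first two rules above, here is the arithmetic for an assumed 20ms round-trip time (the RTT value is illustrative; measure your own network):
package main

import (
	"fmt"
	"time"
)

func main() {
	rtt := 20 * time.Millisecond

	// Election timeout: 5-10x the round-trip time.
	electionTimeoutMin := 5 * rtt  // 100ms
	electionTimeoutMax := 10 * rtt // 200ms

	// Heartbeat interval: roughly 1/10th of the election timeout.
	heartbeatInterval := electionTimeoutMin / 10 // 10ms

	fmt.Printf("heartbeat=%s election=[%s, %s]\n",
		heartbeatInterval, electionTimeoutMin, electionTimeoutMax)
}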
Monitoring
- Key Metrics: Monitor leader elections, log replication lag, and error rates
- Health Checks: Implement comprehensive health monitoring
- Alerting: Set up alerts for leadership changes and node failures (see the monitoring sketch after this list)
- Log Analysis: Monitor Raft logs for performance insights
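Below is a hedged sketch of a monitoring loop built from the service methods shown in the Health Monitoring section above (GetHealthStatus, GetStats, GetClusterInfo). The 15-second interval and 5% error-rate threshold are illustrative choices, not defaults of the extension, and the snippet assumes the same imports as the earlier examples (context, log, time, and the consensus package).
// watchConsensus periodically checks quorum, error rate, and leadership and
// logs an alert when something looks wrong.
func watchConsensus(ctx context.Context, service *consensus.Service) {
	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()

	lastLeader := ""
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			health := service.GetHealthStatus(ctx)
			if !health.HasQuorum {
				log.Println("ALERT: cluster has lost quorum")
			}

			stats := service.GetStats()
			if stats.ErrorRate > 0.05 {
				log.Printf("ALERT: error rate %.2f%% above 5%% threshold", stats.ErrorRate*100)
			}

			info := service.GetClusterInfo()
			if lastLeader != "" && info.Leader != lastLeader {
				log.Printf("ALERT: leadership changed from %s to %s", lastLeader, info.Leader)
			}
			lastLeader = info.Leader
		}
	}
}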
Security
- TLS/mTLS: Always use encrypted communication in production (see the tls.Config sketch after this list)
- Authentication: Implement proper node authentication
- Network Isolation: Use firewalls and network segmentation
- Certificate Rotation: Regularly rotate TLS certificates
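The cert_file, key_file, and ca_file settings in the security block map onto a standard Go tls.Config. The following is a minimal, extension-independent sketch of loading them for mutual TLS; the certificate paths are taken from the distributed deployment example above.
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"log"
	"os"
)

// buildMTLSConfig loads the node certificate, key, and cluster CA referenced by
// the security section (cert_file, key_file, ca_file) into a tls.Config that
// both presents a certificate and requires one from peers.
func buildMTLSConfig(certFile, keyFile, caFile string) (*tls.Config, error) {
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, err
	}

	caPEM, err := os.ReadFile(caFile)
	if err != nil {
		return nil, err
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("failed to parse CA certificate")
	}

	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      pool,                           // verify peer server certificates against the cluster CA
		ClientCAs:    pool,                           // accept only client certificates signed by the same CA
		ClientAuth:   tls.RequireAndVerifyClientCert, // this is what makes the TLS mutual
		MinVersion:   tls.VersionTLS12,
	}, nil
}

func main() {
	cfg, err := buildMTLSConfig("/etc/certs/node.crt", "/etc/certs/node.key", "/etc/certs/ca.crt")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("mTLS config ready (client auth mode: %v)", cfg.ClientAuth)
}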
Performance
- Storage Tuning: Optimize storage backend configuration
- Memory Management: Monitor and tune memory usage
- Network Optimization: Use compression and multiplexing
- Batch Operations: Group operations for better throughput
Troubleshooting
Common Issues
Split Brain Prevention
# Check quorum status
curl http://localhost:8080/consensus/cluster/quorum
# Verify cluster membership
curl http://localhost:8080/consensus/cluster/nodes
Leadership Issues
# Check current leader
curl http://localhost:8080/consensus/leader
# Monitor election frequency
curl http://localhost:8080/consensus/metrics | grep elections
Performance Problems
# Check replication lag
curl http://localhost:8080/consensus/status
# Monitor operation latency
curl http://localhost:8080/consensus/metrics | grep latency
Debugging
Enable Debug Logging
consensus:
observability:
logging:
level: "debug"
log_raft_details: true
Health Check Diagnostics
// Get detailed health information
healthStatus := service.GetHealthStatus(ctx)
for _, check := range healthStatus.Details {
if !check.Healthy {
log.Printf("Health check failed: %s - %s", check.Name, check.Error)
}
}
Metrics Collection
// Collect and analyze metrics
stats := service.GetStats()
log.Printf("Error rate: %.2f%%, Avg latency: %.2fms",
stats.ErrorRate*100, stats.AverageLatencyMs)
if stats.ErrorRate > 0.05 { // 5% error rate threshold
log.Println("High error rate detected")
}
Recovery Procedures
Node Recovery
# Stop the failed node
systemctl stop forge-consensus
# Clear corrupted data (if necessary)
rm -rf /data/consensus/node-1
# Restart with clean state
systemctl start forge-consensus
Cluster Recovery
# If a majority of nodes are lost, restore from a snapshot
# 1. Stop all nodes
# 2. Restore snapshot on one node
# 3. Start that node as single-node cluster
# 4. Add other nodes back one by one
Next Steps
- Explore the Events Extension for distributed event handling
- Learn about the gRPC Extension for service communication
- Check out the Dashboard Extension for monitoring
- Review Security Best Practices for production deployment