
Kafka Extension

Production-ready Apache Kafka client with support for producers, consumers, consumer groups, and admin operations, plus enterprise-grade security and observability

The Kafka extension provides a production-ready Apache Kafka client with comprehensive support for producers, consumers, consumer groups, and admin operations. Built on the robust IBM Sarama library, it offers enterprise-grade features including TLS/SSL security, SASL authentication, compression, and observability.

Features

Core Messaging

  • Producer Support: Synchronous and asynchronous message publishing with batch operations
  • Consumer Support: Simple consumers and consumer groups with automatic partition rebalancing
  • Message Headers: Support for custom message headers and metadata
  • Exactly-Once Semantics: Idempotent producer support for reliable message delivery

Security & Authentication

  • TLS/SSL: Secure connections with mutual TLS (mTLS) support
  • SASL Authentication: Multiple mechanisms (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512)
  • Certificate Management: Client certificate authentication
  • Connection Security: Encrypted communication with configurable verification

Performance & Reliability

  • Compression: Support for gzip, snappy, lz4, and zstd compression algorithms
  • Connection Pooling: Efficient connection management and reuse
  • Retry Logic: Configurable retry mechanisms for failed operations
  • Batch Processing: Efficient batch message sending and receiving

Admin Operations

  • Topic Management: Create, delete, and describe topics
  • Metadata Queries: Partition information and broker metadata
  • Offset Management: Manual offset control and querying
  • Health Monitoring: Connection health checks and diagnostics

Observability

  • Metrics Integration: Built-in Prometheus metrics for monitoring
  • Structured Logging: Comprehensive logging with configurable levels
  • Statistics: Real-time client statistics and performance metrics
  • Error Tracking: Detailed error reporting and tracking

Installation

Prerequisites

go get github.com/IBM/sarama
go get github.com/xdg-go/scram

Basic Setup

package main

import (
    "context"
    "log"
    
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/kafka"
)

func main() {
    app := forge.New("kafka-app")
    
    // Add Kafka extension
    app.AddExtension(kafka.NewExtension(
        kafka.WithBrokers("localhost:9092"),
        kafka.WithClientID("my-app"),
        kafka.WithProducer(true),
        kafka.WithConsumer(true),
    ))
    
    if err := app.Start(context.Background()); err != nil {
        log.Fatal(err)
    }
    
    // Get Kafka client
    var client kafka.Kafka
    app.Container().Resolve(&client)
    
    // Use the client...
    
    app.Wait()
}

Configuration

Programmatic Configuration

app.AddExtension(kafka.NewExtension(
    // Connection settings
    kafka.WithBrokers("broker1:9092", "broker2:9092"),
    kafka.WithClientID("my-service"),
    kafka.WithVersion("3.0.0"),
    
    // Security
    kafka.WithTLS("cert.pem", "key.pem", "ca.pem", false),
    kafka.WithSASL("SCRAM-SHA-512", "username", "password"),
    
    // Producer settings
    kafka.WithProducer(true),
    kafka.WithCompression("snappy"),
    kafka.WithIdempotent(true),
    kafka.WithProducerAcks("all"),
    
    // Consumer settings
    kafka.WithConsumer(true),
    kafka.WithConsumerGroup("my-group"),
    kafka.WithConsumerOffsets("newest"),
))

YAML Configuration

kafka:
  # Connection settings
  brokers:
    - localhost:9092
    - localhost:9093
  client_id: my-app
  version: "3.0.0"
  dial_timeout: 30s
  read_timeout: 30s
  write_timeout: 30s
  
  # TLS settings
  enable_tls: true
  tls_cert_file: /path/to/cert.pem
  tls_key_file: /path/to/key.pem
  tls_ca_file: /path/to/ca.pem
  tls_skip_verify: false
  
  # SASL settings
  enable_sasl: true
  sasl_mechanism: SCRAM-SHA-512
  sasl_username: user
  sasl_password: pass
  
  # Producer settings
  producer_enabled: true
  producer_compression: snappy
  producer_idempotent: true
  producer_acks: all
  producer_max_message_bytes: 1000000
  producer_flush_messages: 10
  producer_flush_frequency: 100ms
  producer_retry_max: 3
  
  # Consumer settings
  consumer_enabled: true
  consumer_group_id: my-group
  consumer_offsets: newest
  consumer_max_wait: 500ms
  consumer_fetch_min: 1
  consumer_fetch_max: 1048576
  consumer_isolation: read_uncommitted
  consumer_group_rebalance: sticky
  consumer_group_session: 10s
  consumer_group_heartbeat: 3s
  
  # Metadata settings
  metadata_retry_max: 3
  metadata_retry_backoff: 250ms
  metadata_refresh_freq: 10m
  metadata_full_refresh: true
  
  # Observability
  enable_metrics: true
  enable_tracing: true
  enable_logging: true

Environment Variables

KAFKA_BROKERS=localhost:9092,localhost:9093
KAFKA_CLIENT_ID=my-app
KAFKA_VERSION=3.0.0
KAFKA_ENABLE_TLS=true
KAFKA_TLS_CERT_FILE=/path/to/cert.pem
KAFKA_TLS_KEY_FILE=/path/to/key.pem
KAFKA_TLS_CA_FILE=/path/to/ca.pem
KAFKA_ENABLE_SASL=true
KAFKA_SASL_MECHANISM=SCRAM-SHA-512
KAFKA_SASL_USERNAME=user
KAFKA_SASL_PASSWORD=pass
KAFKA_PRODUCER_ENABLED=true
KAFKA_CONSUMER_ENABLED=true
KAFKA_CONSUMER_GROUP_ID=my-group

Producer Usage

Synchronous Publishing

// Simple message
err := client.SendMessage("my-topic", []byte("key"), []byte("value"))
if err != nil {
    log.Fatal(err)
}

// Message with headers
message := &kafka.ProducerMessage{
    Topic: "my-topic",
    Key:   []byte("key"),
    Value: []byte("value"),
    Headers: []kafka.MessageHeader{
        {Key: "source", Value: []byte("my-service")},
        {Key: "version", Value: []byte("1.0")},
    },
}
err = client.SendMessages([]*kafka.ProducerMessage{message})

Asynchronous Publishing

// Fire-and-forget
err := client.SendMessageAsync("my-topic", []byte("key"), []byte("value"))
if err != nil {
    log.Fatal(err)
}

// Batch async publishing
messages := []*kafka.ProducerMessage{
    {Topic: "topic1", Key: []byte("key1"), Value: []byte("value1")},
    {Topic: "topic2", Key: []byte("key2"), Value: []byte("value2")},
}
err = client.SendMessages(messages)

Event Sourcing Pattern

type Event struct {
    ID        string    `json:"id"`
    Type      string    `json:"type"`
    Data      any       `json:"data"`
    Timestamp time.Time `json:"timestamp"`
    Version   int       `json:"version"`
}

func publishEvent(client kafka.Kafka, event Event) error {
    data, err := json.Marshal(event)
    if err != nil {
        return err
    }
    
    return client.SendMessage(
        "events",
        []byte(event.ID),
        data,
    )
}

// Usage
event := Event{
    ID:        uuid.New().String(),
    Type:      "user.created",
    Data:      userData,
    Timestamp: time.Now(),
    Version:   1,
}
err := publishEvent(client, event)

Consumer Usage

Simple Consumer

ctx := context.Background()
err := client.Consume(ctx, []string{"my-topic"}, func(msg *sarama.ConsumerMessage) error {
    log.Printf("Received: key=%s, value=%s, partition=%d, offset=%d",
        string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)
    return nil
})

Partition-Specific Consumer

err := client.ConsumePartition(ctx, "my-topic", 0, sarama.OffsetNewest, 
    func(msg *sarama.ConsumerMessage) error {
        // Process message from specific partition
        return processMessage(msg)
    })

Consumer Groups

type EventHandler struct {
    logger forge.Logger
}

func (h *EventHandler) Setup(sarama.ConsumerGroupSession) error {
    h.logger.Info("consumer group session started")
    return nil
}

func (h *EventHandler) Cleanup(sarama.ConsumerGroupSession) error {
    h.logger.Info("consumer group session ended")
    return nil
}

func (h *EventHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for {
        select {
        case message := <-claim.Messages():
            if message == nil {
                return nil
            }
            
            // Process message
            if err := h.processMessage(message); err != nil {
                h.logger.Error("failed to process message", forge.F("error", err))
                continue
            }
            
            // Mark message as processed
            session.MarkMessage(message, "")
            
        case <-session.Context().Done():
            return nil
        }
    }
}

func (h *EventHandler) processMessage(msg *sarama.ConsumerMessage) error {
    // Your message processing logic
    return nil
}

// Usage
handler := &EventHandler{logger: app.Logger()}
err := client.JoinConsumerGroup(ctx, "my-group", []string{"my-topic"}, handler)

Message Processing Patterns

// Dead Letter Queue pattern
func processWithDLQ(client kafka.Kafka, msg *sarama.ConsumerMessage) error {
    if err := processMessage(msg); err != nil {
        // Send to dead letter queue
        dlqMsg := &kafka.ProducerMessage{
            Topic: "dlq-topic",
            Key:   msg.Key,
            Value: msg.Value,
            Headers: []kafka.MessageHeader{
                {Key: "original-topic", Value: []byte(msg.Topic)},
                {Key: "error", Value: []byte(err.Error())},
                {Key: "failed-at", Value: []byte(time.Now().Format(time.RFC3339))},
            },
        }
        return client.SendMessages([]*kafka.ProducerMessage{dlqMsg})
    }
    return nil
}

// Retry pattern with exponential backoff
func processWithRetry(msg *sarama.ConsumerMessage, maxRetries int) error {
    for attempt := 0; attempt < maxRetries; attempt++ {
        if err := processMessage(msg); err != nil {
            if attempt == maxRetries-1 {
                return err // Final attempt failed
            }
            
            // Exponential backoff
            backoff := time.Duration(math.Pow(2, float64(attempt))) * time.Second
            time.Sleep(backoff)
            continue
        }
        return nil // Success
    }
    return nil
}

Admin Operations

Topic Management

// stringPtr is a small helper returning a pointer to its argument,
// used for the *string config entry values below.
func stringPtr(s string) *string { return &s }

// Create topic
config := kafka.TopicConfig{
    NumPartitions:     3,
    ReplicationFactor: 2,
    ConfigEntries: map[string]*string{
        "retention.ms":   stringPtr("604800000"), // 7 days
        "cleanup.policy": stringPtr("delete"),
    },
}
err := client.CreateTopic("my-topic", config)

// List topics
topics, err := client.ListTopics()
for _, topic := range topics {
    fmt.Printf("Topic: %s\n", topic)
}

// Describe topic
metadata, err := client.DescribeTopic("my-topic")
fmt.Printf("Topic: %s, Partitions: %d\n", metadata.Name, len(metadata.Partitions))

// Delete topic
err = client.DeleteTopic("my-topic")

Partition and Offset Management

// Get partitions for topic
partitions, err := client.GetPartitions("my-topic")
fmt.Printf("Partitions: %v\n", partitions)

// Get latest offset
offset, err := client.GetOffset("my-topic", 0, sarama.OffsetNewest)
fmt.Printf("Latest offset: %d\n", offset)

// Get earliest offset
offset, err = client.GetOffset("my-topic", 0, sarama.OffsetOldest)
fmt.Printf("Earliest offset: %d\n", offset)

Security Configuration

TLS/SSL Setup

// Basic TLS
app.AddExtension(kafka.NewExtension(
    kafka.WithTLS("client.crt", "client.key", "ca.crt", false),
))

// TLS without client certificates (skipVerify=true disables server certificate verification; avoid in production)
app.AddExtension(kafka.NewExtension(
    kafka.WithTLS("", "", "ca.crt", true), // no client cert/key; skip server certificate verification
))

SASL Authentication

// SASL/PLAIN
app.AddExtension(kafka.NewExtension(
    kafka.WithSASL("PLAIN", "username", "password"),
))

// SASL/SCRAM-SHA-256
app.AddExtension(kafka.NewExtension(
    kafka.WithSASL("SCRAM-SHA-256", "username", "password"),
))

// SASL/SCRAM-SHA-512 (recommended)
app.AddExtension(kafka.NewExtension(
    kafka.WithSASL("SCRAM-SHA-512", "username", "password"),
))

Combined TLS + SASL

app.AddExtension(kafka.NewExtension(
    kafka.WithBrokers("secure-broker:9093"),
    kafka.WithTLS("client.crt", "client.key", "ca.crt", false),
    kafka.WithSASL("SCRAM-SHA-512", "username", "password"),
))

Monitoring and Observability

Built-in Metrics

The extension automatically tracks these metrics:

  • kafka.messages.sent - Total messages sent
  • kafka.messages.received - Total messages received
  • kafka.bytes.sent - Total bytes sent
  • kafka.bytes.received - Total bytes received
  • kafka.errors - Total errors encountered
  • kafka.connections.active - Active connections

Client Statistics

stats := client.GetStats()
fmt.Printf("Messages sent: %d\n", stats.MessagesSent)
fmt.Printf("Messages received: %d\n", stats.MessagesReceived)
fmt.Printf("Bytes sent: %d\n", stats.BytesSent)
fmt.Printf("Bytes received: %d\n", stats.BytesReceived)
fmt.Printf("Errors: %d\n", stats.Errors)
fmt.Printf("Active consumers: %d\n", stats.ActiveConsumers)
fmt.Printf("Active producers: %d\n", stats.ActiveProducers)

Health Checks

// Check client health
if err := client.Ping(ctx); err != nil {
    log.Printf("Kafka client unhealthy: %v", err)
}

// Extension health check (automatic)
if err := extension.Health(ctx); err != nil {
    log.Printf("Kafka extension unhealthy: %v", err)
}

Custom Metrics Integration

// Custom metrics with labels
func recordCustomMetric(client kafka.Kafka, topic string, messageType string) {
    // Access underlying metrics from the client
    // Implement custom metric recording logic
}
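
The built-in metrics cover client-level counters; for application-level labels such as topic or message type, one option is to record your own metrics around the client. Below is a minimal sketch using prometheus/client_golang directly; the metric name, labels, and wrapper function are illustrative assumptions, not part of the extension:

import (
    "github.com/prometheus/client_golang/prometheus"

    "github.com/xraph/forge/extensions/kafka"
)

// Counter labeled by topic and message type (example name, not an extension metric).
var messagesPublished = prometheus.NewCounterVec(
    prometheus.CounterOpts{
        Name: "app_kafka_messages_published_total",
        Help: "Messages published by this service, labeled by topic and message type.",
    },
    []string{"topic", "type"},
)

func init() {
    prometheus.MustRegister(messagesPublished) // or register on your own registry
}

// publishWithMetrics wraps SendMessage and increments the counter on success.
func publishWithMetrics(client kafka.Kafka, topic, messageType string, key, value []byte) error {
    if err := client.SendMessage(topic, key, value); err != nil {
        return err
    }
    messagesPublished.WithLabelValues(topic, messageType).Inc()
    return nil
}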

Performance Optimization

Producer Optimization

app.AddExtension(kafka.NewExtension(
    // Batch settings for throughput
    kafka.WithProducerFlushMessages(100),
    kafka.WithProducerFlushFrequency(10*time.Millisecond),
    
    // Compression for network efficiency
    kafka.WithCompression("snappy"), // or "lz4", "gzip", "zstd"
    
    // Idempotence for reliability
    kafka.WithIdempotent(true),
    kafka.WithProducerAcks("all"),
    
    // Buffer sizes
    kafka.WithProducerMaxMessageBytes(1000000), // 1MB
))

Consumer Optimization

app.AddExtension(kafka.NewExtension(
    // Fetch settings for throughput
    kafka.WithConsumerFetchMin(1),
    kafka.WithConsumerFetchMax(1024*1024), // 1MB
    kafka.WithConsumerMaxWait(500*time.Millisecond),
    
    // Consumer group settings
    kafka.WithConsumerGroupRebalance("sticky"), // or "range", "roundrobin"
    kafka.WithConsumerGroupSession(10*time.Second),
    kafka.WithConsumerGroupHeartbeat(3*time.Second),
    
    // Isolation level
    kafka.WithConsumerIsolation("read_committed"), // or "read_uncommitted"
))

Connection Optimization

app.AddExtension(kafka.NewExtension(
    // Timeouts
    kafka.WithDialTimeout(30*time.Second),
    kafka.WithReadTimeout(30*time.Second),
    kafka.WithWriteTimeout(30*time.Second),
    kafka.WithKeepAlive(30*time.Second),
    
    // Metadata refresh
    kafka.WithMetadataRefreshFreq(10*time.Minute),
    kafka.WithMetadataRetryMax(3),
    kafka.WithMetadataRetryBackoff(250*time.Millisecond),
))

Error Handling

Common Error Types

import "github.com/xraph/forge/extensions/kafka"

// Check for specific errors
if errors.Is(err, kafka.ErrProducerNotEnabled) {
    log.Println("Producer is not enabled")
}

if errors.Is(err, kafka.ErrTopicNotFound) {
    log.Println("Topic does not exist")
}

if errors.Is(err, kafka.ErrConnectionFailed) {
    log.Println("Failed to connect to Kafka")
}

Retry Logic

func sendWithRetry(client kafka.Kafka, topic string, key, value []byte, maxRetries int) error {
    for attempt := 0; attempt < maxRetries; attempt++ {
        if err := client.SendMessage(topic, key, value); err != nil {
            if attempt == maxRetries-1 {
                return fmt.Errorf("failed after %d attempts: %w", maxRetries, err)
            }
            
            // Exponential backoff
            backoff := time.Duration(math.Pow(2, float64(attempt))) * time.Second
            time.Sleep(backoff)
            continue
        }
        return nil
    }
    return nil
}

Circuit Breaker Pattern

type CircuitBreaker struct {
    failures    int
    maxFailures int
    timeout     time.Duration
    lastFailure time.Time
    state       string // "closed", "open", "half-open"
}

func (cb *CircuitBreaker) Call(fn func() error) error {
    if cb.state == "open" {
        if time.Since(cb.lastFailure) > cb.timeout {
            cb.state = "half-open"
        } else {
            return errors.New("circuit breaker is open")
        }
    }
    
    err := fn()
    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()
        if cb.failures >= cb.maxFailures {
            cb.state = "open"
        }
        return err
    }
    
    cb.failures = 0
    cb.state = "closed"
    return nil
}
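
A brief usage sketch wrapping a producer call with the breaker above (note the struct as written is not goroutine-safe; guard it with a mutex if shared across goroutines):

cb := &CircuitBreaker{
    maxFailures: 5,
    timeout:     30 * time.Second,
    state:       "closed",
}

// After five consecutive failures the breaker opens and rejects calls for 30s.
err := cb.Call(func() error {
    return client.SendMessage("my-topic", []byte("key"), []byte("value"))
})
if err != nil {
    log.Printf("send rejected or failed: %v", err)
}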

Best Practices

Message Design

  1. Use meaningful keys for proper partitioning
  2. Keep messages small (< 1MB recommended)
  3. Include metadata in headers, not payload
  4. Use schema evolution for backward compatibility
  5. Implement idempotent processing for consumers (see the sketch after this list)
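
A minimal sketch of idempotent processing, deduplicating by message key (or by an ID carried in headers). The in-memory set is illustrative only; a real deployment would persist processed IDs so duplicates survive restarts:

type IdempotentHandler struct {
    mu   sync.Mutex
    seen map[string]struct{}
}

func NewIdempotentHandler() *IdempotentHandler {
    return &IdempotentHandler{seen: make(map[string]struct{})}
}

func (h *IdempotentHandler) Handle(msg *sarama.ConsumerMessage) error {
    id := string(msg.Key) // or read a message ID from msg.Headers
    h.mu.Lock()
    if _, done := h.seen[id]; done {
        h.mu.Unlock()
        return nil // duplicate delivery; already processed
    }
    h.seen[id] = struct{}{}
    h.mu.Unlock()

    return processMessage(msg) // business logic as in the earlier examples
}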

Topic Design

  1. Plan partition count based on throughput requirements
  2. Set appropriate retention policies
  3. Use consistent naming conventions
  4. Consider replication factor for availability
  5. Monitor topic metrics regularly

Security

  1. Always use TLS in production
  2. Implement proper SASL authentication
  3. Rotate credentials regularly
  4. Use least privilege access controls
  5. Monitor security events

Performance

  1. Batch messages when possible
  2. Use compression for large payloads
  3. Tune consumer fetch sizes
  4. Monitor lag and throughput
  5. Scale consumers horizontally

Reliability

  1. Enable idempotent producers
  2. Use consumer groups for scalability
  3. Implement retry logic with backoff
  4. Handle poison messages appropriately
  5. Monitor health continuously

Troubleshooting

Common Issues

Connection timeouts:

kafka:
  dial_timeout: 60s
  read_timeout: 60s
  write_timeout: 60s

Consumer lag (see also the lag check below):

kafka:
  consumer_fetch_max: 2097152  # 2MB
  consumer_max_wait: 100ms
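
To verify that such tuning actually reduces lag, a rough per-partition check can be done inside a handler by comparing the partition's newest offset (via GetOffset, documented in the API reference below) with the offset just consumed. A sketch; the threshold and log line are arbitrary choices:

func logLag(client kafka.Kafka, msg *sarama.ConsumerMessage) {
    // sarama.OffsetNewest resolves to the partition's high-water mark
    // (the offset of the next message to be produced).
    newest, err := client.GetOffset(msg.Topic, msg.Partition, sarama.OffsetNewest)
    if err != nil {
        return
    }
    lag := newest - msg.Offset - 1
    if lag > 10_000 { // arbitrary threshold for this sketch
        log.Printf("high consumer lag on %s[%d]: %d messages behind", msg.Topic, msg.Partition, lag)
    }
}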

Producer throughput:

kafka:
  producer_flush_messages: 1000
  producer_flush_frequency: 10ms
  producer_compression: snappy

Authentication failures:

kafka:
  enable_sasl: true
  sasl_mechanism: SCRAM-SHA-512   # must match the mechanism enabled on the broker listener
  sasl_username: correct_username
  sasl_password: correct_password

Debug Logging

app.AddExtension(kafka.NewExtension(
    kafka.WithLogging(true),
    // Other options...
))

Health Monitoring

// Periodic health check
go func() {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    
    for range ticker.C {
        if err := client.Ping(ctx); err != nil {
            log.Printf("Kafka health check failed: %v", err)
            // Implement alerting logic
        }
    }
}()

API Reference

Core Interface

type Kafka interface {
    // Producer operations
    SendMessage(topic string, key, value []byte) error
    SendMessageAsync(topic string, key, value []byte) error
    SendMessages(messages []*ProducerMessage) error

    // Consumer operations
    Consume(ctx context.Context, topics []string, handler MessageHandler) error
    ConsumePartition(ctx context.Context, topic string, partition int32, offset int64, handler MessageHandler) error
    StopConsume() error

    // Consumer group operations
    JoinConsumerGroup(ctx context.Context, groupID string, topics []string, handler ConsumerGroupHandler) error
    LeaveConsumerGroup(ctx context.Context) error

    // Admin operations
    CreateTopic(topic string, config TopicConfig) error
    DeleteTopic(topic string) error
    ListTopics() ([]string, error)
    DescribeTopic(topic string) (*TopicMetadata, error)
    GetPartitions(topic string) ([]int32, error)
    GetOffset(topic string, partition int32, time int64) (int64, error)

    // Client access
    GetProducer() sarama.SyncProducer
    GetAsyncProducer() sarama.AsyncProducer
    GetConsumer() sarama.Consumer
    GetConsumerGroup() sarama.ConsumerGroup
    GetClient() sarama.Client
    GetStats() ClientStats

    // Lifecycle
    Close() error
    Ping(ctx context.Context) error
}
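
The Get* accessors expose the underlying Sarama clients for anything the wrapper does not cover. For example, assuming the producer is enabled, the synchronous Sarama producer can be used directly to read back the partition and offset assigned to a message (a sketch, not extension-specific behavior):

// Drop down to the underlying sarama.SyncProducer.
producer := client.GetProducer()
partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
    Topic: "my-topic",
    Key:   sarama.ByteEncoder([]byte("key")),
    Value: sarama.ByteEncoder([]byte("value")),
})
if err != nil {
    log.Fatal(err)
}
log.Printf("delivered to partition %d at offset %d", partition, offset)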

Configuration Options

// Connection options
func WithBrokers(brokers ...string) ConfigOption
func WithClientID(clientID string) ConfigOption
func WithVersion(version string) ConfigOption

// Security options
func WithTLS(certFile, keyFile, caFile string, skipVerify bool) ConfigOption
func WithSASL(mechanism, username, password string) ConfigOption

// Producer options
func WithProducer(enabled bool) ConfigOption
func WithCompression(compression string) ConfigOption
func WithIdempotent(enabled bool) ConfigOption
func WithProducerAcks(acks string) ConfigOption

// Consumer options
func WithConsumer(enabled bool) ConfigOption
func WithConsumerGroup(groupID string) ConfigOption
func WithConsumerOffsets(offsets string) ConfigOption

// Observability options
func WithMetrics(enabled bool) ConfigOption
func WithLogging(enabled bool) ConfigOption
func WithTracing(enabled bool) ConfigOption

The Kafka extension provides a robust, production-ready solution for Apache Kafka integration with comprehensive features for messaging, event sourcing, and distributed communication patterns.
