Kafka Extension
The Kafka extension provides a production-ready Apache Kafka client with comprehensive support for producers, consumers, consumer groups, and admin operations. Built on the robust IBM Sarama library, it offers enterprise-grade features including TLS/SSL security, SASL authentication, compression, and observability.
Features
Core Messaging
- Producer Support: Synchronous and asynchronous message publishing with batch operations
- Consumer Support: Simple consumers and consumer groups with automatic partition rebalancing
- Message Headers: Support for custom message headers and metadata
- Exactly-Once Semantics: Idempotent producer support for reliable message delivery
Security & Authentication
- TLS/SSL: Secure connections with mutual TLS (mTLS) support
- SASL Authentication: Multiple mechanisms (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512)
- Certificate Management: Client certificate authentication
- Connection Security: Encrypted communication with configurable verification
Performance & Reliability
- Compression: Support for gzip, snappy, lz4, and zstd compression algorithms
- Connection Pooling: Efficient connection management and reuse
- Retry Logic: Configurable retry mechanisms for failed operations
- Batch Processing: Efficient batch message sending and receiving
Admin Operations
- Topic Management: Create, delete, and describe topics
- Metadata Queries: Partition information and broker metadata
- Offset Management: Manual offset control and querying
- Health Monitoring: Connection health checks and diagnostics
Observability
- Metrics Integration: Built-in Prometheus metrics for monitoring
- Structured Logging: Comprehensive logging with configurable levels
- Statistics: Real-time client statistics and performance metrics
- Error Tracking: Detailed error reporting and tracking
Installation
Prerequisites
go get github.com/IBM/sarama
go get github.com/xdg-go/scram

Basic Setup
package main
import (
"context"
"log"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/kafka"
)
func main() {
app := forge.New("kafka-app")
// Add Kafka extension
app.AddExtension(kafka.NewExtension(
kafka.WithBrokers("localhost:9092"),
kafka.WithClientID("my-app"),
kafka.WithProducer(true),
kafka.WithConsumer(true),
))
if err := app.Start(context.Background()); err != nil {
log.Fatal(err)
}
// Get Kafka client
var client kafka.Kafka
app.Container().Resolve(&client)
// Use the client...
app.Wait()
}

Configuration
Programmatic Configuration
app.AddExtension(kafka.NewExtension(
// Connection settings
kafka.WithBrokers("broker1:9092", "broker2:9092"),
kafka.WithClientID("my-service"),
kafka.WithVersion("3.0.0"),
// Security
kafka.WithTLS("cert.pem", "key.pem", "ca.pem", false),
kafka.WithSASL("SCRAM-SHA-512", "username", "password"),
// Producer settings
kafka.WithProducer(true),
kafka.WithCompression("snappy"),
kafka.WithIdempotent(true),
kafka.WithProducerAcks("all"),
// Consumer settings
kafka.WithConsumer(true),
kafka.WithConsumerGroup("my-group"),
kafka.WithConsumerOffsets("newest"),
))

YAML Configuration
kafka:
  # Connection settings
  brokers:
    - localhost:9092
    - localhost:9093
  client_id: my-app
  version: "3.0.0"
  dial_timeout: 30s
  read_timeout: 30s
  write_timeout: 30s
  # TLS settings
  enable_tls: true
  tls_cert_file: /path/to/cert.pem
  tls_key_file: /path/to/key.pem
  tls_ca_file: /path/to/ca.pem
  tls_skip_verify: false
  # SASL settings
  enable_sasl: true
  sasl_mechanism: SCRAM-SHA-512
  sasl_username: user
  sasl_password: pass
  # Producer settings
  producer_enabled: true
  producer_compression: snappy
  producer_idempotent: true
  producer_acks: all
  producer_max_message_bytes: 1000000
  producer_flush_messages: 10
  producer_flush_frequency: 100ms
  producer_retry_max: 3
  # Consumer settings
  consumer_enabled: true
  consumer_group_id: my-group
  consumer_offsets: newest
  consumer_max_wait: 500ms
  consumer_fetch_min: 1
  consumer_fetch_max: 1048576
  consumer_isolation: read_uncommitted
  consumer_group_rebalance: sticky
  consumer_group_session: 10s
  consumer_group_heartbeat: 3s
  # Metadata settings
  metadata_retry_max: 3
  metadata_retry_backoff: 250ms
  metadata_refresh_freq: 10m
  metadata_full_refresh: true
  # Observability
  enable_metrics: true
  enable_tracing: true
  enable_logging: true

Environment Variables
KAFKA_BROKERS=localhost:9092,localhost:9093
KAFKA_CLIENT_ID=my-app
KAFKA_VERSION=3.0.0
KAFKA_ENABLE_TLS=true
KAFKA_TLS_CERT_FILE=/path/to/cert.pem
KAFKA_TLS_KEY_FILE=/path/to/key.pem
KAFKA_TLS_CA_FILE=/path/to/ca.pem
KAFKA_ENABLE_SASL=true
KAFKA_SASL_MECHANISM=SCRAM-SHA-512
KAFKA_SASL_USERNAME=user
KAFKA_SASL_PASSWORD=pass
KAFKA_PRODUCER_ENABLED=true
KAFKA_CONSUMER_ENABLED=true
KAFKA_CONSUMER_GROUP_ID=my-group

Producer Usage
Synchronous Publishing
// Simple message
err := client.SendMessage("my-topic", []byte("key"), []byte("value"))
if err != nil {
log.Fatal(err)
}
// Message with headers
message := &kafka.ProducerMessage{
Topic: "my-topic",
Key: []byte("key"),
Value: []byte("value"),
Headers: []kafka.MessageHeader{
{Key: "source", Value: []byte("my-service")},
{Key: "version", Value: []byte("1.0")},
},
}
err = client.SendMessages([]*kafka.ProducerMessage{message})

Asynchronous Publishing
// Fire-and-forget
err := client.SendMessageAsync("my-topic", []byte("key"), []byte("value"))
if err != nil {
log.Fatal(err)
}
// Batch publishing (SendMessages sends the whole batch synchronously)
messages := []*kafka.ProducerMessage{
{Topic: "topic1", Key: []byte("key1"), Value: []byte("value1")},
{Topic: "topic2", Key: []byte("key2"), Value: []byte("value2")},
}
err = client.SendMessages(messages)

Event Sourcing Pattern
type Event struct {
ID string `json:"id"`
Type string `json:"type"`
Data any `json:"data"`
Timestamp time.Time `json:"timestamp"`
Version int `json:"version"`
}
func publishEvent(client kafka.Kafka, event Event) error {
data, err := json.Marshal(event)
if err != nil {
return err
}
return client.SendMessage(
"events",
[]byte(event.ID),
data,
)
}
// Usage (uuid.New here assumes a UUID package such as github.com/google/uuid)
event := Event{
ID: uuid.New().String(),
Type: "user.created",
Data: userData,
Timestamp: time.Now(),
Version: 1,
}
err := publishEvent(client, event)

Consumer Usage
Simple Consumer
ctx := context.Background()
err := client.Consume(ctx, []string{"my-topic"}, func(msg *sarama.ConsumerMessage) error {
log.Printf("Received: key=%s, value=%s, partition=%d, offset=%d",
string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)
return nil
})

Partition-Specific Consumer
err := client.ConsumePartition(ctx, "my-topic", 0, sarama.OffsetNewest,
func(msg *sarama.ConsumerMessage) error {
// Process message from specific partition
return processMessage(msg)
})

Consumer Groups
type EventHandler struct {
logger forge.Logger
}
func (h *EventHandler) Setup(sarama.ConsumerGroupSession) error {
h.logger.Info("consumer group session started")
return nil
}
func (h *EventHandler) Cleanup(sarama.ConsumerGroupSession) error {
h.logger.Info("consumer group session ended")
return nil
}
func (h *EventHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
select {
case message, ok := <-claim.Messages():
if !ok {
return nil
}
// Process message
if err := h.processMessage(message); err != nil {
h.logger.Error("failed to process message", forge.F("error", err))
continue
}
// Mark message as processed
session.MarkMessage(message, "")
case <-session.Context().Done():
return nil
}
}
}
func (h *EventHandler) processMessage(msg *sarama.ConsumerMessage) error {
// Your message processing logic
return nil
}
// Usage
handler := &EventHandler{logger: app.Logger()}
err := client.JoinConsumerGroup(ctx, "my-group", []string{"my-topic"}, handler)
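When shutting down, leave the group cleanly so partitions rebalance promptly. A minimal sketch, assuming the application cancels ctx on shutdown (the wiring is illustrative, not part of the extension):

// Leave the consumer group once the application context is cancelled.
go func() {
    <-ctx.Done()
    if err := client.LeaveConsumerGroup(context.Background()); err != nil {
        log.Printf("failed to leave consumer group: %v", err)
    }
}()

Message Processing Patterns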
// Dead Letter Queue pattern
func processWithDLQ(client kafka.Kafka, msg *sarama.ConsumerMessage) error {
if err := processMessage(msg); err != nil {
// Send to dead letter queue
dlqMsg := &kafka.ProducerMessage{
Topic: "dlq-topic",
Key: msg.Key,
Value: msg.Value,
Headers: []kafka.MessageHeader{
{Key: "original-topic", Value: []byte(msg.Topic)},
{Key: "error", Value: []byte(err.Error())},
{Key: "failed-at", Value: []byte(time.Now().Format(time.RFC3339))},
},
}
return client.SendMessages([]*kafka.ProducerMessage{dlqMsg})
}
return nil
}
// Retry pattern with exponential backoff
func processWithRetry(msg *sarama.ConsumerMessage, maxRetries int) error {
for attempt := 0; attempt < maxRetries; attempt++ {
if err := processMessage(msg); err != nil {
if attempt == maxRetries-1 {
return err // Final attempt failed
}
// Exponential backoff
backoff := time.Duration(math.Pow(2, float64(attempt))) * time.Second
time.Sleep(backoff)
continue
}
return nil // Success
}
return nil
}

Admin Operations
Topic Management
// Create topic (stringPtr is a local helper: func stringPtr(s string) *string { return &s })
config := kafka.TopicConfig{
NumPartitions: 3,
ReplicationFactor: 2,
ConfigEntries: map[string]*string{
"retention.ms": stringPtr("604800000"), // 7 days
"cleanup.policy": stringPtr("delete"),
},
}
err := client.CreateTopic("my-topic", config)
// List topics
topics, err := client.ListTopics()
for _, topic := range topics {
fmt.Printf("Topic: %s\n", topic)
}
// Describe topic
metadata, err := client.DescribeTopic("my-topic")
fmt.Printf("Topic: %s, Partitions: %d\n", metadata.Name, len(metadata.Partitions))
// Delete topic
err = client.DeleteTopic("my-topic")

Partition and Offset Management
// Get partitions for topic
partitions, err := client.GetPartitions("my-topic")
fmt.Printf("Partitions: %v\n", partitions)
// Get latest offset
offset, err := client.GetOffset("my-topic", 0, sarama.OffsetNewest)
fmt.Printf("Latest offset: %d\n", offset)
// Get earliest offset
offset, err = client.GetOffset("my-topic", 0, sarama.OffsetOldest)
fmt.Printf("Earliest offset: %d\n", offset)Security Configuration
TLS/SSL Setup
// Basic TLS
app.AddExtension(kafka.NewExtension(
kafka.WithTLS("client.crt", "client.key", "ca.crt", false),
))
// TLS without a client certificate, skipping server certificate verification
app.AddExtension(kafka.NewExtension(
kafka.WithTLS("", "", "ca.crt", true), // skipVerify=true; avoid in production
))

SASL Authentication
// SASL/PLAIN
app.AddExtension(kafka.NewExtension(
kafka.WithSASL("PLAIN", "username", "password"),
))
// SASL/SCRAM-SHA-256
app.AddExtension(kafka.NewExtension(
kafka.WithSASL("SCRAM-SHA-256", "username", "password"),
))
// SASL/SCRAM-SHA-512 (recommended)
app.AddExtension(kafka.NewExtension(
kafka.WithSASL("SCRAM-SHA-512", "username", "password"),
))

Combined TLS + SASL
app.AddExtension(kafka.NewExtension(
kafka.WithBrokers("secure-broker:9093"),
kafka.WithTLS("client.crt", "client.key", "ca.crt", false),
kafka.WithSASL("SCRAM-SHA-512", "username", "password"),
))

Monitoring and Observability
Built-in Metrics
The extension automatically tracks these metrics:
- kafka.messages.sent - Total messages sent
- kafka.messages.received - Total messages received
- kafka.bytes.sent - Total bytes sent
- kafka.bytes.received - Total bytes received
- kafka.errors - Total errors encountered
- kafka.connections.active - Active connections
Client Statistics
stats := client.GetStats()
fmt.Printf("Messages sent: %d\n", stats.MessagesSent)
fmt.Printf("Messages received: %d\n", stats.MessagesReceived)
fmt.Printf("Bytes sent: %d\n", stats.BytesSent)
fmt.Printf("Bytes received: %d\n", stats.BytesReceived)
fmt.Printf("Errors: %d\n", stats.Errors)
fmt.Printf("Active consumers: %d\n", stats.ActiveConsumers)
fmt.Printf("Active producers: %d\n", stats.ActiveProducers)Health Checks
// Check client health
if err := client.Ping(ctx); err != nil {
log.Printf("Kafka client unhealthy: %v", err)
}
// Extension health check (runs automatically; 'extension' is the value created by kafka.NewExtension)
if err := extension.Health(ctx); err != nil {
log.Printf("Kafka extension unhealthy: %v", err)
}

Custom Metrics Integration
// Custom metrics with labels
func recordCustomMetric(client kafka.Kafka, topic string, messageType string) {
// Access underlying metrics from the client
// Implement custom metric recording logic
}
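One hedged sketch: assuming the standard github.com/prometheus/client_golang package (not bundled with the extension), an app-level labeled counter can sit alongside the client:

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

// messagesByType counts published messages, labeled by topic and message type.
var messagesByType = promauto.NewCounterVec(prometheus.CounterOpts{
    Name: "app_kafka_messages_by_type_total",
    Help: "Messages published, by topic and message type.",
}, []string{"topic", "type"})

// recordMessage increments the counter; call it after each successful send.
func recordMessage(topic, messageType string) {
    messagesByType.WithLabelValues(topic, messageType).Inc()
}

Performance Optimization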
Producer Optimization
app.AddExtension(kafka.NewExtension(
// Batch settings for throughput
kafka.WithProducerFlushMessages(100),
kafka.WithProducerFlushFrequency(10*time.Millisecond),
// Compression for network efficiency
kafka.WithCompression("snappy"), // or "lz4", "gzip", "zstd"
// Idempotence for reliability
kafka.WithIdempotent(true),
kafka.WithProducerAcks("all"),
// Buffer sizes
kafka.WithProducerMaxMessageBytes(1000000), // 1MB
))

Consumer Optimization
app.AddExtension(kafka.NewExtension(
// Fetch settings for throughput
kafka.WithConsumerFetchMin(1),
kafka.WithConsumerFetchMax(1024*1024), // 1MB
kafka.WithConsumerMaxWait(500*time.Millisecond),
// Consumer group settings
kafka.WithConsumerGroupRebalance("sticky"), // or "range", "roundrobin"
kafka.WithConsumerGroupSession(10*time.Second),
kafka.WithConsumerGroupHeartbeat(3*time.Second),
// Isolation level
kafka.WithConsumerIsolation("read_committed"), // or "read_uncommitted"
))

Connection Optimization
app.AddExtension(kafka.NewExtension(
// Timeouts
kafka.WithDialTimeout(30*time.Second),
kafka.WithReadTimeout(30*time.Second),
kafka.WithWriteTimeout(30*time.Second),
kafka.WithKeepAlive(30*time.Second),
// Metadata refresh
kafka.WithMetadataRefreshFreq(10*time.Minute),
kafka.WithMetadataRetryMax(3),
kafka.WithMetadataRetryBackoff(250*time.Millisecond),
))

Error Handling
Common Error Types
import "github.com/xraph/forge/extensions/kafka"
// Check for specific errors
if errors.Is(err, kafka.ErrProducerNotEnabled) {
log.Println("Producer is not enabled")
}
if errors.Is(err, kafka.ErrTopicNotFound) {
log.Println("Topic does not exist")
}
if errors.Is(err, kafka.ErrConnectionFailed) {
log.Println("Failed to connect to Kafka")
}

Retry Logic
func sendWithRetry(client kafka.Kafka, topic string, key, value []byte, maxRetries int) error {
for attempt := 0; attempt < maxRetries; attempt++ {
if err := client.SendMessage(topic, key, value); err != nil {
if attempt == maxRetries-1 {
return fmt.Errorf("failed after %d attempts: %w", maxRetries, err)
}
// Exponential backoff
backoff := time.Duration(math.Pow(2, float64(attempt))) * time.Second
time.Sleep(backoff)
continue
}
return nil
}
return nil
}

Circuit Breaker Pattern
type CircuitBreaker struct {
failures int
maxFailures int
timeout time.Duration
lastFailure time.Time
state string // "closed", "open", "half-open"
}
func (cb *CircuitBreaker) Call(fn func() error) error {
if cb.state == "open" {
if time.Since(cb.lastFailure) > cb.timeout {
cb.state = "half-open"
} else {
return errors.New("circuit breaker is open")
}
}
err := fn()
if err != nil {
cb.failures++
cb.lastFailure = time.Now()
if cb.failures >= cb.maxFailures {
cb.state = "open"
}
return err
}
cb.failures = 0
cb.state = "closed"
return nil
}
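A usage sketch (the thresholds are illustrative; note the struct above is not goroutine-safe, so add a mutex if it is shared):

// Guard producer calls with the breaker.
cb := &CircuitBreaker{maxFailures: 5, timeout: 30 * time.Second, state: "closed"}
err := cb.Call(func() error {
    return client.SendMessage("my-topic", []byte("key"), []byte("value"))
})
if err != nil {
    log.Printf("send failed or circuit open: %v", err)
}

Best Practices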
Message Design
- Use meaningful keys for proper partitioning
- Keep messages small (< 1MB recommended)
- Include metadata in headers, not payload
- Use schema evolution for backward compatibility
- Implement idempotent processing for consumers (sketched below)
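A minimal sketch of the last point, deduplicating on topic/partition/offset. The in-memory map is a stand-in for a durable, shared store (Redis, a database table); that choice is an assumption, not part of the extension:

// processed tracks handled messages; use a durable store in production.
var processed = make(map[string]struct{})

func processOnce(msg *sarama.ConsumerMessage, handle func(*sarama.ConsumerMessage) error) error {
    id := fmt.Sprintf("%s/%d/%d", msg.Topic, msg.Partition, msg.Offset)
    if _, seen := processed[id]; seen {
        return nil // duplicate delivery, already handled
    }
    if err := handle(msg); err != nil {
        return err
    }
    processed[id] = struct{}{}
    return nil
}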
Topic Design
- Plan partition count based on throughput requirements (see the worked example after this list)
- Set appropriate retention policies
- Use consistent naming conventions
- Consider replication factor for availability
- Monitor topic metrics regularly
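A rough worked example for the first point (assumed numbers, not defaults): if a topic must absorb 50 MB/s and one partition sustains about 10 MB/s on the slower of the produce and consume paths, you need at least 50 / 10 = 5 partitions. Round up (say, to 8) for growth headroom, since partition count also caps the number of active consumers in a group.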
Security
- Always use TLS in production
- Implement proper SASL authentication
- Rotate credentials regularly
- Use least privilege access controls
- Monitor security events
Performance
- Batch messages when possible
- Use compression for large payloads
- Tune consumer fetch sizes
- Monitor lag and throughput
- Scale consumers horizontally
Reliability
- Enable idempotent producers
- Use consumer groups for scalability
- Implement retry logic with backoff
- Handle poison messages appropriately
- Monitor health continuously
Troubleshooting
Common Issues
Connection timeouts:
kafka:
  dial_timeout: 60s
  read_timeout: 60s
  write_timeout: 60s

Consumer lag:
kafka:
  consumer_fetch_max: 2097152 # 2MB
  consumer_max_wait: 100ms

Producer throughput:
kafka:
  producer_flush_messages: 1000
  producer_flush_frequency: 10ms
  producer_compression: snappy

Authentication failures:
kafka:
  enable_sasl: true
  sasl_mechanism: SCRAM-SHA-512
  sasl_username: correct_username
  sasl_password: correct_password

Debug Logging
app.AddExtension(kafka.NewExtension(
kafka.WithLogging(true),
// Other options...
))

Health Monitoring
// Periodic health check
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
if err := client.Ping(ctx); err != nil {
log.Printf("Kafka health check failed: %v", err)
// Implement alerting logic
}
}
}()

API Reference
Core Interface
type Kafka interface {
// Producer operations
SendMessage(topic string, key, value []byte) error
SendMessageAsync(topic string, key, value []byte) error
SendMessages(messages []*ProducerMessage) error
// Consumer operations
Consume(ctx context.Context, topics []string, handler MessageHandler) error
ConsumePartition(ctx context.Context, topic string, partition int32, offset int64, handler MessageHandler) error
StopConsume() error
// Consumer group operations
JoinConsumerGroup(ctx context.Context, groupID string, topics []string, handler ConsumerGroupHandler) error
LeaveConsumerGroup(ctx context.Context) error
// Admin operations
CreateTopic(topic string, config TopicConfig) error
DeleteTopic(topic string) error
ListTopics() ([]string, error)
DescribeTopic(topic string) (*TopicMetadata, error)
GetPartitions(topic string) ([]int32, error)
GetOffset(topic string, partition int32, time int64) (int64, error)
// Client access
GetProducer() sarama.SyncProducer
GetAsyncProducer() sarama.AsyncProducer
GetConsumer() sarama.Consumer
GetConsumerGroup() sarama.ConsumerGroup
GetClient() sarama.Client
GetStats() ClientStats
// Lifecycle
Close() error
Ping(ctx context.Context) error
}

Configuration Options
// Connection options
func WithBrokers(brokers ...string) ConfigOption
func WithClientID(clientID string) ConfigOption
func WithVersion(version string) ConfigOption
// Security options
func WithTLS(certFile, keyFile, caFile string, skipVerify bool) ConfigOption
func WithSASL(mechanism, username, password string) ConfigOption
// Producer options
func WithProducer(enabled bool) ConfigOption
func WithCompression(compression string) ConfigOption
func WithIdempotent(enabled bool) ConfigOption
func WithProducerAcks(acks string) ConfigOption
// Consumer options
func WithConsumer(enabled bool) ConfigOption
func WithConsumerGroup(groupID string) ConfigOption
func WithConsumerOffsets(offsets string) ConfigOption
// Observability options
func WithMetrics(enabled bool) ConfigOption
func WithLogging(enabled bool) ConfigOption
func WithTracing(enabled bool) ConfigOption

The Kafka extension provides a robust, production-ready solution for Apache Kafka integration with comprehensive features for messaging, event sourcing, and distributed communication patterns.