Kafka

Features

Kafka extension capabilities and client details

Kafka Interface

The extension exposes a comprehensive Kafka interface wrapping producer, consumer, consumer group, and admin operations:

type Kafka interface {
    // Producer
    SendMessage(topic string, key, value []byte) error
    SendMessageAsync(topic string, key, value []byte) error
    SendMessages(messages []*ProducerMessage) error
    // Consumer
    Consume(ctx context.Context, topics []string, handler MessageHandler) error
    ConsumePartition(ctx context.Context, topic string, partition int32, offset int64, handler MessageHandler) error
    StopConsume() error
    // Consumer Groups
    JoinConsumerGroup(ctx context.Context, groupID string, topics []string, handler ConsumerGroupHandler) error
    LeaveConsumerGroup(ctx context.Context) error
    // Admin
    CreateTopic(topic string, config TopicConfig) error
    DeleteTopic(topic string) error
    ListTopics() ([]string, error)
    DescribeTopic(topic string) (*TopicMetadata, error)
    GetPartitions(topic string) ([]int32, error)
    GetOffset(topic string, partition int32, time int64) (int64, error)
    // Access underlying Sarama clients
    GetProducer() sarama.SyncProducer
    GetAsyncProducer() sarama.AsyncProducer
    GetConsumer() sarama.Consumer
    GetConsumerGroup() sarama.ConsumerGroup
    GetClient() sarama.Client
    GetStats() ClientStats
    Close() error
    Ping(ctx context.Context) error
}

Sync Producer

Send messages and wait for broker acknowledgement. Configurable ack levels control durability guarantees:

// Send a single message with key for partitioning
client.SendMessage("events", []byte("user-123"), []byte(`{"action":"login"}`))

// Batch send multiple messages
client.SendMessages([]*kafka.ProducerMessage{
    {Topic: "events", Key: []byte("user-1"), Value: []byte(`{"action":"login"}`)},
    {Topic: "events", Key: []byte("user-2"), Value: []byte(`{"action":"signup"}`)},
})

Ack levels (configured via ProducerAcks):

  • none -- fire-and-forget, no broker acknowledgement
  • local -- leader only acknowledges
  • all -- all in-sync replicas acknowledge (strongest guarantee)
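For reference, these levels correspond to Sarama's RequiredAcks constants. A sketch of the equivalent raw Sarama configuration, if you bypass the extension and configure the underlying client directly:

```go
config := sarama.NewConfig()

// "none"  -> fire-and-forget, no broker acknowledgement
config.Producer.RequiredAcks = sarama.NoResponse

// "local" -> leader acknowledges only
config.Producer.RequiredAcks = sarama.WaitForLocal

// "all"   -> all in-sync replicas acknowledge (strongest guarantee)
config.Producer.RequiredAcks = sarama.WaitForAll
```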

Async Producer

Send messages without blocking. Errors and successes are handled via background channels. Useful for high-throughput scenarios where occasional message loss is acceptable:

client.SendMessageAsync("metrics", []byte("server-1"), metricsPayload)

Async producer errors are logged but not surfaced to callers. Use the sync producer for guaranteed delivery.
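If you do need visibility into async failures, the underlying Sarama producer exposes an error channel via GetAsyncProducer. A sketch; this assumes Sarama's Producer.Return.Errors setting is enabled (check the extension's defaults), otherwise the channel never receives anything:

```go
// Drain async producer errors in a background goroutine.
producer := client.GetAsyncProducer()
go func() {
    for perr := range producer.Errors() {
        log.Printf("async send to %s failed: %v", perr.Msg.Topic, perr.Err)
    }
}()
```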

Compression

Support for multiple compression codecs configured via ProducerCompression:

  • none -- no CPU overhead, maximum bandwidth usage
  • gzip -- best compression ratio, highest CPU cost
  • snappy -- fast compression, moderate ratio
  • lz4 -- very fast, good ratio
  • zstd -- best balance of speed and ratio
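In raw Sarama terms, these map onto the CompressionCodec constants. A sketch of the direct configuration, for comparison:

```go
config := sarama.NewConfig()
config.Producer.Compression = sarama.CompressionZSTD
// Codec-specific level; sarama.CompressionLevelDefault keeps the codec's default.
config.Producer.CompressionLevel = sarama.CompressionLevelDefault
```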

Idempotent Producer

Enable exactly-once semantics at the producer level with ProducerIdempotent: true. This ensures that retries do not produce duplicate messages. Requires ProducerAcks: "all".
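If you configure Sarama directly rather than through the extension, idempotence needs a few companion settings; a sketch of the raw Sarama equivalent:

```go
config := sarama.NewConfig()
config.Producer.Idempotent = true
config.Producer.RequiredAcks = sarama.WaitForAll // mandatory with idempotence
config.Net.MaxOpenRequests = 1                   // Sarama requires this when idempotent
config.Version = sarama.V0_11_0_0                // broker must support idempotent produce
```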

Batching

Configure flush thresholds for producer batching:

  • ProducerFlushMessages -- flush after N messages (default: 0, disabled)
  • ProducerFlushFrequency -- flush on a fixed time interval (default: 0, disabled)
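These presumably map onto Sarama's producer flush settings; a sketch of the direct equivalent, with illustrative values:

```go
config := sarama.NewConfig()
config.Producer.Flush.Messages = 500                     // flush after 500 buffered messages
config.Producer.Flush.Frequency = 100 * time.Millisecond // or every 100ms, whichever comes first
```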

Consumer

Subscribe to topics with offset control and fetch tuning:

client.Consume(ctx, []string{"events", "metrics"}, func(msg *sarama.ConsumerMessage) error {
    fmt.Printf("Topic: %s, Partition: %d, Offset: %d\n",
        msg.Topic, msg.Partition, msg.Offset)
    return nil
})

// Consume from a specific partition and offset
client.ConsumePartition(ctx, "events", 0, sarama.OffsetNewest, handler)

Configuration options:

  • ConsumerOffsets -- newest (default) or oldest
  • ConsumerMaxWait -- max wait time for fetch (default: 250ms)
  • ConsumerFetchMin -- minimum fetch size in bytes
  • ConsumerFetchMax -- maximum fetch size in bytes
  • ConsumerIsolation -- read_uncommitted or read_committed
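These options correspond to Sarama's consumer settings; a sketch of the direct mapping (values are illustrative):

```go
config := sarama.NewConfig()
config.Consumer.Offsets.Initial = sarama.OffsetOldest // ConsumerOffsets: "oldest"
config.Consumer.MaxWaitTime = 250 * time.Millisecond  // ConsumerMaxWait
config.Consumer.Fetch.Min = 1                         // ConsumerFetchMin (bytes)
config.Consumer.Fetch.Max = 10 << 20                  // ConsumerFetchMax (10 MiB)
config.Consumer.IsolationLevel = sarama.ReadCommitted // ConsumerIsolation: "read_committed"
```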

Consumer Groups

Join consumer groups with configurable rebalance strategy:

client.JoinConsumerGroup(ctx, "analytics-group",
    []string{"events", "metrics"}, &myHandler{})

Rebalance strategies (configured via ConsumerGroupRebalance):

  • range -- assigns contiguous partitions to consumers
  • roundrobin -- distributes partitions evenly
  • sticky -- minimizes partition movement during rebalance

Tunable parameters:

  • ConsumerGroupSession -- session timeout (default: 10s)
  • ConsumerGroupHeartbeat -- heartbeat interval (default: 3s)
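A minimal handler sketch, assuming the extension's ConsumerGroupHandler mirrors sarama.ConsumerGroupHandler (as the GetConsumerGroup return type suggests):

```go
type myHandler struct{}

func (h *myHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *myHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *myHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        fmt.Printf("partition %d offset %d: %s\n", msg.Partition, msg.Offset, msg.Value)
        sess.MarkMessage(msg, "") // record progress for offset commits
    }
    return nil
}
```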

Topic Administration

// Create a topic
client.CreateTopic("events", kafka.TopicConfig{
    NumPartitions:     6,
    ReplicationFactor: 3,
    ConfigEntries:     map[string]*string{"retention.ms": ptr("604800000")},
})

// List, describe, delete
topics, _ := client.ListTopics()
meta, _ := client.DescribeTopic("events")
partitions, _ := client.GetPartitions("events")
client.DeleteTopic("old-events")

TopicMetadata includes partition details with leader, replicas, and ISR information.
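GetOffset's time parameter presumably follows Sarama's convention (as the sarama.OffsetNewest usage above suggests): -1 for newest, -2 for oldest, or a millisecond timestamp. A sketch:

```go
// Newest (next-to-be-written) and oldest retained offsets for partition 0.
newest, err := client.GetOffset("events", 0, sarama.OffsetNewest)
if err != nil {
    return err
}
oldest, err := client.GetOffset("events", 0, sarama.OffsetOldest)
if err != nil {
    return err
}
retained := newest - oldest // messages currently retained in the partition
```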

SASL Authentication

Support for multiple SASL mechanisms:

  • PLAIN -- username/password in plaintext (use with TLS)
  • SCRAM-SHA-256 -- challenge-response with SHA-256
  • SCRAM-SHA-512 -- challenge-response with SHA-512

The extension includes a built-in XDGSCRAMClient for SCRAM authentication.
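For comparison, a sketch of the raw Sarama SASL setup the extension wraps; the XDGSCRAMClient field name here (HashGeneratorFcn) is assumed from the common xdg-go/scram example, so check the extension's actual type:

```go
config := sarama.NewConfig()
config.Net.SASL.Enable = true
config.Net.SASL.User = "svc-user"
config.Net.SASL.Password = "secret"
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
    return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
}
```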

TLS Encryption

Connect over TLS with custom certificates:

kafka.NewExtension(
    kafka.WithTLS("cert.pem", "key.pem", "ca.pem"),
)

Set TLSSkipVerify: true for development environments (not recommended for production).
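Under the hood this amounts to building a crypto/tls config and handing it to Sarama; a sketch of the manual equivalent using only the standard library plus Sarama:

```go
// Load the client certificate and key.
cert, err := tls.LoadX509KeyPair("cert.pem", "key.pem")
if err != nil {
    return err
}

// Trust the given CA for broker certificates.
caPEM, err := os.ReadFile("ca.pem")
if err != nil {
    return err
}
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caPEM)

config := sarama.NewConfig()
config.Net.TLS.Enable = true
config.Net.TLS.Config = &tls.Config{
    Certificates: []tls.Certificate{cert},
    RootCAs:      pool,
}
```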

Client Statistics

Track connection health and throughput:

stats := client.GetStats()
fmt.Printf("Connected: %v, Messages sent: %d, Errors: %d\n",
    stats.Connected, stats.MessagesSent, stats.Errors)

ClientStats includes: Connected, ConnectTime, MessagesSent, MessagesReceived, BytesSent, BytesReceived, Errors, LastError, ActiveConsumers, ActiveProducers.
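Ping and GetStats combine naturally into a periodic health check; a sketch using only the interface methods above:

```go
// Log whenever the brokers stop responding.
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    if err := client.Ping(ctx); err != nil {
        log.Printf("kafka unreachable: %v (last error: %v)", err, client.GetStats().LastError)
    }
    cancel()
}
```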

Metadata Management

Automatic metadata refresh keeps the client informed about cluster topology changes:

  • MetadataRetryMax -- max retry attempts for metadata fetch
  • MetadataRetryBackoff -- backoff between retries
  • MetadataRefreshFreq -- periodic refresh interval
  • MetadataFullRefresh -- full vs. incremental refresh
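These appear to correspond to Sarama's Metadata settings; a sketch of the direct mapping, with illustrative values:

```go
config := sarama.NewConfig()
config.Metadata.Retry.Max = 5                          // MetadataRetryMax
config.Metadata.Retry.Backoff = 500 * time.Millisecond // MetadataRetryBackoff
config.Metadata.RefreshFrequency = 10 * time.Minute    // MetadataRefreshFreq
config.Metadata.Full = true                            // MetadataFullRefresh
```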

Sentinel Errors

  • ErrClientNotInitialized -- client has not been created
  • ErrProducerNotEnabled -- producer is disabled in config
  • ErrConsumerNotEnabled -- consumer is disabled in config
  • ErrAlreadyConsuming -- already consuming the requested topic
  • ErrNotConsuming -- not consuming the requested topic
  • ErrInConsumerGroup -- operation conflicts with an active consumer group
  • ErrNotInConsumerGroup -- not in a consumer group
  • ErrSendFailed -- message send failed
  • ErrConsumeFailed -- consumer setup failed
  • ErrTopicNotFound -- topic does not exist
  • ErrInvalidPartition -- invalid partition number
  • ErrConnectionFailed -- failed to connect to brokers
  • ErrClientClosed -- client has been closed
