# Features

Kafka extension capabilities and client details
## Kafka Interface

The extension exposes a comprehensive Kafka interface wrapping producer, consumer, consumer group, and admin operations:
```go
type Kafka interface {
	// Producer
	SendMessage(topic string, key, value []byte) error
	SendMessageAsync(topic string, key, value []byte) error
	SendMessages(messages []*ProducerMessage) error

	// Consumer
	Consume(ctx context.Context, topics []string, handler MessageHandler) error
	ConsumePartition(ctx context.Context, topic string, partition int32, offset int64, handler MessageHandler) error
	StopConsume() error

	// Consumer Groups
	JoinConsumerGroup(ctx context.Context, groupID string, topics []string, handler ConsumerGroupHandler) error
	LeaveConsumerGroup(ctx context.Context) error

	// Admin
	CreateTopic(topic string, config TopicConfig) error
	DeleteTopic(topic string) error
	ListTopics() ([]string, error)
	DescribeTopic(topic string) (*TopicMetadata, error)
	GetPartitions(topic string) ([]int32, error)
	GetOffset(topic string, partition int32, time int64) (int64, error)

	// Access underlying Sarama clients
	GetProducer() sarama.SyncProducer
	GetAsyncProducer() sarama.AsyncProducer
	GetConsumer() sarama.Consumer
	GetConsumerGroup() sarama.ConsumerGroup
	GetClient() sarama.Client

	GetStats() ClientStats
	Close() error
	Ping(ctx context.Context) error
}
```

## Sync Producer
Send messages and wait for broker acknowledgement. Configurable ack levels control durability guarantees:
```go
// Send a single message with key for partitioning
client.SendMessage("events", []byte("user-123"), []byte(`{"action":"login"}`))

// Batch send multiple messages
client.SendMessages([]*kafka.ProducerMessage{
	{Topic: "events", Key: []byte("user-1"), Value: []byte(`{"action":"login"}`)},
	{Topic: "events", Key: []byte("user-2"), Value: []byte(`{"action":"signup"}`)},
})
```

Ack levels (configured via `ProducerAcks`):

- `none` -- fire-and-forget, no broker acknowledgement
- `local` -- leader only acknowledges
- `all` -- all in-sync replicas acknowledge (strongest guarantee)
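Keys matter because messages with the same key land on the same partition, preserving their relative order. As a rough illustration of how a hash partitioner maps a key to a partition (Sarama's default uses FNV-1a; the exact behaviour depends on the configured partitioner, so treat this as a sketch):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionForKey sketches hash-based partitioning: FNV-1a over the
// key, modulo the partition count. The same key always yields the
// same partition, so per-key ordering is preserved.
func partitionForKey(key []byte, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write(key)
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

func main() {
	// Both sends for "user-123" route to the same partition.
	fmt.Println(partitionForKey([]byte("user-123"), 6))
	fmt.Println(partitionForKey([]byte("user-123"), 6))
}
```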
## Async Producer

Send messages without blocking; errors and successes are handled on background channels. Useful for high-throughput scenarios where occasional message loss is acceptable:
```go
client.SendMessageAsync("metrics", []byte("server-1"), metricsPayload)
```

Async producer errors are logged but not surfaced to callers. Use the sync producer for guaranteed delivery.
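The pattern behind an async producer can be sketched with plain channels: sends are queued without waiting on the broker, and a background goroutine drains errors. The types below are illustrative stand-ins, not the extension's actual implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// asyncProducer sketches fire-and-forget delivery: Send only enqueues,
// and a background goroutine handles (here: logs) any errors.
type asyncProducer struct {
	input  chan string
	errors chan error
	wg     sync.WaitGroup
}

func newAsyncProducer() *asyncProducer {
	p := &asyncProducer{
		input:  make(chan string, 16),
		errors: make(chan error, 16),
	}
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		for msg := range p.input {
			// A real producer would write to the broker here and
			// push failures onto p.errors instead of returning them.
			_ = msg
		}
		close(p.errors)
	}()
	go func() {
		// Errors are logged in the background, never surfaced to callers.
		for err := range p.errors {
			fmt.Println("async send failed:", err)
		}
	}()
	return p
}

// Send enqueues the message; the caller never waits on the broker.
func (p *asyncProducer) Send(msg string) { p.input <- msg }

// Close drains the queue and waits for the sender goroutine to finish.
func (p *asyncProducer) Close() { close(p.input); p.wg.Wait() }

func main() {
	p := newAsyncProducer()
	p.Send("metric-1")
	p.Send("metric-2")
	p.Close()
	fmt.Println("queued without blocking")
}
```

This is why the sync producer is the right choice when delivery must be confirmed: only it returns the broker's acknowledgement to the caller.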
## Compression

Multiple compression codecs are supported, configured via `ProducerCompression`:
| Codec | Trade-off |
|---|---|
| `none` | No CPU overhead, maximum bandwidth |
| `gzip` | Best compression ratio, highest CPU |
| `snappy` | Fast compression, moderate ratio |
| `lz4` | Very fast, good ratio |
| `zstd` | Best balance of speed and ratio |
## Idempotent Producer

Enable exactly-once semantics at the producer level with `ProducerIdempotent: true`. This ensures that retries do not produce duplicate messages. Requires `ProducerAcks: "all"`.
## Batching

Configure flush thresholds for producer batching:

- `ProducerFlushMessages` -- flush after N messages (default: 0, disabled)
- `ProducerFlushFrequency` -- flush at a fixed interval (default: 0, disabled)
## Consumer

Subscribe to topics with offset control and fetch tuning:
```go
client.Consume(ctx, []string{"events", "metrics"}, func(msg *sarama.ConsumerMessage) error {
	fmt.Printf("Topic: %s, Partition: %d, Offset: %d\n",
		msg.Topic, msg.Partition, msg.Offset)
	return nil
})

// Consume from a specific partition and offset
client.ConsumePartition(ctx, "events", 0, sarama.OffsetNewest, handler)
```

Configuration options:

- `ConsumerOffsets` -- `newest` (default) or `oldest`
- `ConsumerMaxWait` -- max wait time for fetch (default: 250ms)
- `ConsumerFetchMin` -- minimum fetch size in bytes
- `ConsumerFetchMax` -- maximum fetch size in bytes
- `ConsumerIsolation` -- `read_uncommitted` or `read_committed`
## Consumer Groups

Join consumer groups with a configurable rebalance strategy:
```go
client.JoinConsumerGroup(ctx, "analytics-group",
	[]string{"events", "metrics"}, &myHandler{})
```

Rebalance strategies (configured via `ConsumerGroupRebalance`):

- `range` -- assigns contiguous partitions to consumers
- `roundrobin` -- distributes partitions evenly
- `sticky` -- minimizes partition movement during rebalance
Tunable parameters:
- `ConsumerGroupSession` -- session timeout (default: 10s)
- `ConsumerGroupHeartbeat` -- heartbeat interval (default: 3s)
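The difference between `range` and `roundrobin` is easiest to see in how they split partitions among group members. The real assignment happens during the group rebalance; the functions below are a simplified sketch of the two strategies:

```go
package main

import "fmt"

// assignRange gives each consumer a contiguous block of partitions,
// with the first (partitions % consumers) consumers taking one extra.
func assignRange(partitions, consumers int) [][]int32 {
	out := make([][]int32, consumers)
	per := partitions / consumers
	extra := partitions % consumers
	p := int32(0)
	for c := 0; c < consumers; c++ {
		n := per
		if c < extra {
			n++
		}
		for i := 0; i < n; i++ {
			out[c] = append(out[c], p)
			p++
		}
	}
	return out
}

// assignRoundRobin deals partitions out one at a time, interleaving
// them across consumers.
func assignRoundRobin(partitions, consumers int) [][]int32 {
	out := make([][]int32, consumers)
	for p := 0; p < partitions; p++ {
		out[p%consumers] = append(out[p%consumers], int32(p))
	}
	return out
}

func main() {
	fmt.Println("range:     ", assignRange(7, 3))      // contiguous blocks
	fmt.Println("roundrobin:", assignRoundRobin(7, 3)) // interleaved
}
```

`sticky` starts from a similar even spread but, on later rebalances, keeps as many existing assignments in place as possible, which reduces partition churn.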
## Topic Administration
```go
// Create a topic
client.CreateTopic("events", kafka.TopicConfig{
	NumPartitions:     6,
	ReplicationFactor: 3,
	ConfigEntries:     map[string]*string{"retention.ms": ptr("604800000")},
})

// List, describe, delete
topics, _ := client.ListTopics()
meta, _ := client.DescribeTopic("events")
partitions, _ := client.GetPartitions("events")
client.DeleteTopic("old-events")
```

`TopicMetadata` includes partition details with leader, replicas, and ISR information.
## SASL Authentication

Multiple SASL mechanisms are supported:
| Mechanism | Description |
|---|---|
| `PLAIN` | Username/password in plaintext (use with TLS) |
| `SCRAM-SHA-256` | Challenge-response with SHA-256 |
| `SCRAM-SHA-512` | Challenge-response with SHA-512 |

The extension includes a built-in `XDGSCRAMClient` for SCRAM authentication.
## TLS Encryption

Connect over TLS with custom certificates:
```go
kafka.NewExtension(
	kafka.WithTLS("cert.pem", "key.pem", "ca.pem"),
)
```

Set `TLSSkipVerify: true` for development environments (not recommended for production).
## Client Statistics

Track connection health and throughput:
```go
stats := client.GetStats()
fmt.Printf("Connected: %v, Messages sent: %d, Errors: %d\n",
	stats.Connected, stats.MessagesSent, stats.Errors)
```

`ClientStats` includes: `Connected`, `ConnectTime`, `MessagesSent`, `MessagesReceived`, `BytesSent`, `BytesReceived`, `Errors`, `LastError`, `ActiveConsumers`, `ActiveProducers`.
## Metadata Management

Automatic metadata refresh keeps the client informed about cluster topology changes:

- `MetadataRetryMax` -- max retry attempts for a metadata fetch
- `MetadataRetryBackoff` -- backoff between retries
- `MetadataRefreshFreq` -- periodic refresh interval
- `MetadataFullRefresh` -- full vs. incremental refresh
## Sentinel Errors

| Error | Meaning |
|---|---|
| `ErrClientNotInitialized` | Client has not been created |
| `ErrProducerNotEnabled` | Producer is disabled in config |
| `ErrConsumerNotEnabled` | Consumer is disabled in config |
| `ErrAlreadyConsuming` | Already consuming the requested topic |
| `ErrNotConsuming` | Not consuming the requested topic |
| `ErrInConsumerGroup` | Operation conflicts with active consumer group |
| `ErrNotInConsumerGroup` | Not in a consumer group |
| `ErrSendFailed` | Message send failed |
| `ErrConsumeFailed` | Consumer setup failed |
| `ErrTopicNotFound` | Topic does not exist |
| `ErrInvalidPartition` | Invalid partition number |
| `ErrConnectionFailed` | Failed to connect to brokers |
| `ErrClientClosed` | Client has been closed |