Kafka
Configuration
Config structs, YAML examples, and option helpers for Kafka
YAML Configuration Example
extensions:
  kafka:
    brokers:
      - "localhost:9092"
    clientID: "forge-kafka-client"
    version: "3.0.0"
    dialTimeout: "30s"
    readTimeout: "30s"
    writeTimeout: "30s"
    # TLS
    enableTLS: false
    tlsCertFile: ""
    tlsKeyFile: ""
    tlsCAFile: ""
    # SASL
    enableSASL: false
    saslMechanism: "PLAIN" # "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"
    saslUsername: ""
    saslPassword: ""
    # Producer
    producerEnabled: true
    producerMaxMessageBytes: 1000000
    producerCompression: "snappy" # "none", "gzip", "snappy", "lz4", "zstd"
    producerFlushMessages: 10
    producerFlushFrequency: "100ms"
    producerRetryMax: 3
    producerIdempotent: false
    producerAcks: "local" # "none", "local", "all"
    # Consumer
    consumerEnabled: true
    consumerGroupID: ""
    consumerOffsets: "newest" # "newest" or "oldest"
    consumerMaxWait: "500ms"
    consumerFetchMin: 1
    consumerFetchMax: 1048576
    consumerIsolation: "read_uncommitted"
    # Consumer group
    consumerGroupRebalance: "range" # "range", "roundrobin", "sticky"
    consumerGroupSession: "10s"
    consumerGroupHeartbeat: "3s"
    # Metadata
    metadataRetryMax: 3
    metadataRetryBackoff: "250ms"
    metadataRefreshFreq: "10m"
    # Observability
    enableMetrics: true
    enableTracing: true
    enableLogging: true

The extension loads config from extensions.kafka first, falling back to kafka.
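The lookup order described above can be sketched in Go as follows. This is a minimal illustration, not the extension's actual loader: the function name lookupKafkaConfig and the nested-map representation of the parsed YAML are assumptions made for the example.

```go
package main

import "fmt"

// lookupKafkaConfig is a hypothetical helper. It returns the first config
// section it finds, trying the namespaced key "extensions.kafka" before the
// top-level "kafka" key, and reports which key matched.
func lookupKafkaConfig(root map[string]any) (map[string]any, string, bool) {
	if ext, ok := root["extensions"].(map[string]any); ok {
		if section, ok := ext["kafka"].(map[string]any); ok {
			return section, "extensions.kafka", true
		}
	}
	if section, ok := root["kafka"].(map[string]any); ok {
		return section, "kafka", true
	}
	return nil, "", false
}

func main() {
	// Only the top-level "kafka" key is present, so the fallback is used.
	root := map[string]any{
		"kafka": map[string]any{"clientID": "legacy-client"},
	}
	section, key, ok := lookupKafkaConfig(root)
	fmt.Println(key, section["clientID"], ok) // prints "kafka legacy-client true"
}
```

When both keys are present, this sketch returns the extensions.kafka section and ignores the kafka section entirely. Whether the real loader merges the two sections instead is not stated here.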
Programmatic Configuration
ext := kafka.NewExtension(
	kafka.WithBrokers("broker1:9092", "broker2:9092"),
	kafka.WithClientID("my-service"),
	kafka.WithSASL("SCRAM-SHA-256", "user", "pass"),
	kafka.WithCompression("lz4"),
	kafka.WithConsumerGroup("my-group", "range"),
	kafka.WithIdempotent(true),
)

Config Struct Reference
Config
Source: config.go
| Field | Type | Default | Description |
|---|---|---|---|
| Brokers | []string | ["localhost:9092"] | Kafka broker addresses |
| ClientID | string | "forge-kafka-client" | Client identifier |
| Version | string | "3.0.0" | Kafka protocol version |
| DialTimeout | time.Duration | 30s | Connection timeout |
| ReadTimeout | time.Duration | 30s | Read timeout |
| WriteTimeout | time.Duration | 30s | Write timeout |
| EnableTLS | bool | false | Enable TLS |
| EnableSASL | bool | false | Enable SASL authentication |
| SASLMechanism | string | "PLAIN" | SASL mechanism |
| SASLUsername | string | "" | SASL username |
| SASLPassword | string | "" | SASL password |
| ProducerEnabled | bool | true | Enable producer |
| ProducerCompression | string | "snappy" | Compression codec |
| ProducerFlushMessages | int | 10 | Flush after N messages |
| ProducerFlushFrequency | time.Duration | 100ms | Flush interval |
| ProducerRetryMax | int | 3 | Max producer retries |
| ProducerIdempotent | bool | false | Idempotent producer |
| ProducerAcks | string | "local" | Ack level: none, local, all |
| ConsumerEnabled | bool | true | Enable consumer |
| ConsumerGroupID | string | "" | Consumer group ID |
| ConsumerOffsets | string | "newest" | Initial offset: newest, oldest |
| ConsumerGroupRebalance | string | "range" | Rebalance strategy |
| ConsumerGroupSession | time.Duration | 10s | Session timeout |
| ConsumerGroupHeartbeat | time.Duration | 3s | Heartbeat interval |
| EnableMetrics | bool | true | Enable metrics |
| EnableTracing | bool | true | Enable tracing |
Option Helpers
| Function | Description |
|---|---|
| WithBrokers(brokers...) | Set broker addresses |
| WithClientID(id) | Set client identifier |
| WithVersion(version) | Set Kafka protocol version |
| WithTLS(certFile, keyFile, caFile) | Enable TLS |
| WithSASL(mechanism, username, password) | Enable SASL authentication |
| WithProducer(enabled) | Toggle producer |
| WithConsumer(enabled) | Toggle consumer |
| WithConsumerGroup(groupID, strategy) | Set consumer group |
| WithCompression(codec) | Set compression codec |
| WithIdempotent(enabled) | Toggle idempotent producer |
| WithMetrics(enabled) | Toggle metrics |
| WithTracing(enabled) | Toggle tracing |
| WithRequireConfig(require) | Require config from ConfigManager |
| WithConfig(config) | Provide a complete config struct |
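Helpers shaped like these usually follow Go's functional options pattern: each helper returns a function that mutates a config, and the constructor applies them in order on top of the defaults. The sketch below illustrates that pattern only. The lowercase names (config, option, withBrokers, withClientID, withCompression, newConfig) are hypothetical and deliberately differ from the extension's exported API, whose internals are not shown here.

```go
package main

import "fmt"

// config is a hypothetical, trimmed-down stand-in for the real Config struct.
type config struct {
	Brokers     []string
	ClientID    string
	Compression string
}

// option mutates a config; each helper below returns one.
type option func(*config)

func withBrokers(brokers ...string) option {
	return func(c *config) { c.Brokers = brokers }
}

func withClientID(id string) option {
	return func(c *config) { c.ClientID = id }
}

func withCompression(codec string) option {
	return func(c *config) { c.Compression = codec }
}

// newConfig starts from the documented defaults and applies options in order,
// so a later option overrides an earlier one that touches the same field.
func newConfig(opts ...option) config {
	c := config{
		Brokers:     []string{"localhost:9092"},
		ClientID:    "forge-kafka-client",
		Compression: "snappy",
	}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := newConfig(
		withBrokers("broker1:9092", "broker2:9092"),
		withClientID("my-service"),
	)
	// Compression was not overridden, so it keeps the default.
	fmt.Println(c.Brokers, c.ClientID, c.Compression)
}
```

The appeal of this design is that callers set only the fields they care about while every other field keeps its default.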