Kafka

Configuration

Config structs, YAML examples, and option helpers for Kafka

YAML Configuration Example

config.yaml
extensions:
  kafka:
    brokers:
      - "localhost:9092"
    clientID: "forge-kafka-client"
    version: "3.0.0"
    dialTimeout: "30s"
    readTimeout: "30s"
    writeTimeout: "30s"

    # TLS
    enableTLS: false
    tlsCertFile: ""
    tlsKeyFile: ""
    tlsCAFile: ""

    # SASL
    enableSASL: false
    saslMechanism: "PLAIN"         # "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"
    saslUsername: ""
    saslPassword: ""

    # Producer
    producerEnabled: true
    producerMaxMessageBytes: 1000000
    producerCompression: "snappy"   # "none", "gzip", "snappy", "lz4", "zstd"
    producerFlushMessages: 10
    producerFlushFrequency: "100ms"
    producerRetryMax: 3
    producerIdempotent: false
    producerAcks: "local"           # "none", "local", "all"

    # Consumer
    consumerEnabled: true
    consumerGroupID: ""
    consumerOffsets: "newest"       # "newest" or "oldest"
    consumerMaxWait: "500ms"
    consumerFetchMin: 1
    consumerFetchMax: 1048576
    consumerIsolation: "read_uncommitted"

    # Consumer group
    consumerGroupRebalance: "range" # "range", "roundrobin", "sticky"
    consumerGroupSession: "10s"
    consumerGroupHeartbeat: "3s"

    # Metadata
    metadataRetryMax: 3
    metadataRetryBackoff: "250ms"
    metadataRefreshFreq: "10m"

    # Observability
    enableMetrics: true
    enableTracing: true
    enableLogging: true

The extension loads config from extensions.kafka first, falling back to kafka.

Programmatic Configuration

ext := kafka.NewExtension(
    kafka.WithBrokers("broker1:9092", "broker2:9092"),
    kafka.WithClientID("my-service"),
    kafka.WithSASL("SCRAM-SHA-256", "user", "pass"),
    kafka.WithCompression("lz4"),
    kafka.WithConsumerGroup("my-group", "range"),
    kafka.WithIdempotent(true),
)

Config Struct Reference

Config

Source: config.go

FieldTypeDefaultDescription
Brokers[]string["localhost:9092"]Kafka broker addresses
ClientIDstring"forge-kafka-client"Client identifier
Versionstring"3.0.0"Kafka protocol version
DialTimeouttime.Duration30sConnection timeout
ReadTimeouttime.Duration30sRead timeout
WriteTimeouttime.Duration30sWrite timeout
EnableTLSboolfalseEnable TLS
EnableSASLboolfalseEnable SASL authentication
SASLMechanismstring"PLAIN"SASL mechanism
SASLUsernamestring""SASL username
SASLPasswordstring""SASL password
ProducerEnabledbooltrueEnable producer
ProducerCompressionstring"snappy"Compression codec
ProducerFlushMessagesint10Flush after N messages
ProducerFlushFrequencytime.Duration100msFlush interval
ProducerRetryMaxint3Max producer retries
ProducerIdempotentboolfalseIdempotent producer
ProducerAcksstring"local"Ack level: none, local, all
ConsumerEnabledbooltrueEnable consumer
ConsumerGroupIDstring""Consumer group ID
ConsumerOffsetsstring"newest"Initial offset: newest, oldest
ConsumerGroupRebalancestring"range"Rebalance strategy
ConsumerGroupSessiontime.Duration10sSession timeout
ConsumerGroupHeartbeattime.Duration3sHeartbeat interval
EnableMetricsbooltrueEnable metrics
EnableTracingbooltrueEnable tracing

Option Helpers

FunctionDescription
WithBrokers(brokers...)Set broker addresses
WithClientID(id)Set client identifier
WithVersion(version)Set Kafka protocol version
WithTLS(certFile, keyFile, caFile)Enable TLS
WithSASL(mechanism, username, password)Enable SASL authentication
WithProducer(enabled)Toggle producer
WithConsumer(enabled)Toggle consumer
WithConsumerGroup(groupID, strategy)Set consumer group
WithCompression(codec)Set compression codec
WithIdempotent(enabled)Toggle idempotent producer
WithMetrics(enabled)Toggle metrics
WithTracing(enabled)Toggle tracing
WithRequireConfig(require)Require config from ConfigManager
WithConfig(config)Provide a complete config struct

How is this guide?

On this page