Kafka

Functions

Public API reference for the Kafka extension

Extension Entry Points

FunctionDescription
NewExtension(opts ...ConfigOption) forge.ExtensionCreate the Kafka extension with functional options
NewExtensionWithConfig(config Config) forge.ExtensionCreate the Kafka extension with a complete config
DefaultConfig() ConfigReturns the default configuration

DI Helpers

FunctionDescription
GetKafka(c forge.Container) (Kafka, error)Resolve the Kafka client from the container
MustGetKafka(c forge.Container) KafkaSame but panics on error
GetKafkaFromApp(app forge.App) (Kafka, error)Resolve from an app instance
MustGetKafkaFromApp(app forge.App) KafkaSame but panics on error

Kafka Interface -- Producer

MethodDescription
SendMessage(ctx, msg) (partition, offset, error)Send a message synchronously and wait for ack
SendMessages(ctx, msgs) errorSend multiple messages synchronously
SendAsync(ctx, msg) errorSend a message asynchronously (non-blocking)
SendBatch(ctx, msgs) errorSend a batch asynchronously

Kafka Interface -- Consumer

MethodDescription
Consume(ctx, topic, handler) errorStart consuming a topic with a message handler
StopConsuming(topic) errorStop consuming a topic
ConsumePartition(ctx, topic, partition, offset, handler) errorConsume a specific partition from an offset

Kafka Interface -- Consumer Group

MethodDescription
JoinGroup(ctx, groupID, topics, handler) errorJoin a consumer group for the given topics
LeaveGroup(ctx) errorLeave the current consumer group

Kafka Interface -- Admin

MethodDescription
CreateTopic(ctx, config) errorCreate a new topic
DeleteTopic(ctx, topic) errorDelete a topic
ListTopics(ctx) ([]string, error)List all topic names
DescribeTopic(ctx, topic) (*TopicMetadata, error)Get topic metadata including partitions

Kafka Interface -- Lifecycle

MethodDescription
Ping(ctx) errorVerify broker connectivity
Close() errorClose all producers, consumers, and connections
Stats() ClientStatsGet client statistics

Key Types

ProducerMessage

FieldTypePurpose
TopicstringTarget topic
Key[]byteMessage key (for partitioning)
Value[]byteMessage payload
Headers[]MessageHeaderMessage headers
Partitionint32Target partition (-1 for auto)
Timestamptime.TimeMessage timestamp

TopicConfig

FieldTypePurpose
NamestringTopic name
NumPartitionsint32Number of partitions
ReplicationFactorint16Replication factor
ConfigEntriesmap[string]*stringTopic config overrides

How is this guide?

On this page