Kafka
Functions
Public API reference for the Kafka extension
| Function | Description |
|---|---|
| `NewExtension(opts ...ConfigOption) forge.Extension` | Create the Kafka extension with functional options |
| `NewExtensionWithConfig(config Config) forge.Extension` | Create the Kafka extension with a complete config |
| `DefaultConfig() Config` | Return the default configuration |

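
The difference between the two constructors is easiest to see with the functional-options pattern written out. The sketch below is self-contained and does not import the extension: the `Config` fields, `WithBrokers`, `WithClientID`, and the default values are all hypothetical, since this reference does not list them. Only the general shape (start from defaults, apply each option in order) is what the table describes.

```go
package main

import "fmt"

// Config is a hypothetical stand-in; the real Config fields are not
// listed in this reference.
type Config struct {
	Brokers  []string
	ClientID string
}

// ConfigOption mutates a Config in place, as in the functional-options pattern.
type ConfigOption func(*Config)

// defaultConfig returns assumed defaults (illustrative values only).
func defaultConfig() Config {
	return Config{Brokers: []string{"localhost:9092"}, ClientID: "forge-kafka"}
}

// WithBrokers is a hypothetical option that overrides the broker list.
func WithBrokers(brokers ...string) ConfigOption {
	return func(c *Config) { c.Brokers = brokers }
}

// WithClientID is a hypothetical option that overrides the client ID.
func WithClientID(id string) ConfigOption {
	return func(c *Config) { c.ClientID = id }
}

// buildConfig starts from the defaults and applies each option in order,
// so later options win over earlier ones.
func buildConfig(opts ...ConfigOption) Config {
	cfg := defaultConfig()
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := buildConfig(WithBrokers("kafka-1:9092", "kafka-2:9092"))
	fmt.Println(cfg.Brokers, cfg.ClientID)
}
```

Options suit call sites that override one or two settings; passing a complete config suits values loaded from a file or environment.
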
| Function | Description |
|---|---|
| `GetKafka(c forge.Container) (Kafka, error)` | Resolve the Kafka client from the container |
| `MustGetKafka(c forge.Container) Kafka` | Same as `GetKafka`, but panics on error |
| `GetKafkaFromApp(app forge.App) (Kafka, error)` | Resolve the Kafka client from an app instance |
| `MustGetKafkaFromApp(app forge.App) Kafka` | Same as `GetKafkaFromApp`, but panics on error |

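
The `Get`/`MustGet` pairs follow a common Go convention: one variant returns an error, the other panics. A minimal sketch of that convention follows, using a map as a hypothetical stand-in for `forge.Container` and a one-method `Kafka` interface. The `"kafka"` key, `ErrNotRegistered`, and the lowercase helper names are assumptions made for illustration, not the extension's internals.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Kafka is reduced to a single method for this sketch.
type Kafka interface {
	Ping(ctx context.Context) error
}

type fakeKafka struct{}

func (fakeKafka) Ping(context.Context) error { return nil }

// Container is a hypothetical stand-in for forge.Container.
type Container map[string]any

// ErrNotRegistered is an assumed sentinel error for this sketch.
var ErrNotRegistered = errors.New("kafka: client not registered")

// getKafka returns an error when the client is missing or has the wrong type.
func getKafka(c Container) (Kafka, error) {
	v, ok := c["kafka"]
	if !ok {
		return nil, ErrNotRegistered
	}
	k, ok := v.(Kafka)
	if !ok {
		return nil, fmt.Errorf("kafka: unexpected type %T", v)
	}
	return k, nil
}

// mustGetKafka wraps getKafka and panics instead of returning the error.
func mustGetKafka(c Container) Kafka {
	k, err := getKafka(c)
	if err != nil {
		panic(err)
	}
	return k
}

func main() {
	k := mustGetKafka(Container{"kafka": fakeKafka{}})
	fmt.Println("ping error:", k.Ping(context.Background()))

	_, err := getKafka(Container{})
	fmt.Println("lookup error:", err)
}
```

The panicking variant is usually reserved for startup wiring, where a missing client is a programming error; request-time code is generally better served by the error-returning variant.
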
| Method | Description |
|---|---|
| `SendMessage(ctx, msg) (partition, offset, error)` | Send a message synchronously and wait for ack |
| `SendMessages(ctx, msgs) error` | Send multiple messages synchronously |
| `SendAsync(ctx, msg) error` | Send a message asynchronously (non-blocking) |
| `SendBatch(ctx, msgs) error` | Send a batch asynchronously |

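
To illustrate what the synchronous methods return, here is a sketch built on an in-memory fake instead of a real broker. The concrete parameter and return types (`context.Context`, `*Message`, `int32` partition, `int64` offset) are assumptions, because the table abbreviates them; `memProducer` and `demoOffsets` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Message carries only the fields this sketch needs.
type Message struct {
	Topic string
	Key   []byte
	Value []byte
}

// memProducer is a hypothetical in-memory stand-in for the producer side
// of the client. It keeps one single-partition log per topic.
type memProducer struct {
	mu  sync.Mutex
	log map[string][]*Message
}

func newMemProducer() *memProducer {
	return &memProducer{log: make(map[string][]*Message)}
}

// SendMessage appends to the topic log and reports where the message landed.
func (p *memProducer) SendMessage(ctx context.Context, msg *Message) (int32, int64, error) {
	if err := ctx.Err(); err != nil {
		return 0, 0, err // context already cancelled or expired
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	offset := int64(len(p.log[msg.Topic]))
	p.log[msg.Topic] = append(p.log[msg.Topic], msg)
	return 0, offset, nil
}

// SendMessages sends each message in order and stops at the first failure.
func (p *memProducer) SendMessages(ctx context.Context, msgs []*Message) error {
	for _, m := range msgs {
		if _, _, err := p.SendMessage(ctx, m); err != nil {
			return fmt.Errorf("send to %s: %w", m.Topic, err)
		}
	}
	return nil
}

// demoOffsets sends two messages and returns the offsets they were assigned.
func demoOffsets() ([]int64, error) {
	p := newMemProducer()
	ctx := context.Background()
	var offsets []int64
	for _, v := range []string{"created", "paid"} {
		_, off, err := p.SendMessage(ctx, &Message{Topic: "orders", Value: []byte(v)})
		if err != nil {
			return nil, err
		}
		offsets = append(offsets, off)
	}
	return offsets, nil
}

func main() {
	offsets, err := demoOffsets()
	fmt.Println(offsets, err)
}
```

A synchronous send hands back the partition and offset once the write is acknowledged, so the caller can log or store them; the asynchronous variants give up that information in exchange for not blocking.
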
| Method | Description |
|---|---|
| `Consume(ctx, topic, handler) error` | Start consuming a topic with a message handler |
| `StopConsuming(topic) error` | Stop consuming a topic |
| `ConsumePartition(ctx, topic, partition, offset, handler) error` | Consume a specific partition from an offset |

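
The handler-based consume methods can be pictured as a loop that passes each message to a callback until the context is cancelled or the source runs dry. The sketch below uses a channel as the message source. The `MessageHandler` signature is an assumption (this reference does not give it), and `consume` and `collect` are hypothetical helpers, not the extension's code.

```go
package main

import (
	"context"
	"fmt"
)

// Message carries only the fields this sketch needs.
type Message struct {
	Topic string
	Value []byte
}

// MessageHandler is an assumed handler shape.
type MessageHandler func(ctx context.Context, msg *Message) error

// consume calls handler for each message until the channel is closed,
// the context is cancelled, or the handler returns an error.
func consume(ctx context.Context, msgs <-chan *Message, handler MessageHandler) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case m, ok := <-msgs:
			if !ok {
				return nil // source exhausted
			}
			if err := handler(ctx, m); err != nil {
				return err
			}
		}
	}
}

// collect feeds the given values through consume and records what the
// handler saw, in order.
func collect(values ...string) ([]string, error) {
	ch := make(chan *Message, len(values))
	for _, v := range values {
		ch <- &Message{Topic: "orders", Value: []byte(v)}
	}
	close(ch)

	var seen []string
	err := consume(context.Background(), ch, func(_ context.Context, m *Message) error {
		seen = append(seen, string(m.Value))
		return nil
	})
	return seen, err
}

func main() {
	seen, err := collect("created", "paid", "shipped")
	fmt.Println(seen, err)
}
```
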
| Method | Description |
|---|---|
| `JoinGroup(ctx, groupID, topics, handler) error` | Join a consumer group for the given topics |
| `LeaveGroup(ctx) error` | Leave the current consumer group |

| Method | Description |
|---|---|
| `CreateTopic(ctx, config) error` | Create a new topic |
| `DeleteTopic(ctx, topic) error` | Delete a topic |
| `ListTopics(ctx) ([]string, error)` | List all topic names |
| `DescribeTopic(ctx, topic) (*TopicMetadata, error)` | Get topic metadata, including partitions |

| Method | Description |
|---|---|
| `Ping(ctx) error` | Verify broker connectivity |
| `Close() error` | Close all producers, consumers, and connections |
| `Stats() ClientStats` | Get client statistics |

| Field | Type | Purpose |
|---|---|---|
| `Topic` | `string` | Target topic |
| `Key` | `[]byte` | Message key (for partitioning) |
| `Value` | `[]byte` | Message payload |
| `Headers` | `[]MessageHeader` | Message headers |
| `Partition` | `int32` | Target partition (-1 for auto) |
| `Timestamp` | `time.Time` | Message timestamp |

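
The `Key` and `Partition` fields interact: an explicit partition pins the message, while `-1` leaves the choice to the client, typically by hashing the key. The sketch below shows one way that choice could be made, using an FNV-1a hash. It is an illustration only; this reference does not say which partitioner the extension uses. The `MessageHeader` fields and `pickPartition` are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"time"
)

// MessageHeader fields are assumed; this reference does not list them.
type MessageHeader struct {
	Key   string
	Value []byte
}

// Message mirrors the fields in the table above.
type Message struct {
	Topic     string
	Key       []byte
	Value     []byte
	Headers   []MessageHeader
	Partition int32
	Timestamp time.Time
}

// pickPartition is a hypothetical partitioner. numPartitions must be > 0.
func pickPartition(msg Message, numPartitions int32) int32 {
	if msg.Partition >= 0 {
		return msg.Partition // caller pinned the partition explicitly
	}
	if len(msg.Key) == 0 {
		// Real clients usually pick a random or round-robin partition here;
		// 0 keeps the sketch deterministic.
		return 0
	}
	h := fnv.New32a()
	h.Write(msg.Key) // hash.Hash writes never return an error
	return int32(h.Sum32() % uint32(numPartitions))
}

func main() {
	msg := Message{
		Topic:     "orders",
		Key:       []byte("user-42"),
		Value:     []byte(`{"status":"created"}`),
		Partition: -1, // let the partitioner decide
		Timestamp: time.Now(),
	}
	fmt.Println("partition:", pickPartition(msg, 6))
}
```

Because the hash depends only on the key, every message with the same key lands on the same partition, which preserves per-key ordering.
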
| Field | Type | Purpose |
|---|---|---|
| `Name` | `string` | Topic name |
| `NumPartitions` | `int32` | Number of partitions |
| `ReplicationFactor` | `int16` | Replication factor |
| `ConfigEntries` | `map[string]*string` | Topic config overrides |
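
`ConfigEntries` takes `*string` values, so string literals cannot be placed in the map directly. The sketch below fills in a topic config using a small pointer helper. The struct name `TopicConfig` is an assumption (the table does not name the type), and `strPtr` and `exampleTopic` are hypothetical helpers; `retention.ms` and `cleanup.policy` are standard Kafka topic-level settings.

```go
package main

import "fmt"

// TopicConfig mirrors the fields in the table above; the type name is assumed.
type TopicConfig struct {
	Name              string
	NumPartitions     int32
	ReplicationFactor int16
	ConfigEntries     map[string]*string
}

// strPtr returns a pointer to a copy of s, since Go cannot take the
// address of a string literal directly.
func strPtr(s string) *string { return &s }

// exampleTopic builds a config with two topic-level overrides.
func exampleTopic() TopicConfig {
	return TopicConfig{
		Name:              "orders",
		NumPartitions:     6,
		ReplicationFactor: 3,
		ConfigEntries: map[string]*string{
			"retention.ms":   strPtr("604800000"), // 7 days in milliseconds
			"cleanup.policy": strPtr("delete"),
		},
	}
}

func main() {
	cfg := exampleTopic()
	for name, value := range cfg.ConfigEntries {
		fmt.Printf("%s=%s\n", name, *value)
	}
}
```
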