Kafka
Apache Kafka producer and consumer client for Forge applications
Overview
github.com/xraph/forge/extensions/kafka provides an Apache Kafka client built on the Sarama library.
It registers a KafkaService in the DI container that wraps a full-featured Kafka client with
producer, consumer, consumer group, and admin capabilities.
What It Registers
| Service | DI Key | Type |
|---|---|---|
| Kafka service | kafka | *KafkaService (also satisfies Kafka) |
The service is managed by Vessel. Start verifies broker connectivity via Ping(), and Stop closes
all producers, consumers, and the underlying Sarama client.
Quick Start
package main

import (
	"context"
	"fmt"

	"github.com/IBM/sarama"
	"github.com/xraph/forge"
	"github.com/xraph/forge/extensions/kafka"
)

func main() {
	app := forge.NewApp(forge.AppConfig{Name: "my-app", Version: "1.0.0"})
	app.RegisterExtension(kafka.NewExtension(
		kafka.WithBrokers("localhost:9092", "localhost:9093"),
		kafka.WithClientID("my-service"),
		kafka.WithProducer(true),
		kafka.WithConsumer(true),
		kafka.WithConsumerGroup("my-group"),
	))

	ctx := context.Background()
	app.Start(ctx)
	defer app.Stop(ctx)

	// Resolve the Kafka client from DI
	svc, err := forge.InjectType[*kafka.KafkaService](app.Container())
	if err != nil {
		fmt.Println("resolve kafka service:", err)
		return
	}
	client := svc // KafkaService satisfies the Kafka interface

	// Create a topic
	client.CreateTopic("events", kafka.TopicConfig{
		NumPartitions:     6,
		ReplicationFactor: 3,
	})

	// Send a message synchronously and check the result
	if err := client.SendMessage("events", []byte("user-123"), []byte(`{"action":"login"}`)); err != nil {
		fmt.Println("send failed:", err)
	}

	// Send a message asynchronously (fire-and-forget)
	client.SendMessageAsync("events", []byte("user-456"), []byte(`{"action":"signup"}`))

	// Consume messages from a topic
	client.Consume(ctx, []string{"events"}, func(msg *sarama.ConsumerMessage) error {
		fmt.Printf("Received: key=%s value=%s partition=%d offset=%d\n",
			string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)
		return nil
	})
}
Consumer Groups
Join a consumer group for coordinated consumption across multiple instances:
client.JoinConsumerGroup(ctx, "analytics-group", []string{"events", "metrics"},
	&myGroupHandler{})

// myGroupHandler implements kafka.ConsumerGroupHandler
type myGroupHandler struct{}

func (h *myGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
	return nil
}

func (h *myGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	return nil
}

func (h *myGroupHandler) ConsumeClaim(
	session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim,
) error {
	for msg := range claim.Messages() {
		fmt.Printf("Group message: %s\n", string(msg.Value))
		session.MarkMessage(msg, "")
	}
	return nil
}
Using Kafka in Your Services
Inject *kafka.KafkaService for automatic DI resolution:
type EventPublisher struct {
	kafka  kafka.Kafka
	logger forge.Logger
}

func NewEventPublisher(k *kafka.KafkaService, logger forge.Logger) *EventPublisher {
	return &EventPublisher{kafka: k, logger: logger}
}

func (p *EventPublisher) PublishUserEvent(action string, userID string, data []byte) error {
	return p.kafka.SendMessage("user-events", []byte(userID), data)
}
Register with Vessel:
forge.ProvideConstructor(app.Container(), NewEventPublisher)
Key Concepts
- Producer -- send messages synchronously or asynchronously with configurable compression, batching, retries, idempotency, and ack levels.
- Consumer -- subscribe to topics with offset control (newest/oldest), fetch tuning, and isolation levels.
- Consumer groups -- join a consumer group with configurable rebalance strategy (range, round-robin, sticky), session timeout, and heartbeat interval.
- Admin operations -- create topics, delete topics, list topics, describe topics, and get partition metadata.
- SASL/TLS -- authenticate with PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512. Connect over TLS with custom certificates.
Important Runtime Notes
- The Sarama client is created during NewKafkaService. Connection health is verified during Start.
- Producer and consumer can be independently enabled/disabled via config.
- Async producer errors are logged but not surfaced to callers. Use the sync producer (SendMessage) when the caller needs to know whether a send succeeded.