Kafka

Apache Kafka producer and consumer client for Forge applications

Overview

github.com/xraph/forge/extensions/kafka provides an Apache Kafka client built on the Sarama library. It registers a KafkaService in the DI container that wraps a full-featured Kafka client with producer, consumer, consumer group, and admin capabilities.

What It Registers

Service         DI Key    Type
Kafka service   kafka     *KafkaService (also satisfies Kafka)

The service lifecycle is managed by Vessel: Start verifies broker connectivity via Ping(), and Stop closes all producers, consumers, and the underlying Sarama client.

Quick Start

package main

import (
    "context"
    "fmt"

    "github.com/IBM/sarama"
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/kafka"
)

func main() {
    app := forge.NewApp(forge.AppConfig{Name: "my-app", Version: "1.0.0"})

    app.RegisterExtension(kafka.NewExtension(
        kafka.WithBrokers("localhost:9092", "localhost:9093"),
        kafka.WithClientID("my-service"),
        kafka.WithProducer(true),
        kafka.WithConsumer(true),
        kafka.WithConsumerGroup("my-group"),
    ))

    ctx := context.Background()
    if err := app.Start(ctx); err != nil {
        panic(err)
    }
    defer app.Stop(ctx)

    // Resolve the Kafka client from DI
    svc, err := forge.InjectType[*kafka.KafkaService](app.Container())
    if err != nil {
        panic(err)
    }
    var client kafka.Kafka = svc // *KafkaService satisfies the Kafka interface

    // Create a topic (the replication factor cannot exceed the broker count)
    client.CreateTopic("events", kafka.TopicConfig{
        NumPartitions:     6,
        ReplicationFactor: 2,
    })

    // Send a message synchronously
    client.SendMessage("events", []byte("user-123"), []byte(`{"action":"login"}`))

    // Send a message asynchronously (fire-and-forget)
    client.SendMessageAsync("events", []byte("user-456"), []byte(`{"action":"signup"}`))

    // Consume messages from a topic
    client.Consume(ctx, []string{"events"}, func(msg *sarama.ConsumerMessage) error {
        fmt.Printf("Received: key=%s value=%s partition=%d offset=%d\n",
            string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)
        return nil
    })
}

Consumer Groups

Join a consumer group for coordinated consumption across multiple instances:

client.JoinConsumerGroup(ctx, "analytics-group", []string{"events", "metrics"},
    &myGroupHandler{})

// Handler implements kafka.ConsumerGroupHandler
type myGroupHandler struct{}

func (h *myGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
    return nil
}

func (h *myGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
    return nil
}

func (h *myGroupHandler) ConsumeClaim(
    session sarama.ConsumerGroupSession,
    claim sarama.ConsumerGroupClaim,
) error {
    for msg := range claim.Messages() {
        fmt.Printf("Group message: %s\n", string(msg.Value))
        session.MarkMessage(msg, "") // mark the message as processed so its offset is committed
    }
    return nil
}

Using Kafka in Your Services

Accept *kafka.KafkaService as a constructor parameter and the DI container resolves it automatically:

type EventPublisher struct {
    client kafka.Kafka
    logger forge.Logger
}

func NewEventPublisher(k *kafka.KafkaService, logger forge.Logger) *EventPublisher {
    return &EventPublisher{client: k, logger: logger}
}

func (p *EventPublisher) PublishUserEvent(userID string, data []byte) error {
    return p.client.SendMessage("user-events", []byte(userID), data)
}

Register with Vessel:

forge.ProvideConstructor(app.Container(), NewEventPublisher)
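
Once registered, the publisher resolves from the container like the Kafka service itself; a short usage sketch, reusing forge.InjectType from the Quick Start:

pub, err := forge.InjectType[*EventPublisher](app.Container())
if err != nil {
    panic(err)
}
pub.PublishUserEvent("user-123", []byte(`{"action":"login"}`))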

Key Concepts

  • Producer -- send messages synchronously or asynchronously with configurable compression, batching, retries, idempotency, and ack levels.
  • Consumer -- subscribe to topics with offset control (newest/oldest), fetch tuning, and isolation levels.
  • Consumer groups -- join a consumer group with a configurable rebalance strategy (range, round-robin, or sticky), session timeout, and heartbeat interval.
  • Admin operations -- create, delete, list, and describe topics, and fetch partition metadata (see the admin sketch below).
  • SASL/TLS -- authenticate with PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512, and connect over TLS with custom certificates (see the configuration sketch below).
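
The tuning and security settings above map onto extension options. The sketch below is illustrative: the options beyond those shown in the Quick Start (WithCompression, WithRequiredAcks, WithRebalanceStrategy, WithSASL, WithTLS) are assumed names, not confirmed API; consult the extension's option list for the exact spellings.

app.RegisterExtension(kafka.NewExtension(
    kafka.WithBrokers("broker-1:9092", "broker-2:9092", "broker-3:9092"),
    kafka.WithClientID("my-service"),
    kafka.WithProducer(true),
    kafka.WithConsumer(true),
    kafka.WithConsumerGroup("my-group"),

    // Assumed options, for illustration only:
    kafka.WithCompression("snappy"),                     // producer compression codec
    kafka.WithRequiredAcks(-1),                          // wait for all in-sync replicas
    kafka.WithRebalanceStrategy("sticky"),               // consumer group rebalancing
    kafka.WithSASL("SCRAM-SHA-512", "user", "password"), // SASL authentication
    kafka.WithTLS(true),                                 // TLS transport
))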
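
For admin operations, CreateTopic and TopicConfig appear in the Quick Start; ListTopics and DeleteTopic below are assumed method names that mirror the capabilities listed above.

// CreateTopic is shown in the Quick Start; the other calls are assumptions.
err := client.CreateTopic("metrics", kafka.TopicConfig{
    NumPartitions:     3,
    ReplicationFactor: 1,
})
if err != nil {
    // the topic may already exist
}

topics, err := client.ListTopics() // assumed signature
if err == nil {
    for _, name := range topics {
        fmt.Println(name)
    }
}

_ = client.DeleteTopic("metrics") // assumed signature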

Important Runtime Notes

  • The Sarama client is created during NewKafkaService. Connection health is verified during Start.
  • Producer and consumer can be independently enabled/disabled via config.
  • Async producer errors are logged but not surfaced to callers; use the sync producer when you need delivery confirmation (see the sketch below).
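
A minimal sketch of the difference; SendMessage and SendMessageAsync are both shown in the Quick Start, and only the error handling here is added:

payload := []byte(`{"action":"login"}`)

// Sync: delivery failures come back as an error the caller can act on.
if err := client.SendMessage("events", []byte("user-123"), payload); err != nil {
    // retry, route to a dead-letter topic, or fail the request
}

// Async: fire-and-forget; broker-side failures are only logged by the service.
client.SendMessageAsync("events", []byte("user-123"), payload)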
