AsyncAPI

Generate AsyncAPI documentation for event-driven APIs and messaging systems

Forge provides comprehensive AsyncAPI support for documenting event-driven APIs, message queues, and real-time communication systems. The framework automatically generates AsyncAPI specifications from your event definitions and provides built-in endpoints for API documentation.

AsyncAPI Interface

Forge's AsyncAPI interface provides automatic documentation generation for event-driven systems:

type AsyncAPIManager interface {
    // Specification generation
    GenerateSpec() (*asyncapi.Spec, error)
    AddChannel(name string, channel Channel) error
    
    // Message documentation
    DocumentMessage(channel string, message Message) error
    DocumentOperation(operation Operation) error
    
    // Customization
    SetInfo(info AsyncAPIInfo) error
    SetServers(servers []AsyncAPIServer) error
    
    // Validation
    ValidateMessage(schema interface{}, data interface{}) error
}

Event-Driven API Documentation

Basic AsyncAPI Setup

func main() {
    app := forge.NewApp(forge.AppConfig{
        Name: "event-driven-api",
        Version: "1.0.0",
    })
    
    // Enable AsyncAPI documentation
    app.Router().Use(forge.AsyncAPIMiddleware(forge.AsyncAPIConfig{
        Title:       "Event-Driven API",
        Version:     "1.0.0",
        Description: "A comprehensive event-driven API for real-time communication",
        Contact: forge.AsyncAPIContact{
            Name:  "API Support",
            Email: "support@example.com",
        },
        License: forge.AsyncAPILicense{
            Name: "MIT",
            URL:  "https://opensource.org/licenses/MIT",
        },
    }))
    
    // Document event channels
    app.Router().DocumentEventChannel("user.events", userEventChannel)
    app.Router().DocumentEventChannel("order.events", orderEventChannel)
    
    app.Run()
}

Event Channel Documentation

// User events channel
var userEventChannel = forge.EventChannel{
    Description: "User-related events",
    Messages: map[string]forge.EventMessage{
        "user.created": {
            Summary:     "User created event",
            Description: "Published when a new user is created",
            Payload:     UserCreatedEvent{},
            Tags:        []string{"user", "creation"},
        },
        "user.updated": {
            Summary:     "User updated event",
            Description: "Published when a user is updated",
            Payload:     UserUpdatedEvent{},
            Tags:        []string{"user", "update"},
        },
        "user.deleted": {
            Summary:     "User deleted event",
            Description: "Published when a user is deleted",
            Payload:     UserDeletedEvent{},
            Tags:        []string{"user", "deletion"},
        },
    },
}

// Order events channel
var orderEventChannel = forge.EventChannel{
    Description: "Order-related events",
    Messages: map[string]forge.EventMessage{
        "order.created": {
            Summary:     "Order created event",
            Description: "Published when a new order is created",
            Payload:     OrderCreatedEvent{},
            Tags:        []string{"order", "creation"},
        },
        "order.updated": {
            Summary:     "Order updated event",
            Description: "Published when an order is updated",
            Payload:     OrderUpdatedEvent{},
            Tags:        []string{"order", "update"},
        },
        "order.completed": {
            Summary:     "Order completed event",
            Description: "Published when an order is completed",
            Payload:     OrderCompletedEvent{},
            Tags:        []string{"order", "completion"},
        },
    },
}

Event Schema Definitions

Event Payload Schemas

// User event schemas
type UserCreatedEvent struct {
    UserID    uuid.UUID `json:"user_id" description:"Unique user identifier" format:"uuid"`
    Email     string    `json:"email" description:"User email address" format:"email"`
    Name      string    `json:"name" description:"User full name"`
    CreatedAt time.Time `json:"created_at" description:"User creation timestamp" format:"date-time"`
    Metadata  EventMetadata `json:"metadata" description:"Event metadata"`
}

type UserUpdatedEvent struct {
    UserID    uuid.UUID `json:"user_id" description:"Unique user identifier" format:"uuid"`
    Email     string    `json:"email" description:"User email address" format:"email"`
    Name      string    `json:"name" description:"User full name"`
    UpdatedAt time.Time `json:"updated_at" description:"User update timestamp" format:"date-time"`
    Changes   []string  `json:"changes" description:"List of changed fields"`
    Metadata  EventMetadata `json:"metadata" description:"Event metadata"`
}

type UserDeletedEvent struct {
    UserID    uuid.UUID `json:"user_id" description:"Unique user identifier" format:"uuid"`
    DeletedAt time.Time `json:"deleted_at" description:"User deletion timestamp" format:"date-time"`
    Reason    string    `json:"reason" description:"Reason for deletion"`
    Metadata  EventMetadata `json:"metadata" description:"Event metadata"`
}

// Order event schemas
type OrderCreatedEvent struct {
    OrderID   uuid.UUID `json:"order_id" description:"Unique order identifier" format:"uuid"`
    UserID    uuid.UUID `json:"user_id" description:"User identifier" format:"uuid"`
    Items     []OrderItem `json:"items" description:"Order items"`
    Total     float64   `json:"total" description:"Order total amount"`
    CreatedAt time.Time `json:"created_at" description:"Order creation timestamp" format:"date-time"`
    Metadata  EventMetadata `json:"metadata" description:"Event metadata"`
}

type OrderUpdatedEvent struct {
    OrderID   uuid.UUID `json:"order_id" description:"Unique order identifier" format:"uuid"`
    Status    string    `json:"status" description:"Order status"`
    UpdatedAt time.Time `json:"updated_at" description:"Order update timestamp" format:"date-time"`
    Changes   []string  `json:"changes" description:"List of changed fields"`
    Metadata  EventMetadata `json:"metadata" description:"Event metadata"`
}

type OrderCompletedEvent struct {
    OrderID     uuid.UUID `json:"order_id" description:"Unique order identifier" format:"uuid"`
    CompletedAt time.Time `json:"completed_at" description:"Order completion timestamp" format:"date-time"`
    FinalTotal  float64   `json:"final_total" description:"Final order total"`
    Metadata    EventMetadata `json:"metadata" description:"Event metadata"`
}

// Common schemas
type EventMetadata struct {
    EventID    uuid.UUID `json:"event_id" description:"Unique event identifier" format:"uuid"`
    Timestamp  time.Time `json:"timestamp" description:"Event timestamp" format:"date-time"`
    Version    string    `json:"version" description:"Event schema version"`
    Source     string    `json:"source" description:"Event source service"`
    CorrelationID string `json:"correlation_id" description:"Correlation ID for tracing"`
}

type OrderItem struct {
    ProductID uuid.UUID `json:"product_id" description:"Product identifier" format:"uuid"`
    Name      string    `json:"name" description:"Product name"`
    Quantity  int       `json:"quantity" description:"Item quantity" minimum:"1"`
    Price     float64   `json:"price" description:"Item price" minimum:"0"`
}

Message Queue Integration

Kafka Integration

// Kafka event channel
var kafkaEventChannel = forge.EventChannel{
    Description: "Kafka-based event channel",
    Servers: []forge.AsyncAPIServer{
        {
            URL:         "kafka://localhost:9092",
            Protocol:    "kafka",
            Description: "Local Kafka broker",
        },
    },
    Messages: map[string]forge.EventMessage{
        "user.events": {
            Summary:     "User events",
            Description: "User-related events published to Kafka",
            Payload:     UserEvent{},
            Tags:        []string{"user", "kafka"},
        },
    },
}

// Kafka message producer
type KafkaProducer struct {
    producer *kafka.Producer
}

func (kp *KafkaProducer) PublishUserEvent(event UserEvent) error {
    message := forge.EventMessage{
        Channel: "user.events",
        Payload: event,
        Headers: map[string]interface{}{
            "event_type": "user.created",
            "timestamp":  time.Now().Unix(),
        },
    }
    
    return kp.producer.Produce(message)
}

NATS Integration

// NATS event channel
var natsEventChannel = forge.EventChannel{
    Description: "NATS-based event channel",
    Servers: []forge.AsyncAPIServer{
        {
            URL:         "nats://localhost:4222",
            Protocol:    "nats",
            Description: "Local NATS server",
        },
    },
    Messages: map[string]forge.EventMessage{
        "order.events": {
            Summary:     "Order events",
            Description: "Order-related events published to NATS",
            Payload:     OrderEvent{},
            Tags:        []string{"order", "nats"},
        },
    },
}

// NATS message producer
type NATSProducer struct {
    conn *nats.Conn
}

func (np *NATSProducer) PublishOrderEvent(event OrderEvent) error {
    message := forge.EventMessage{
        Channel: "order.events",
        Payload: event,
        Headers: map[string]interface{}{
            "event_type": "order.created",
            "timestamp":  time.Now().Unix(),
        },
    }
    
    return np.conn.Publish("order.events", message)
}

WebSocket Event Documentation

WebSocket Event Channel

// WebSocket event channel
var websocketEventChannel = forge.EventChannel{
    Description: "WebSocket-based real-time events",
    Servers: []forge.AsyncAPIServer{
        {
            URL:         "ws://localhost:8080/ws",
            Protocol:    "ws",
            Description: "WebSocket server",
        },
    },
    Messages: map[string]forge.EventMessage{
        "chat.message": {
            Summary:     "Chat message",
            Description: "Real-time chat message",
            Payload:     ChatMessage{},
            Tags:        []string{"chat", "websocket"},
        },
        "user.typing": {
            Summary:     "User typing indicator",
            Description: "Indicates when a user is typing",
            Payload:     TypingIndicator{},
            Tags:        []string{"chat", "typing"},
        },
        "user.online": {
            Summary:     "User online status",
            Description: "User online/offline status",
            Payload:     UserStatus{},
            Tags:        []string{"user", "status"},
        },
    },
}

// WebSocket event schemas
type ChatMessage struct {
    MessageID uuid.UUID `json:"message_id" description:"Unique message identifier" format:"uuid"`
    UserID    uuid.UUID `json:"user_id" description:"User identifier" format:"uuid"`
    Username  string    `json:"username" description:"Username"`
    Content   string    `json:"content" description:"Message content"`
    Timestamp time.Time `json:"timestamp" description:"Message timestamp" format:"date-time"`
    RoomID    string    `json:"room_id" description:"Chat room identifier"`
}

type TypingIndicator struct {
    UserID   uuid.UUID `json:"user_id" description:"User identifier" format:"uuid"`
    Username string    `json:"username" description:"Username"`
    RoomID   string    `json:"room_id" description:"Chat room identifier"`
    IsTyping bool      `json:"is_typing" description:"Whether user is typing"`
}

type UserStatus struct {
    UserID   uuid.UUID `json:"user_id" description:"User identifier" format:"uuid"`
    Username string    `json:"username" description:"Username"`
    Status   string    `json:"status" description:"User status" enum:"online,offline,away"`
    Timestamp time.Time `json:"timestamp" description:"Status timestamp" format:"date-time"`
}

Server-Sent Events Documentation

SSE Event Channel

// SSE event channel
var sseEventChannel = forge.EventChannel{
    Description: "Server-Sent Events for real-time updates",
    Servers: []forge.AsyncAPIServer{
        {
            URL:         "http://localhost:8080/events",
            Protocol:    "sse",
            Description: "Server-Sent Events endpoint",
        },
    },
    Messages: map[string]forge.EventMessage{
        "notification": {
            Summary:     "Notification event",
            Description: "Real-time notification",
            Payload:     Notification{},
            Tags:        []string{"notification", "sse"},
        },
        "system.alert": {
            Summary:     "System alert",
            Description: "System alert notification",
            Payload:     SystemAlert{},
            Tags:        []string{"system", "alert"},
        },
    },
}

// SSE event schemas
type Notification struct {
    NotificationID uuid.UUID `json:"notification_id" description:"Unique notification identifier" format:"uuid"`
    UserID         uuid.UUID `json:"user_id" description:"User identifier" format:"uuid"`
    Title          string    `json:"title" description:"Notification title"`
    Message        string    `json:"message" description:"Notification message"`
    Type           string    `json:"type" description:"Notification type" enum:"info,warning,error,success"`
    Timestamp      time.Time `json:"timestamp" description:"Notification timestamp" format:"date-time"`
    Read           bool      `json:"read" description:"Whether notification is read"`
}

type SystemAlert struct {
    AlertID   uuid.UUID `json:"alert_id" description:"Unique alert identifier" format:"uuid"`
    Level     string    `json:"level" description:"Alert level" enum:"low,medium,high,critical"`
    Message   string    `json:"message" description:"Alert message"`
    Source    string    `json:"source" description:"Alert source"`
    Timestamp time.Time `json:"timestamp" description:"Alert timestamp" format:"date-time"`
    Resolved  bool      `json:"resolved" description:"Whether alert is resolved"`
}

AsyncAPI Configuration

Basic Configuration

app := forge.NewApp(forge.AppConfig{
    AsyncAPIConfig: forge.AsyncAPIConfig{
        Enabled:     true,
        Title:       "Event-Driven API",
        Version:     "1.0.0",
        Description: "A comprehensive event-driven API for real-time communication",
        Contact: forge.AsyncAPIContact{
            Name:  "API Support",
            Email: "support@example.com",
            URL:   "https://example.com/support",
        },
        License: forge.AsyncAPILicense{
            Name: "MIT",
            URL:  "https://opensource.org/licenses/MIT",
        },
        Servers: []forge.AsyncAPIServer{
            {
                URL:         "kafka://localhost:9092",
                Protocol:    "kafka",
                Description: "Kafka message broker",
            },
            {
                URL:         "ws://localhost:8080/ws",
                Protocol:    "ws",
                Description: "WebSocket server",
            },
        },
    },
})

Advanced Configuration

asyncAPIConfig := forge.AsyncAPIConfig{
    // Basic info
    Enabled:     true,
    Title:       "Event-Driven API",
    Version:     "1.0.0",
    Description: "A comprehensive event-driven API for real-time communication",
    
    // Contact information
    Contact: forge.AsyncAPIContact{
        Name:  "API Support",
        Email: "support@example.com",
        URL:   "https://example.com/support",
    },
    
    // License information
    License: forge.AsyncAPILicense{
        Name: "MIT",
        URL:  "https://opensource.org/licenses/MIT",
    },
    
    // Servers
    Servers: []forge.AsyncAPIServer{
        {
            URL:         "kafka://localhost:9092",
            Protocol:    "kafka",
            Description: "Kafka message broker",
        },
        {
            URL:         "nats://localhost:4222",
            Protocol:    "nats",
            Description: "NATS message broker",
        },
        {
            URL:         "ws://localhost:8080/ws",
            Protocol:    "ws",
            Description: "WebSocket server",
        },
    },
    
    // Custom settings
    CustomSettings: map[string]interface{}{
        "enable_validation": true,
        "strict_mode":       false,
        "generate_examples": true,
    },
}

AsyncAPI Endpoints

Built-in Endpoints

Forge automatically provides AsyncAPI endpoints:

# AsyncAPI specification
GET /_/asyncapi.json

# AsyncAPI Studio
GET /_/asyncapi-studio

# AsyncAPI YAML
GET /_/asyncapi.yaml

Custom Documentation Endpoints

// Custom AsyncAPI endpoint
app.Router().GET("/docs/asyncapi", func(ctx forge.Context) error {
    spec := app.Router().GenerateAsyncAPISpec()
    
    // Customize spec
    spec.Info.Title = "My Custom Event API"
    spec.Info.Version = "2.0.0"
    
    return ctx.JSON(200, spec)
})

// Custom AsyncAPI Studio
app.Router().GET("/docs/asyncapi-studio", func(ctx forge.Context) error {
    html := generateAsyncAPIStudio("/docs/asyncapi")
    return ctx.HTML(200, html)
})

Event Validation

Message Validation

func publishUserEvent(event UserEvent) error {
    // Validate event against AsyncAPI schema
    if err := app.Router().ValidateMessage(UserEvent{}, event); err != nil {
        app.Logger().Error("event validation failed", forge.F("error", err))
        return err
    }
    
    // Publish event
    return eventPublisher.Publish("user.events", event)
}

Event Schema Validation

func validateEventSchema(eventType string, payload interface{}) error {
    switch eventType {
    case "user.created":
        return app.Router().ValidateMessage(UserCreatedEvent{}, payload)
    case "user.updated":
        return app.Router().ValidateMessage(UserUpdatedEvent{}, payload)
    case "order.created":
        return app.Router().ValidateMessage(OrderCreatedEvent{}, payload)
    default:
        return errors.New("unknown event type")
    }
}

Testing AsyncAPI

AsyncAPI Specification Testing

func TestAsyncAPISpec(t *testing.T) {
    app := forge.NewTestApp(forge.TestAppConfig{
        Name: "test-event-api",
    })
    
    // Register event channels
    app.Router().DocumentEventChannel("user.events", userEventChannel)
    app.Router().DocumentEventChannel("order.events", orderEventChannel)
    
    // Generate AsyncAPI spec
    spec := app.Router().GenerateAsyncAPISpec()
    
    // Validate spec
    assert.NotNil(t, spec)
    assert.Equal(t, "test-event-api", spec.Info.Title)
    assert.Contains(t, spec.Channels, "user.events")
    assert.Contains(t, spec.Channels, "order.events")
}

Documentation Endpoint Testing

func TestAsyncAPIEndpoint(t *testing.T) {
    app := forge.NewTestApp(forge.TestAppConfig{
        Name: "test-event-api",
    })
    
    // Start application
    go func() {
        app.Run()
    }()
    defer app.Stop(context.Background())
    
    // Test AsyncAPI endpoint
    resp, err := http.Get("http://localhost:8080/_/asyncapi.json")
    require.NoError(t, err)
    defer resp.Body.Close()
    
    assert.Equal(t, 200, resp.StatusCode)
    assert.Equal(t, "application/json", resp.Header.Get("Content-Type"))
    
    // Parse AsyncAPI spec
    var spec map[string]interface{}
    err = json.NewDecoder(resp.Body).Decode(&spec)
    require.NoError(t, err)
    
    assert.Equal(t, "test-event-api", spec["info"].(map[string]interface{})["title"])
}

Best Practices

  1. Event Design: Design clear, consistent event schemas
  2. Documentation: Provide comprehensive event descriptions
  3. Examples: Include realistic event examples
  4. Validation: Use schema validation for events
  5. Versioning: Use proper event schema versioning
  6. Testing: Test AsyncAPI generation and validation
  7. Standards: Follow AsyncAPI 2.0 standards
  8. Monitoring: Monitor event publishing and consumption

For more information about event-driven architecture, see the Events Extension documentation.

How is this guide?

Last updated on