AsyncAPI
Generate AsyncAPI documentation for event-driven APIs and messaging systems
Forge provides comprehensive AsyncAPI support for documenting event-driven APIs, message queues, and real-time communication systems. The framework automatically generates AsyncAPI specifications from your event definitions and provides built-in endpoints for API documentation.
AsyncAPI Interface
Forge's AsyncAPI interface provides automatic documentation generation for event-driven systems:
// AsyncAPIManager groups the operations used to build AsyncAPI documentation:
// generating the specification, documenting channels, messages and
// operations, customizing the document metadata, and validating messages.
type AsyncAPIManager interface {
	// Specification generation

	// GenerateSpec returns the AsyncAPI specification, or an error.
	GenerateSpec() (*asyncapi.Spec, error)
	// AddChannel takes a channel name and its definition.
	AddChannel(name string, channel Channel) error

	// Message documentation

	// DocumentMessage takes a channel name and a message definition.
	DocumentMessage(channel string, message Message) error
	// DocumentOperation takes an operation definition.
	DocumentOperation(operation Operation) error

	// Customization

	// SetInfo takes the document's info section.
	SetInfo(info AsyncAPIInfo) error
	// SetServers takes the list of servers for the document.
	SetServers(servers []AsyncAPIServer) error

	// Validation

	// ValidateMessage takes a schema and the data to check; a non-nil error
	// presumably means the data does not match the schema (TODO confirm).
	ValidateMessage(schema interface{}, data interface{}) error
}
Event-Driven API Documentation
Basic AsyncAPI Setup
func main() {
app := forge.NewApp(forge.AppConfig{
Name: "event-driven-api",
Version: "1.0.0",
})
// Enable AsyncAPI documentation
app.Router().Use(forge.AsyncAPIMiddleware(forge.AsyncAPIConfig{
Title: "Event-Driven API",
Version: "1.0.0",
Description: "A comprehensive event-driven API for real-time communication",
Contact: forge.AsyncAPIContact{
Name: "API Support",
Email: "support@example.com",
},
License: forge.AsyncAPILicense{
Name: "MIT",
URL: "https://opensource.org/licenses/MIT",
},
}))
// Document event channels
app.Router().DocumentEventChannel("user.events", userEventChannel)
app.Router().DocumentEventChannel("order.events", orderEventChannel)
app.Run()
}Event Channel Documentation
// User events channel
//
// userEventChannel describes the three user lifecycle messages
// (created, updated, deleted) together with their payload types and tags.
var userEventChannel = forge.EventChannel{
	Description: "User-related events",
	Messages: map[string]forge.EventMessage{
		"user.created": {
			Summary:     "User created event",
			Description: "Published when a new user is created",
			Payload:     UserCreatedEvent{},
			Tags:        []string{"user", "creation"},
		},
		"user.updated": {
			Summary:     "User updated event",
			Description: "Published when a user is updated",
			Payload:     UserUpdatedEvent{},
			Tags:        []string{"user", "update"},
		},
		"user.deleted": {
			Summary:     "User deleted event",
			Description: "Published when a user is deleted",
			Payload:     UserDeletedEvent{},
			Tags:        []string{"user", "deletion"},
		},
	},
}
// Order events channel
//
// orderEventChannel describes the three order lifecycle messages
// (created, updated, completed) together with their payload types and tags.
var orderEventChannel = forge.EventChannel{
	Description: "Order-related events",
	Messages: map[string]forge.EventMessage{
		"order.created": {
			Summary:     "Order created event",
			Description: "Published when a new order is created",
			Payload:     OrderCreatedEvent{},
			Tags:        []string{"order", "creation"},
		},
		"order.updated": {
			Summary:     "Order updated event",
			Description: "Published when an order is updated",
			Payload:     OrderUpdatedEvent{},
			Tags:        []string{"order", "update"},
		},
		"order.completed": {
			Summary:     "Order completed event",
			Description: "Published when an order is completed",
			Payload:     OrderCompletedEvent{},
			Tags:        []string{"order", "completion"},
		},
	},
}
Event Schema Definitions
Event Payload Schemas
// User event schemas

// UserCreatedEvent is the payload type used for the "user.created" message.
// The struct tags carry the JSON field name plus description/format hints.
type UserCreatedEvent struct {
	UserID    uuid.UUID     `json:"user_id" description:"Unique user identifier" format:"uuid"`
	Email     string        `json:"email" description:"User email address" format:"email"`
	Name      string        `json:"name" description:"User full name"`
	CreatedAt time.Time     `json:"created_at" description:"User creation timestamp" format:"date-time"`
	Metadata  EventMetadata `json:"metadata" description:"Event metadata"`
}
// UserUpdatedEvent is the payload type used for the "user.updated" message.
// Changes lists the names of the fields that were modified.
type UserUpdatedEvent struct {
	UserID    uuid.UUID     `json:"user_id" description:"Unique user identifier" format:"uuid"`
	Email     string        `json:"email" description:"User email address" format:"email"`
	Name      string        `json:"name" description:"User full name"`
	UpdatedAt time.Time     `json:"updated_at" description:"User update timestamp" format:"date-time"`
	Changes   []string      `json:"changes" description:"List of changed fields"`
	Metadata  EventMetadata `json:"metadata" description:"Event metadata"`
}
// UserDeletedEvent is the payload type used for the "user.deleted" message.
type UserDeletedEvent struct {
	UserID    uuid.UUID     `json:"user_id" description:"Unique user identifier" format:"uuid"`
	DeletedAt time.Time     `json:"deleted_at" description:"User deletion timestamp" format:"date-time"`
	Reason    string        `json:"reason" description:"Reason for deletion"`
	Metadata  EventMetadata `json:"metadata" description:"Event metadata"`
}
// Order event schemas

// OrderCreatedEvent is the payload type used for the "order.created" message.
// It carries the ordering user, the line items and the order total.
type OrderCreatedEvent struct {
	OrderID   uuid.UUID     `json:"order_id" description:"Unique order identifier" format:"uuid"`
	UserID    uuid.UUID     `json:"user_id" description:"User identifier" format:"uuid"`
	Items     []OrderItem   `json:"items" description:"Order items"`
	Total     float64       `json:"total" description:"Order total amount"`
	CreatedAt time.Time     `json:"created_at" description:"Order creation timestamp" format:"date-time"`
	Metadata  EventMetadata `json:"metadata" description:"Event metadata"`
}
// OrderUpdatedEvent is the payload type used for the "order.updated" message.
// Changes lists the names of the fields that were modified.
type OrderUpdatedEvent struct {
	OrderID   uuid.UUID     `json:"order_id" description:"Unique order identifier" format:"uuid"`
	Status    string        `json:"status" description:"Order status"`
	UpdatedAt time.Time     `json:"updated_at" description:"Order update timestamp" format:"date-time"`
	Changes   []string      `json:"changes" description:"List of changed fields"`
	Metadata  EventMetadata `json:"metadata" description:"Event metadata"`
}
// OrderCompletedEvent is the payload type used for the "order.completed"
// message.
type OrderCompletedEvent struct {
	OrderID     uuid.UUID     `json:"order_id" description:"Unique order identifier" format:"uuid"`
	CompletedAt time.Time     `json:"completed_at" description:"Order completion timestamp" format:"date-time"`
	FinalTotal  float64       `json:"final_total" description:"Final order total"`
	Metadata    EventMetadata `json:"metadata" description:"Event metadata"`
}
// Common schemas

// EventMetadata is the envelope information carried in the "metadata" field
// of the user and order event payloads: event identity, timestamp, schema
// version, originating service and a correlation ID for tracing.
type EventMetadata struct {
	EventID       uuid.UUID `json:"event_id" description:"Unique event identifier" format:"uuid"`
	Timestamp     time.Time `json:"timestamp" description:"Event timestamp" format:"date-time"`
	Version       string    `json:"version" description:"Event schema version"`
	Source        string    `json:"source" description:"Event source service"`
	CorrelationID string    `json:"correlation_id" description:"Correlation ID for tracing"`
}
// OrderItem is one line item of an order, used by OrderCreatedEvent.Items.
// The minimum tags declare a lower bound of 1 for Quantity and 0 for Price.
type OrderItem struct {
	ProductID uuid.UUID `json:"product_id" description:"Product identifier" format:"uuid"`
	Name      string    `json:"name" description:"Product name"`
	Quantity  int       `json:"quantity" description:"Item quantity" minimum:"1"`
	Price     float64   `json:"price" description:"Item price" minimum:"0"`
}
Message Queue Integration
Kafka Integration
// Kafka event channel
//
// kafkaEventChannel documents a channel served by a local Kafka broker and
// carrying a single "user.events" message.
// NOTE(review): the UserEvent payload type is not defined in this guide —
// confirm where it is declared.
var kafkaEventChannel = forge.EventChannel{
	Description: "Kafka-based event channel",
	Servers: []forge.AsyncAPIServer{
		{
			URL:         "kafka://localhost:9092",
			Protocol:    "kafka",
			Description: "Local Kafka broker",
		},
	},
	Messages: map[string]forge.EventMessage{
		"user.events": {
			Summary:     "User events",
			Description: "User-related events published to Kafka",
			Payload:     UserEvent{},
			Tags:        []string{"user", "kafka"},
		},
	},
}
// Kafka message producer
//
// KafkaProducer wraps a *kafka.Producer that its methods use to send events.
type KafkaProducer struct {
	producer *kafka.Producer
}
func (kp *KafkaProducer) PublishUserEvent(event UserEvent) error {
message := forge.EventMessage{
Channel: "user.events",
Payload: event,
Headers: map[string]interface{}{
"event_type": "user.created",
"timestamp": time.Now().Unix(),
},
}
return kp.producer.Produce(message)
}NATS Integration
// NATS event channel
//
// natsEventChannel documents a channel served by a local NATS server and
// carrying a single "order.events" message.
// NOTE(review): the OrderEvent payload type is not defined in this guide —
// confirm where it is declared.
var natsEventChannel = forge.EventChannel{
	Description: "NATS-based event channel",
	Servers: []forge.AsyncAPIServer{
		{
			URL:         "nats://localhost:4222",
			Protocol:    "nats",
			Description: "Local NATS server",
		},
	},
	Messages: map[string]forge.EventMessage{
		"order.events": {
			Summary:     "Order events",
			Description: "Order-related events published to NATS",
			Payload:     OrderEvent{},
			Tags:        []string{"order", "nats"},
		},
	},
}
// NATS message producer
//
// NATSProducer wraps a *nats.Conn that its methods use to publish events.
type NATSProducer struct {
	conn *nats.Conn
}
func (np *NATSProducer) PublishOrderEvent(event OrderEvent) error {
message := forge.EventMessage{
Channel: "order.events",
Payload: event,
Headers: map[string]interface{}{
"event_type": "order.created",
"timestamp": time.Now().Unix(),
},
}
return np.conn.Publish("order.events", message)
}WebSocket Event Documentation
WebSocket Event Channel
// WebSocket event channel
//
// websocketEventChannel documents the real-time messages exchanged over the
// WebSocket server: chat messages, typing indicators and user status.
var websocketEventChannel = forge.EventChannel{
	Description: "WebSocket-based real-time events",
	Servers: []forge.AsyncAPIServer{
		{
			URL:         "ws://localhost:8080/ws",
			Protocol:    "ws",
			Description: "WebSocket server",
		},
	},
	Messages: map[string]forge.EventMessage{
		"chat.message": {
			Summary:     "Chat message",
			Description: "Real-time chat message",
			Payload:     ChatMessage{},
			Tags:        []string{"chat", "websocket"},
		},
		"user.typing": {
			Summary:     "User typing indicator",
			Description: "Indicates when a user is typing",
			Payload:     TypingIndicator{},
			Tags:        []string{"chat", "typing"},
		},
		"user.online": {
			Summary:     "User online status",
			Description: "User online/offline status",
			Payload:     UserStatus{},
			Tags:        []string{"user", "status"},
		},
	},
}
// WebSocket event schemas

// ChatMessage is the payload type used for the "chat.message" message.
type ChatMessage struct {
	MessageID uuid.UUID `json:"message_id" description:"Unique message identifier" format:"uuid"`
	UserID    uuid.UUID `json:"user_id" description:"User identifier" format:"uuid"`
	Username  string    `json:"username" description:"Username"`
	Content   string    `json:"content" description:"Message content"`
	Timestamp time.Time `json:"timestamp" description:"Message timestamp" format:"date-time"`
	RoomID    string    `json:"room_id" description:"Chat room identifier"`
}
// TypingIndicator is the payload type used for the "user.typing" message.
type TypingIndicator struct {
	UserID   uuid.UUID `json:"user_id" description:"User identifier" format:"uuid"`
	Username string    `json:"username" description:"Username"`
	RoomID   string    `json:"room_id" description:"Chat room identifier"`
	IsTyping bool      `json:"is_typing" description:"Whether user is typing"`
}
// UserStatus is the payload type used for the "user.online" message.
// The enum tag on Status lists the values online, offline and away.
type UserStatus struct {
	UserID    uuid.UUID `json:"user_id" description:"User identifier" format:"uuid"`
	Username  string    `json:"username" description:"Username"`
	Status    string    `json:"status" description:"User status" enum:"online,offline,away"`
	Timestamp time.Time `json:"timestamp" description:"Status timestamp" format:"date-time"`
}
Server-Sent Events Documentation
SSE Event Channel
// SSE event channel
//
// sseEventChannel documents the messages delivered over the Server-Sent
// Events endpoint: user notifications and system alerts.
var sseEventChannel = forge.EventChannel{
	Description: "Server-Sent Events for real-time updates",
	Servers: []forge.AsyncAPIServer{
		{
			URL:         "http://localhost:8080/events",
			Protocol:    "sse",
			Description: "Server-Sent Events endpoint",
		},
	},
	Messages: map[string]forge.EventMessage{
		"notification": {
			Summary:     "Notification event",
			Description: "Real-time notification",
			Payload:     Notification{},
			Tags:        []string{"notification", "sse"},
		},
		"system.alert": {
			Summary:     "System alert",
			Description: "System alert notification",
			Payload:     SystemAlert{},
			Tags:        []string{"system", "alert"},
		},
	},
}
// SSE event schemas

// Notification is the payload type used for the "notification" message.
// The enum tag on Type lists the values info, warning, error and success.
type Notification struct {
	NotificationID uuid.UUID `json:"notification_id" description:"Unique notification identifier" format:"uuid"`
	UserID         uuid.UUID `json:"user_id" description:"User identifier" format:"uuid"`
	Title          string    `json:"title" description:"Notification title"`
	Message        string    `json:"message" description:"Notification message"`
	Type           string    `json:"type" description:"Notification type" enum:"info,warning,error,success"`
	Timestamp      time.Time `json:"timestamp" description:"Notification timestamp" format:"date-time"`
	Read           bool      `json:"read" description:"Whether notification is read"`
}
// SystemAlert is the payload type used for the "system.alert" message.
// The enum tag on Level lists the values low, medium, high and critical.
type SystemAlert struct {
	AlertID   uuid.UUID `json:"alert_id" description:"Unique alert identifier" format:"uuid"`
	Level     string    `json:"level" description:"Alert level" enum:"low,medium,high,critical"`
	Message   string    `json:"message" description:"Alert message"`
	Source    string    `json:"source" description:"Alert source"`
	Timestamp time.Time `json:"timestamp" description:"Alert timestamp" format:"date-time"`
	Resolved  bool      `json:"resolved" description:"Whether alert is resolved"`
}
AsyncAPI Configuration
Basic Configuration
// Enable AsyncAPI through the application config (AppConfig.AsyncAPIConfig)
// and declare the document metadata plus the Kafka and WebSocket servers.
app := forge.NewApp(forge.AppConfig{
	AsyncAPIConfig: forge.AsyncAPIConfig{
		Enabled:     true,
		Title:       "Event-Driven API",
		Version:     "1.0.0",
		Description: "A comprehensive event-driven API for real-time communication",
		Contact: forge.AsyncAPIContact{
			Name:  "API Support",
			Email: "support@example.com",
			URL:   "https://example.com/support",
		},
		License: forge.AsyncAPILicense{
			Name: "MIT",
			URL:  "https://opensource.org/licenses/MIT",
		},
		Servers: []forge.AsyncAPIServer{
			{
				URL:         "kafka://localhost:9092",
				Protocol:    "kafka",
				Description: "Kafka message broker",
			},
			{
				URL:         "ws://localhost:8080/ws",
				Protocol:    "ws",
				Description: "WebSocket server",
			},
		},
	},
})
Advanced Configuration
// asyncAPIConfig is a fuller configuration: the same document metadata, three
// servers (Kafka, NATS, WebSocket) and a free-form CustomSettings map.
asyncAPIConfig := forge.AsyncAPIConfig{
	// Basic info
	Enabled:     true,
	Title:       "Event-Driven API",
	Version:     "1.0.0",
	Description: "A comprehensive event-driven API for real-time communication",
	// Contact information
	Contact: forge.AsyncAPIContact{
		Name:  "API Support",
		Email: "support@example.com",
		URL:   "https://example.com/support",
	},
	// License information
	License: forge.AsyncAPILicense{
		Name: "MIT",
		URL:  "https://opensource.org/licenses/MIT",
	},
	// Servers
	Servers: []forge.AsyncAPIServer{
		{
			URL:         "kafka://localhost:9092",
			Protocol:    "kafka",
			Description: "Kafka message broker",
		},
		{
			URL:         "nats://localhost:4222",
			Protocol:    "nats",
			Description: "NATS message broker",
		},
		{
			URL:         "ws://localhost:8080/ws",
			Protocol:    "ws",
			Description: "WebSocket server",
		},
	},
	// Custom settings
	// NOTE(review): the effect of each key is not described in this guide —
	// confirm against the framework before relying on them.
	CustomSettings: map[string]interface{}{
		"enable_validation": true,
		"strict_mode":       false,
		"generate_examples": true,
	},
}
AsyncAPI Endpoints
Built-in Endpoints
Forge automatically provides AsyncAPI endpoints:
# AsyncAPI specification
GET /_/asyncapi.json
# AsyncAPI Studio
GET /_/asyncapi-studio
# AsyncAPI YAML
GET /_/asyncapi.yaml
Custom Documentation Endpoints
// Custom AsyncAPI endpoint
app.Router().GET("/docs/asyncapi", func(ctx forge.Context) error {
spec := app.Router().GenerateAsyncAPISpec()
// Customize spec
spec.Info.Title = "My Custom Event API"
spec.Info.Version = "2.0.0"
return ctx.JSON(200, spec)
})
// Custom AsyncAPI Studio
app.Router().GET("/docs/asyncapi-studio", func(ctx forge.Context) error {
html := generateAsyncAPIStudio("/docs/asyncapi")
return ctx.HTML(200, html)
})Event Validation
Message Validation
func publishUserEvent(event UserEvent) error {
// Validate event against AsyncAPI schema
if err := app.Router().ValidateMessage(UserEvent{}, event); err != nil {
app.Logger().Error("event validation failed", forge.F("error", err))
return err
}
// Publish event
return eventPublisher.Publish("user.events", event)
}Event Schema Validation
func validateEventSchema(eventType string, payload interface{}) error {
switch eventType {
case "user.created":
return app.Router().ValidateMessage(UserCreatedEvent{}, payload)
case "user.updated":
return app.Router().ValidateMessage(UserUpdatedEvent{}, payload)
case "order.created":
return app.Router().ValidateMessage(OrderCreatedEvent{}, payload)
default:
return errors.New("unknown event type")
}
}Testing AsyncAPI
AsyncAPI Specification Testing
func TestAsyncAPISpec(t *testing.T) {
app := forge.NewTestApp(forge.TestAppConfig{
Name: "test-event-api",
})
// Register event channels
app.Router().DocumentEventChannel("user.events", userEventChannel)
app.Router().DocumentEventChannel("order.events", orderEventChannel)
// Generate AsyncAPI spec
spec := app.Router().GenerateAsyncAPISpec()
// Validate spec
assert.NotNil(t, spec)
assert.Equal(t, "test-event-api", spec.Info.Title)
assert.Contains(t, spec.Channels, "user.events")
assert.Contains(t, spec.Channels, "order.events")
}Documentation Endpoint Testing
func TestAsyncAPIEndpoint(t *testing.T) {
app := forge.NewTestApp(forge.TestAppConfig{
Name: "test-event-api",
})
// Start application
go func() {
app.Run()
}()
defer app.Stop(context.Background())
// Test AsyncAPI endpoint
resp, err := http.Get("http://localhost:8080/_/asyncapi.json")
require.NoError(t, err)
defer resp.Body.Close()
assert.Equal(t, 200, resp.StatusCode)
assert.Equal(t, "application/json", resp.Header.Get("Content-Type"))
// Parse AsyncAPI spec
var spec map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&spec)
require.NoError(t, err)
assert.Equal(t, "test-event-api", spec["info"].(map[string]interface{})["title"])
}Best Practices
- Event Design: Design clear, consistent event schemas
- Documentation: Provide comprehensive event descriptions
- Examples: Include realistic event examples
- Validation: Use schema validation for events
- Versioning: Use proper event schema versioning
- Testing: Test AsyncAPI generation and validation
- Standards: Follow AsyncAPI 2.0 standards
- Monitoring: Monitor event publishing and consumption
For more information about event-driven architecture, see the Events Extension documentation.
How is this guide?
Last updated on