gRPC Extension

High-performance gRPC server with TLS/mTLS, streaming, and full observability

The gRPC extension provides a production-ready gRPC server with comprehensive features including TLS/mTLS security, streaming support, health checks, service reflection, and full observability integration.

Features

Full gRPC Support

  • Unary RPCs: Simple request-response communication
  • Client Streaming: Client sends multiple messages, server responds once
  • Server Streaming: Server sends multiple messages for single client request
  • Bidirectional Streaming: Both client and server send multiple messages
  • Service Registration: Easy service registration with automatic discovery

Security & Authentication

  • TLS/mTLS Support: Secure communication with mutual authentication
  • Client Certificate Validation: Verify client identities with certificates
  • Certificate Management: Automatic certificate loading and validation
  • Secure Defaults: Production-ready security configurations

Performance & Reliability

  • Connection Management: Configurable keepalive and connection limits
  • Message Size Limits: Prevent memory exhaustion with size controls
  • Concurrent Stream Limits: Control resource usage with stream limits
  • Connection Timeout: Configurable connection timeout handling

Observability & Monitoring

  • Automatic Metrics: RPC counts, durations, success/failure rates
  • Structured Logging: Detailed request/response logging
  • Health Checks: Built-in gRPC health checking protocol
  • Server Statistics: Real-time server performance metrics
  • Custom Interceptors: Add middleware for cross-cutting concerns

Developer Experience

  • Service Reflection: Runtime service discovery and exploration
  • Health Monitoring: Standard health check protocol implementation
  • Error Handling: Structured error responses with proper status codes
  • Testing Support: Built-in testing utilities and helpers

Installation

Go Module

go get github.com/xraph/forge/extensions/grpc
go get google.golang.org/grpc@v1.76.0

Docker

FROM xraph/forge:latest
# gRPC extension is included

Package Manager

# Using Forge CLI
forge extension add grpc

# Using package manager
npm install @xraph/forge-grpc

Configuration

YAML Configuration

extensions:
  grpc:
    # Server settings
    address: ":50051"
    max_recv_msg_size: 4194304  # 4MB
    max_send_msg_size: 4194304  # 4MB
    max_concurrent_streams: 100
    connection_timeout: "120s"
    
    # Keepalive settings
    keepalive:
      time: "2h"                    # Send keepalive pings every 2 hours
      timeout: "20s"                # Wait 20s for keepalive response
      enforcement_policy: true      # Enforce keepalive policy
      min_time: "5m"               # Minimum time between pings
      permit_without_stream: false  # Don't allow pings without streams
    
    # TLS/mTLS configuration
    enable_tls: true
    tls_cert_file: "certs/server.crt"
    tls_key_file: "certs/server.key"
    tls_ca_file: "certs/ca.crt"
    client_auth: true  # Require client certificates
    
    # Features
    enable_health_check: true
    enable_reflection: true
    enable_metrics: true
    enable_tracing: true
    enable_logging: true
    
    # Advanced settings
    require_config: false

Environment Variables

# Server Configuration
FORGE_GRPC_ADDRESS=:50051
FORGE_GRPC_MAX_RECV_MSG_SIZE=4194304
FORGE_GRPC_MAX_SEND_MSG_SIZE=4194304
FORGE_GRPC_MAX_CONCURRENT_STREAMS=100
FORGE_GRPC_CONNECTION_TIMEOUT=120s

# Keepalive Configuration
FORGE_GRPC_KEEPALIVE_TIME=2h
FORGE_GRPC_KEEPALIVE_TIMEOUT=20s
FORGE_GRPC_KEEPALIVE_ENFORCEMENT_POLICY=true
FORGE_GRPC_KEEPALIVE_MIN_TIME=5m
FORGE_GRPC_KEEPALIVE_PERMIT_WITHOUT_STREAM=false

# TLS Configuration
FORGE_GRPC_ENABLE_TLS=true
FORGE_GRPC_TLS_CERT_FILE=certs/server.crt
FORGE_GRPC_TLS_KEY_FILE=certs/server.key
FORGE_GRPC_TLS_CA_FILE=certs/ca.crt
FORGE_GRPC_CLIENT_AUTH=true

# Features
FORGE_GRPC_ENABLE_HEALTH_CHECK=true
FORGE_GRPC_ENABLE_REFLECTION=true
FORGE_GRPC_ENABLE_METRICS=true
FORGE_GRPC_ENABLE_TRACING=true
FORGE_GRPC_ENABLE_LOGGING=true
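
How the extension itself reads these variables is not shown here. As a rough sketch, the duration values above (120s, 2h, 5m) follow Go's time.ParseDuration syntax and the size limits are plain integers. The helper names durationOr and intOr are hypothetical and not part of Forge.

```go
package main

import (
    "fmt"
    "os"
    "strconv"
    "time"
)

// durationOr parses a duration such as "120s" or "2h".
// It returns fallback when raw is empty (variable unset).
func durationOr(raw string, fallback time.Duration) (time.Duration, error) {
    if raw == "" {
        return fallback, nil
    }
    return time.ParseDuration(raw)
}

// intOr parses an integer such as "4194304".
// It returns fallback when raw is empty (variable unset).
func intOr(raw string, fallback int) (int, error) {
    if raw == "" {
        return fallback, nil
    }
    return strconv.Atoi(raw)
}

func main() {
    timeout, err := durationOr(os.Getenv("FORGE_GRPC_CONNECTION_TIMEOUT"), 120*time.Second)
    if err != nil {
        fmt.Println("invalid FORGE_GRPC_CONNECTION_TIMEOUT:", err)
        return
    }
    maxRecv, err := intOr(os.Getenv("FORGE_GRPC_MAX_RECV_MSG_SIZE"), 4*1024*1024)
    if err != nil {
        fmt.Println("invalid FORGE_GRPC_MAX_RECV_MSG_SIZE:", err)
        return
    }
    fmt.Println("connection timeout:", timeout, "max recv bytes:", maxRecv)
}
```

Validating values this way at startup surfaces a malformed setting such as "2 hours" immediately, rather than as a confusing runtime failure.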

Programmatic Configuration

package main

import (
    "context"
    "log"
    "time"

    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/grpc"
)

func main() {
    app := forge.NewApp(forge.AppConfig{
        Name:    "grpc-service",
        Version: "1.0.0",
    })

    // Configure gRPC extension
    config := grpc.Config{
        Address:              ":50051",
        MaxRecvMsgSize:       4 * 1024 * 1024, // 4MB
        MaxSendMsgSize:       4 * 1024 * 1024, // 4MB
        MaxConcurrentStreams: 100,
        ConnectionTimeout:    120 * time.Second,
        Keepalive: grpc.KeepaliveConfig{
            Time:                2 * time.Hour,
            Timeout:             20 * time.Second,
            EnforcementPolicy:   true,
            MinTime:             5 * time.Minute,
            PermitWithoutStream: false,
        },
        EnableTLS:         true,
        TLSCertFile:       "certs/server.crt",
        TLSKeyFile:        "certs/server.key",
        TLSCAFile:         "certs/ca.crt",
        ClientAuth:        true,
        EnableHealthCheck: true,
        EnableReflection:  true,
        EnableMetrics:     true,
        EnableTracing:     true,
        EnableLogging:     true,
    }

    // Register extension
    ext := grpc.NewExtensionWithConfig(config)
    app.RegisterExtension(ext)

    // Start the app, passing a context and checking the returned error
    // (same call shape as in the Register Service example)
    if err := app.Start(context.Background()); err != nil {
        log.Fatal("Failed to start app:", err)
    }
}

Usage Examples

Define Protocol Buffers

Create proto/user.proto:

syntax = "proto3";

package user.v1;

option go_package = "github.com/example/service/proto/user/v1;userv1";

// User service definition
service UserService {
  // Unary RPC
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
  
  // Server streaming RPC
  rpc ListUsers(ListUsersRequest) returns (stream User);
  
  // Client streaming RPC
  rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse);
  
  // Bidirectional streaming RPC
  rpc ChatUsers(stream ChatMessage) returns (stream ChatMessage);
}

message User {
  string id = 1;
  string name = 2;
  string email = 3;
  int64 created_at = 4;
}

message GetUserRequest {
  string id = 1;
}

message GetUserResponse {
  User user = 1;
}

message CreateUserRequest {
  string name = 1;
  string email = 2;
}

message CreateUserResponse {
  User user = 1;
}

message ListUsersRequest {
  int32 limit = 1;
  string cursor = 2;
}

message CreateUsersResponse {
  repeated User users = 1;
  int32 created_count = 2;
}

message ChatMessage {
  string user_id = 1;
  string message = 2;
  int64 timestamp = 3;
}

Generate Go Code

# Install the Go plugins for protoc (protoc itself must be installed separately)
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

# Generate Go code
protoc --go_out=. --go_opt=paths=source_relative \
       --go-grpc_out=. --go-grpc_opt=paths=source_relative \
       proto/user.proto

Implement Service

package main

import (
    "context"
    "fmt"
    "io"
    "time"
    
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/database"
    userv1 "github.com/example/service/proto/user/v1"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// UserService implements the gRPC user service
type UserService struct {
    userv1.UnimplementedUserServiceServer
    container forge.Container
    logger    forge.Logger
    metrics   forge.Metrics
}

// NewUserService creates a new user service
func NewUserService(container forge.Container) *UserService {
    return &UserService{
        container: container,
        logger:    forge.Must[forge.Logger](container, "logger"),
        metrics:   forge.Must[forge.Metrics](container, "metrics"),
    }
}

// GetUser implements unary RPC
func (s *UserService) GetUser(ctx context.Context, req *userv1.GetUserRequest) (*userv1.GetUserResponse, error) {
    s.logger.Info("getting user", forge.F("user_id", req.Id))
    
    if req.Id == "" {
        return nil, status.Error(codes.InvalidArgument, "user ID is required")
    }
    
    // Get database from DI container
    db := forge.Must[database.Database](s.container, "database")
    
    var user userv1.User
    err := db.QueryRow(ctx, 
        "SELECT id, name, email, created_at FROM users WHERE id = $1", 
        req.Id).Scan(&user.Id, &user.Name, &user.Email, &user.CreatedAt)
    
    if err != nil {
        if err == database.ErrNoRows {
            return nil, status.Error(codes.NotFound, "user not found")
        }
        s.logger.Error("failed to get user", forge.F("error", err))
        return nil, status.Error(codes.Internal, "internal error")
    }
    
    s.metrics.Counter("user_get_total").Inc()
    
    return &userv1.GetUserResponse{
        User: &user,
    }, nil
}

// CreateUser implements unary RPC
func (s *UserService) CreateUser(ctx context.Context, req *userv1.CreateUserRequest) (*userv1.CreateUserResponse, error) {
    s.logger.Info("creating user", forge.F("name", req.Name), forge.F("email", req.Email))
    
    if req.Name == "" || req.Email == "" {
        return nil, status.Error(codes.InvalidArgument, "name and email are required")
    }
    
    db := forge.Must[database.Database](s.container, "database")
    
    var user userv1.User
    err := db.QueryRow(ctx,
        "INSERT INTO users (name, email, created_at) VALUES ($1, $2, $3) RETURNING id, name, email, created_at",
        req.Name, req.Email, time.Now().Unix()).
        Scan(&user.Id, &user.Name, &user.Email, &user.CreatedAt)
    
    if err != nil {
        s.logger.Error("failed to create user", forge.F("error", err))
        return nil, status.Error(codes.Internal, "failed to create user")
    }
    
    s.metrics.Counter("user_create_total").Inc()
    
    return &userv1.CreateUserResponse{
        User: &user,
    }, nil
}

// ListUsers implements server streaming RPC
func (s *UserService) ListUsers(req *userv1.ListUsersRequest, stream userv1.UserService_ListUsersServer) error {
    s.logger.Info("listing users", forge.F("limit", req.Limit), forge.F("cursor", req.Cursor))
    
    db := forge.Must[database.Database](s.container, "database")
    
    limit := req.Limit
    if limit <= 0 || limit > 100 {
        limit = 10 // Default limit
    }
    
    query := "SELECT id, name, email, created_at FROM users ORDER BY created_at DESC LIMIT $1"
    args := []interface{}{limit}
    
    if req.Cursor != "" {
        query = "SELECT id, name, email, created_at FROM users WHERE created_at < $2 ORDER BY created_at DESC LIMIT $1"
        args = append(args, req.Cursor)
    }
    
    rows, err := db.Query(stream.Context(), query, args...)
    if err != nil {
        s.logger.Error("failed to query users", forge.F("error", err))
        return status.Error(codes.Internal, "failed to query users")
    }
    defer rows.Close()
    
    count := 0
    for rows.Next() {
        var user userv1.User
        if err := rows.Scan(&user.Id, &user.Name, &user.Email, &user.CreatedAt); err != nil {
            s.logger.Error("failed to scan user", forge.F("error", err))
            continue
        }
        
        if err := stream.Send(&user); err != nil {
            s.logger.Error("failed to send user", forge.F("error", err))
            return err
        }
        
        count++
    }
    
    s.metrics.Counter("user_list_total").Inc()
    s.metrics.Histogram("user_list_count").Observe(float64(count))
    
    return nil
}

// CreateUsers implements client streaming RPC
func (s *UserService) CreateUsers(stream userv1.UserService_CreateUsersServer) error {
    s.logger.Info("creating users via stream")
    
    db := forge.Must[database.Database](s.container, "database")
    
    var users []*userv1.User
    var createdCount int32
    
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            // Client finished sending
            break
        }
        if err != nil {
            s.logger.Error("failed to receive user", forge.F("error", err))
            return status.Error(codes.Internal, "failed to receive user")
        }
        
        if req.Name == "" || req.Email == "" {
            s.logger.Warn("skipping invalid user", forge.F("name", req.Name), forge.F("email", req.Email))
            continue
        }
        
        var user userv1.User
        err = db.QueryRow(stream.Context(),
            "INSERT INTO users (name, email, created_at) VALUES ($1, $2, $3) RETURNING id, name, email, created_at",
            req.Name, req.Email, time.Now().Unix()).
            Scan(&user.Id, &user.Name, &user.Email, &user.CreatedAt)
        
        if err != nil {
            s.logger.Error("failed to create user", forge.F("error", err))
            continue
        }
        
        users = append(users, &user)
        createdCount++
    }
    
    s.metrics.Counter("user_create_batch_total").Inc()
    s.metrics.Histogram("user_create_batch_count").Observe(float64(createdCount))
    
    return stream.SendAndClose(&userv1.CreateUsersResponse{
        Users:        users,
        CreatedCount: createdCount,
    })
}

// ChatUsers implements bidirectional streaming RPC
func (s *UserService) ChatUsers(stream userv1.UserService_ChatUsersServer) error {
    s.logger.Info("starting chat stream")
    
    // In a real implementation, you'd integrate with a message broker
    // For this example, we'll echo messages back
    
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            s.logger.Error("failed to receive chat message", forge.F("error", err))
            return err
        }
        
        s.logger.Debug("received chat message", 
            forge.F("user_id", msg.UserId), 
            forge.F("message", msg.Message))
        
        // Echo the message back (in real app, broadcast to other users)
        response := &userv1.ChatMessage{
            UserId:    "system",
            Message:   fmt.Sprintf("Echo: %s", msg.Message),
            Timestamp: time.Now().Unix(),
        }
        
        if err := stream.Send(response); err != nil {
            s.logger.Error("failed to send chat message", forge.F("error", err))
            return err
        }
        
        s.metrics.Counter("chat_message_total").Inc()
    }
}
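
ListUsers passes the request's string cursor straight into a comparison against created_at, which is stored as a Unix timestamp. One way to make that explicit, sketched here under the assumption that the cursor is simply the created_at value of the last row sent, is to encode and decode the cursor in one place and reject malformed values before they reach the query. The names encodeCursor, decodeCursor, and errBadCursor are hypothetical.

```go
package main

import (
    "errors"
    "fmt"
    "strconv"
)

// errBadCursor is returned for cursors that are not a non-negative Unix timestamp.
var errBadCursor = errors.New("invalid cursor")

// encodeCursor turns the created_at of the last row sent into a page cursor.
func encodeCursor(createdAt int64) string {
    return strconv.FormatInt(createdAt, 10)
}

// decodeCursor parses a cursor. An empty cursor means "first page" and
// yields ok == false with no error.
func decodeCursor(cursor string) (int64, bool, error) {
    if cursor == "" {
        return 0, false, nil
    }
    ts, err := strconv.ParseInt(cursor, 10, 64)
    if err != nil || ts < 0 {
        return 0, false, errBadCursor
    }
    return ts, true, nil
}

func main() {
    cursor := encodeCursor(1700000000)
    ts, ok, err := decodeCursor(cursor)
    fmt.Println(cursor, ts, ok, err)

    _, _, err = decodeCursor("not-a-timestamp")
    fmt.Println(err)
}
```

In the handler, a decode error would map naturally to codes.InvalidArgument, and the decoded int64 would be appended to args in place of the raw string.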

Register Service

package main

import (
    "context"
    "log"
    
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/database"
    "github.com/xraph/forge/extensions/grpc"
    userv1 "github.com/example/service/proto/user/v1"
)

func main() {
    // Create Forge app
    app := forge.NewApp(forge.AppConfig{
        Name:    "user-service",
        Version: "1.0.0",
    })

    // Register gRPC extension
    app.RegisterExtension(grpc.NewExtension(
        grpc.WithAddress(":50051"),
        grpc.WithTLS("certs/server.crt", "certs/server.key", "certs/ca.crt"),
        grpc.WithClientAuth(true),
        grpc.WithHealthCheck(true),
        grpc.WithReflection(true),
        grpc.WithMetrics(true),
    ))

    // Start the app
    if err := app.Start(context.Background()); err != nil {
        log.Fatal("Failed to start app:", err)
    }

    // Get gRPC server from DI container
    grpcServer := forge.Must[grpc.GRPC](app.Container(), "grpc")

    // Create and register user service
    userService := NewUserService(app.Container())
    userv1.RegisterUserServiceServer(grpcServer.GetServer(), userService)

    // Register custom health checker
    grpcServer.RegisterHealthChecker("user-service", &UserHealthChecker{
        container: app.Container(),
    })

    // Run the app
    if err := app.Run(context.Background(), ":8080"); err != nil {
        log.Fatal("App failed:", err)
    }
}

// UserHealthChecker implements custom health checking
type UserHealthChecker struct {
    container forge.Container
}

func (h *UserHealthChecker) Check(ctx context.Context) error {
    // Check database connectivity
    db := forge.Must[database.Database](h.container, "database")
    return db.Ping(ctx)
}

Advanced Features

Custom Interceptors

package interceptors

import (
    "context"
    "strings"
    "time"
    
    "github.com/xraph/forge"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/status"
)

// AuthInterceptor validates authentication tokens
func AuthInterceptor(logger forge.Logger) grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
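        // Skip auth for the standard gRPC health check. The full method name is
        // "/grpc.health.v1.Health/Check", so a "/Health/Check" suffix test never matches.
        if info.FullMethod == "/grpc.health.v1.Health/Check" {
            return handler(ctx, req)
        }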
        
        md, ok := metadata.FromIncomingContext(ctx)
        if !ok {
            return nil, status.Error(codes.Unauthenticated, "missing metadata")
        }
        
        tokens := md.Get("authorization")
        if len(tokens) == 0 {
            return nil, status.Error(codes.Unauthenticated, "missing authorization token")
        }
        
        token := strings.TrimPrefix(tokens[0], "Bearer ")
        
        // Validate token (implement your validation logic)
        userID, err := validateToken(token)
        if err != nil {
            logger.Warn("invalid token", forge.F("error", err))
            return nil, status.Error(codes.Unauthenticated, "invalid token")
        }
        
        // Add user ID to context
        ctx = context.WithValue(ctx, "user_id", userID)
        
        return handler(ctx, req)
    }
}

// RateLimitInterceptor implements rate limiting
func RateLimitInterceptor(limiter RateLimiter) grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        // Get client identifier (IP, user ID, etc.)
        clientID := getClientID(ctx)
        
        if !limiter.Allow(clientID, info.FullMethod) {
            return nil, status.Error(codes.ResourceExhausted, "rate limit exceeded")
        }
        
        return handler(ctx, req)
    }
}

// TimeoutInterceptor adds per-method timeouts
func TimeoutInterceptor(timeouts map[string]time.Duration) grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        if timeout, ok := timeouts[info.FullMethod]; ok {
            var cancel context.CancelFunc
            ctx, cancel = context.WithTimeout(ctx, timeout)
            defer cancel()
        }
        
        return handler(ctx, req)
    }
}

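// InterceptorRegistrar describes the part of the Forge gRPC server used below.
// In this file the identifier grpc refers to google.golang.org/grpc, so the
// Forge extension's GRPC type is accepted through a small local interface
// rather than a second import named grpc. This assumes AddUnaryInterceptor
// returns nothing, as the calls below suggest.
type InterceptorRegistrar interface {
    AddUnaryInterceptor(interceptor grpc.UnaryServerInterceptor)
}

// RegisterInterceptors registers the custom interceptors
func RegisterInterceptors(grpcServer InterceptorRegistrar, logger forge.Logger) {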
    // Add authentication interceptor
    grpcServer.AddUnaryInterceptor(AuthInterceptor(logger))
    
    // Add rate limiting interceptor
    limiter := NewRateLimiter(100, time.Minute) // 100 requests per minute
    grpcServer.AddUnaryInterceptor(RateLimitInterceptor(limiter))
    
    // Add timeout interceptor
    timeouts := map[string]time.Duration{
        "/user.v1.UserService/GetUser":    5 * time.Second,
        "/user.v1.UserService/CreateUser": 10 * time.Second,
        "/user.v1.UserService/ListUsers":  30 * time.Second,
    }
    grpcServer.AddUnaryInterceptor(TimeoutInterceptor(timeouts))
}
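
The interceptor code above calls NewRateLimiter and limiter.Allow without defining them. As a minimal sketch of one possible shape, assuming a fixed-window counter per client and method, the following is compatible with those calls. It is an illustration, not Forge's implementation. The type names fixedWindowLimiter and windowCounter are hypothetical.

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// RateLimiter is the interface RateLimitInterceptor relies on.
type RateLimiter interface {
    Allow(clientID, method string) bool
}

// windowCounter tracks how many calls were seen in the current window.
type windowCounter struct {
    start time.Time
    count int
}

// fixedWindowLimiter allows up to limit calls per window for each
// (client, method) pair.
type fixedWindowLimiter struct {
    mu      sync.Mutex
    limit   int
    window  time.Duration
    buckets map[string]*windowCounter
}

// NewRateLimiter returns a limiter allowing limit calls per window.
func NewRateLimiter(limit int, window time.Duration) RateLimiter {
    return &fixedWindowLimiter{
        limit:   limit,
        window:  window,
        buckets: make(map[string]*windowCounter),
    }
}

// Allow reports whether the call fits in the current window and counts it if so.
func (l *fixedWindowLimiter) Allow(clientID, method string) bool {
    l.mu.Lock()
    defer l.mu.Unlock()

    key := clientID + "|" + method
    now := time.Now()
    b, ok := l.buckets[key]
    if !ok || now.Sub(b.start) >= l.window {
        // First call, or the previous window expired: start a new window.
        l.buckets[key] = &windowCounter{start: now, count: 1}
        return l.limit > 0
    }
    if b.count >= l.limit {
        return false
    }
    b.count++
    return true
}

func main() {
    limiter := NewRateLimiter(2, time.Minute)
    for i := 1; i <= 3; i++ {
        fmt.Println(i, limiter.Allow("10.0.0.1", "/user.v1.UserService/GetUser"))
    }
}
```

This sketch never evicts idle buckets, so a long-running server would also need periodic cleanup or a bounded cache. A token-bucket limiter is a common alternative when bursts at window boundaries matter.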

TLS/mTLS Setup

# Generate CA private key
openssl genrsa -out ca.key 4096

# Generate CA certificate
openssl req -new -x509 -key ca.key -sha256 -subj "/C=US/ST=CA/O=MyOrg/CN=MyCA" -days 3650 -out ca.crt

# Generate server private key
openssl genrsa -out server.key 4096

# Generate server certificate signing request
openssl req -new -key server.key -out server.csr -config <(
cat <<EOF
[req]
default_bits = 4096
prompt = no
distinguished_name = req_distinguished_name
req_extensions = req_ext

[req_distinguished_name]
C = US
ST = CA
O = MyOrg
CN = localhost

[req_ext]
subjectAltName = @alt_names

[alt_names]
DNS.1 = localhost
DNS.2 = *.localhost
IP.1 = 127.0.0.1
IP.2 = ::1
EOF
)

# Generate server certificate signed by CA
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 365 -sha256 -extensions req_ext -extfile <(
cat <<EOF
[req_ext]
subjectAltName = @alt_names

[alt_names]
DNS.1 = localhost
DNS.2 = *.localhost
IP.1 = 127.0.0.1
IP.2 = ::1
EOF
)

# Generate client private key (for mTLS)
openssl genrsa -out client.key 4096

# Generate client certificate signing request
openssl req -new -key client.key -subj "/C=US/ST=CA/O=MyOrg/CN=client" -out client.csr

# Generate client certificate signed by CA
openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out client.crt -days 365 -sha256
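
For reference, this is roughly what "require client certificates" means in terms of Go's crypto/tls package. How the extension builds its own TLS configuration is not shown in this guide, so treat the following as a sketch using only the standard library. The function name mtlsServerConfig and the TLS 1.2 minimum version are assumptions.

```go
package main

import (
    "crypto/tls"
    "crypto/x509"
    "errors"
    "fmt"
)

// mtlsServerConfig builds a server-side tls.Config that presents the server
// certificate and requires every client to present a certificate signed by
// one of the CAs in caPEM.
func mtlsServerConfig(certPEM, keyPEM, caPEM []byte) (*tls.Config, error) {
    cert, err := tls.X509KeyPair(certPEM, keyPEM)
    if err != nil {
        return nil, fmt.Errorf("load server key pair: %w", err)
    }

    pool := x509.NewCertPool()
    if !pool.AppendCertsFromPEM(caPEM) {
        return nil, errors.New("no CA certificates found in PEM data")
    }

    return &tls.Config{
        Certificates: []tls.Certificate{cert},
        ClientCAs:    pool,
        ClientAuth:   tls.RequireAndVerifyClientCert,
        MinVersion:   tls.VersionTLS12, // assumption: a conservative floor
    }, nil
}

func main() {
    // With real files, pass the contents of server.crt, server.key and ca.crt
    // (for example via os.ReadFile). Invalid input is reported as an error.
    _, err := mtlsServerConfig([]byte("not a cert"), []byte("not a key"), nil)
    fmt.Println("error:", err)
}
```

The client certificate generated above (client.crt) passes this check because it is signed by the same CA that is loaded into ClientCAs.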

Client Implementation

package main

import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "io"
    "log"
    "os"
    "time"
    
    userv1 "github.com/example/service/proto/user/v1"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/metadata"
)

func main() {
    // Load client certificates for mTLS
    clientCert, err := tls.LoadX509KeyPair("certs/client.crt", "certs/client.key")
    if err != nil {
        log.Fatal("Failed to load client cert:", err)
    }
    
    // Load CA certificate
    caCert, err := os.ReadFile("certs/ca.crt")
    if err != nil {
        log.Fatal("Failed to read CA cert:", err)
    }
    
    caCertPool := x509.NewCertPool()
    if !caCertPool.AppendCertsFromPEM(caCert) {
        log.Fatal("Failed to parse CA cert: no certificates found in certs/ca.crt")
    }
    
    // Create TLS credentials
    creds := credentials.NewTLS(&tls.Config{
        Certificates: []tls.Certificate{clientCert},
        RootCAs:      caCertPool,
        ServerName:   "localhost",
    })
    
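    // Create the client connection. grpc.NewClient replaces the deprecated
    // grpc.Dial and connects lazily on the first RPC.
    conn, err := grpc.NewClient("localhost:50051", grpc.WithTransportCredentials(creds))
    if err != nil {
        log.Fatal("Failed to create client:", err)
    }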
    defer conn.Close()
    
    // Create client
    client := userv1.NewUserServiceClient(conn)
    
    // Add authentication metadata
    ctx := metadata.AppendToOutgoingContext(context.Background(), 
        "authorization", "Bearer your-jwt-token")
    
    // Test unary RPC
    testUnaryRPC(ctx, client)
    
    // Test server streaming RPC
    testServerStreamingRPC(ctx, client)
    
    // Test client streaming RPC
    testClientStreamingRPC(ctx, client)
    
    // Test bidirectional streaming RPC
    testBidirectionalStreamingRPC(ctx, client)
}

func testUnaryRPC(ctx context.Context, client userv1.UserServiceClient) {
    fmt.Println("=== Testing Unary RPC ===")
    
    // Create user
    createResp, err := client.CreateUser(ctx, &userv1.CreateUserRequest{
        Name:  "John Doe",
        Email: "john@example.com",
    })
    if err != nil {
        log.Printf("Failed to create user: %v", err)
        return
    }
    
    fmt.Printf("Created user: %+v\n", createResp.User)
    
    // Get user
    getResp, err := client.GetUser(ctx, &userv1.GetUserRequest{
        Id: createResp.User.Id,
    })
    if err != nil {
        log.Printf("Failed to get user: %v", err)
        return
    }
    
    fmt.Printf("Retrieved user: %+v\n", getResp.User)
}

func testServerStreamingRPC(ctx context.Context, client userv1.UserServiceClient) {
    fmt.Println("=== Testing Server Streaming RPC ===")
    
    stream, err := client.ListUsers(ctx, &userv1.ListUsersRequest{
        Limit: 10,
    })
    if err != nil {
        log.Printf("Failed to list users: %v", err)
        return
    }
    
    for {
        user, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Printf("Failed to receive user: %v", err)
            break
        }
        
        fmt.Printf("Received user: %+v\n", user)
    }
}

func testClientStreamingRPC(ctx context.Context, client userv1.UserServiceClient) {
    fmt.Println("=== Testing Client Streaming RPC ===")
    
    stream, err := client.CreateUsers(ctx)
    if err != nil {
        log.Printf("Failed to create users stream: %v", err)
        return
    }
    
    // Send multiple users
    users := []*userv1.CreateUserRequest{
        {Name: "Alice", Email: "alice@example.com"},
        {Name: "Bob", Email: "bob@example.com"},
        {Name: "Charlie", Email: "charlie@example.com"},
    }
    
    for _, user := range users {
        if err := stream.Send(user); err != nil {
            log.Printf("Failed to send user: %v", err)
            return
        }
    }
    
    resp, err := stream.CloseAndRecv()
    if err != nil {
        log.Printf("Failed to close and receive: %v", err)
        return
    }
    
    fmt.Printf("Created %d users\n", resp.CreatedCount)
}

func testBidirectionalStreamingRPC(ctx context.Context, client userv1.UserServiceClient) {
    fmt.Println("=== Testing Bidirectional Streaming RPC ===")
    
    stream, err := client.ChatUsers(ctx)
    if err != nil {
        log.Printf("Failed to create chat stream: %v", err)
        return
    }
    
    // Receive messages in the background; done is closed when the stream ends
    done := make(chan struct{})
    go func() {
        defer close(done)
        for {
            msg, err := stream.Recv()
            if err == io.EOF {
                return
            }
            if err != nil {
                log.Printf("Failed to receive chat message: %v", err)
                return
            }
            
            fmt.Printf("Received: %s from %s\n", msg.Message, msg.UserId)
        }
    }()
    
    // Send messages
    messages := []string{"Hello", "How are you?", "Goodbye"}
    for _, msg := range messages {
        if err := stream.Send(&userv1.ChatMessage{
            UserId:    "client-user",
            Message:   msg,
            Timestamp: time.Now().Unix(),
        }); err != nil {
            log.Printf("Failed to send message: %v", err)
            return
        }
        
        time.Sleep(1 * time.Second)
    }
    
    // Signal that sending is finished, then wait for the server to close the
    // stream instead of sleeping for a fixed time
    if err := stream.CloseSend(); err != nil {
        log.Printf("Failed to close send: %v", err)
    }
    <-done
}

Best Practices

Service Design

  • Use semantic versioning: Version your proto files and services
  • Design for evolution: Add new fields instead of changing existing ones, and never reuse or renumber field tags
  • Keep messages small: Large messages impact performance
  • Use streaming appropriately: Choose the right RPC type for your use case

Performance Optimization

  • Configure message sizes: Set appropriate limits for your use case
  • Use connection pooling: Reuse long-lived connections in client applications instead of dialing per request
  • Implement proper timeouts: Set context deadlines for all operations
  • Monitor resource usage: Track memory and CPU usage

Security Considerations

  • Always use TLS in production: Encrypt all gRPC traffic
  • Implement authentication: Validate client identities
  • Use authorization: Control access to specific methods
  • Validate input data: Sanitize and validate all request data
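
The CreateUser handler in this guide only checks that name and email are non-empty. A slightly stricter check, sketched with the standard library only, might look like the following. The function name validateCreateUser and the 200-character limit are assumptions chosen for illustration.

```go
package main

import (
    "errors"
    "fmt"
    "net/mail"
    "strings"
)

// maxNameLen is an illustrative limit, not a value taken from the extension.
const maxNameLen = 200

// validateCreateUser checks the fields of a CreateUserRequest before they
// reach the database.
func validateCreateUser(name, email string) error {
    name = strings.TrimSpace(name)
    if name == "" {
        return errors.New("name is required")
    }
    if len(name) > maxNameLen {
        return errors.New("name is too long")
    }

    // ParseAddress also accepts forms like "John <john@example.com>",
    // so require that the parsed address equals the input exactly.
    addr, err := mail.ParseAddress(email)
    if err != nil || addr.Address != email {
        return errors.New("email is not a valid address")
    }
    return nil
}

func main() {
    fmt.Println(validateCreateUser("John Doe", "john@example.com"))
    fmt.Println(validateCreateUser("John Doe", "not-an-email"))
}
```

In a handler, a non-nil result would be returned to the caller as status.Error(codes.InvalidArgument, err.Error()).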

Error Handling

  • Use appropriate status codes: Return meaningful gRPC status codes
  • Provide error details: Include helpful error messages
  • Implement retries: Handle transient failures gracefully
  • Log errors properly: Include context for debugging
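
As an illustration of the retry advice, here is a minimal exponential-backoff helper built on the standard library. It is a sketch, not part of Forge or gRPC. The names retry, retryWithDefaults, and errTransient are hypothetical. In a real client, only transient failures (for example the Unavailable status code) should be retried.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// errTransient stands in for a retryable failure in this sketch.
var errTransient = errors.New("transient failure")

// retry calls fn up to attempts times, doubling the delay after each failure.
// It stops early if ctx is cancelled while waiting.
func retry(ctx context.Context, attempts int, base time.Duration, fn func() error) error {
    if attempts < 1 {
        attempts = 1
    }
    var err error
    delay := base
    for i := 0; i < attempts; i++ {
        if err = fn(); err == nil {
            return nil
        }
        if i == attempts-1 {
            break // no point waiting after the final attempt
        }
        select {
        case <-time.After(delay):
            delay *= 2
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return err
}

// retryWithDefaults is a convenience wrapper with a background context
// and a 10ms initial delay.
func retryWithDefaults(attempts int, fn func() error) error {
    return retry(context.Background(), attempts, 10*time.Millisecond, fn)
}

func main() {
    calls := 0
    err := retryWithDefaults(3, func() error {
        calls++
        if calls < 3 {
            return errTransient
        }
        return nil
    })
    fmt.Println("calls:", calls, "err:", err)
}
```

Passing the RPC's own context to retry keeps the total time bounded by the caller's deadline, which ties in with the timeout advice under Performance Optimization.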

Troubleshooting

Common Issues

Connection Refused

# Check if server is running
netstat -tlnp | grep :50051

# Test connectivity
grpcurl -plaintext localhost:50051 list

TLS Certificate Errors

# Verify certificate
openssl x509 -in server.crt -text -noout

# Check certificate chain
openssl verify -CAfile ca.crt server.crt

# Test TLS connection
openssl s_client -connect localhost:50051 -cert client.crt -key client.key

Service Registration Issues

// Ensure service is registered after server start
app.Start(ctx)
grpcServer := forge.Must[grpc.GRPC](app.Container(), "grpc")
userv1.RegisterUserServiceServer(grpcServer.GetServer(), userService)

Health Check Failures

// Register custom health checker (Go)
grpcServer.RegisterHealthChecker("my-service", &MyHealthChecker{})

# Test health check (run in a shell)
grpcurl -plaintext localhost:50051 grpc.health.v1.Health/Check

Debugging

Enable Debug Logging

logging:
  level: debug
  loggers:
    grpc: debug
    grpc.server: debug
    grpc.interceptor: debug

Monitor Metrics

// Check server statistics
stats := grpcServer.GetStats()
fmt.Printf("RPCs Started: %d\n", stats.RPCsStarted)
fmt.Printf("RPCs Succeeded: %d\n", stats.RPCsSucceeded)
fmt.Printf("RPCs Failed: %d\n", stats.RPCsFailed)
fmt.Printf("Active Streams: %d\n", stats.ActiveStreams)
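
These counters can be combined into simple derived figures. The sketch below uses a local struct that mirrors the printed fields. The struct name serverStats, the int64 field types, and the assumption that every started RPC eventually counts as succeeded or failed are not taken from the extension.

```go
package main

import "fmt"

// serverStats mirrors the counters printed above. Field types are assumed.
type serverStats struct {
    RPCsStarted   int64
    RPCsSucceeded int64
    RPCsFailed    int64
    ActiveStreams int64
}

// failureRate returns failed / completed, or 0 when nothing has completed yet.
func failureRate(s serverStats) float64 {
    completed := s.RPCsSucceeded + s.RPCsFailed
    if completed == 0 {
        return 0
    }
    return float64(s.RPCsFailed) / float64(completed)
}

// inFlight returns how many started RPCs have not completed yet.
func inFlight(s serverStats) int64 {
    return s.RPCsStarted - s.RPCsSucceeded - s.RPCsFailed
}

func main() {
    s := serverStats{RPCsStarted: 105, RPCsSucceeded: 75, RPCsFailed: 25, ActiveStreams: 2}
    fmt.Printf("Failure rate: %.2f\n", failureRate(s))
    fmt.Printf("In flight: %d\n", inFlight(s))
}
```

A sustained rise in the failure rate, or an in-flight count that keeps growing, is usually a more useful alert signal than the raw counters.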

Use gRPC Reflection

# List services
grpcurl -plaintext localhost:50051 list

# Describe service
grpcurl -plaintext localhost:50051 describe user.v1.UserService

# Call method
grpcurl -plaintext -d '{"name":"John","email":"john@example.com"}' \
  localhost:50051 user.v1.UserService/CreateUser
