AI SDK

Caching & Batching

Optimize costs with semantic caching and batch processing

Caching & Batching

Reduce costs and improve performance with semantic caching and automatic batching.

Semantic Caching

Semantic caching embeds each prompt as a vector and returns a cached response when a sufficiently similar prompt has already been answered:

// Create a semantic cache backed by a vector store (for similarity search)
// and a cache store (for the responses themselves).
cache := sdk.NewSemanticCache(
    vectorStore,
    cacheStore,
    logger,
    metrics,
    sdk.SemanticCacheConfig{
        SimilarityThreshold: 0.95, // 95% similarity for cache hit
        TTL:                 1 * time.Hour,
    },
)

// First request - cache miss
result1, err := sdk.NewGenerateBuilder(ctx, llm, logger, metrics).
    WithPrompt("What is Go?").
    WithCache(cache).
    Execute()
if err != nil {
    // result1 must not be used when Execute fails.
    log.Fatalf("first request: %v", err)
}
fmt.Printf("cached: %v\n", result1.FromCache)

// Similar request - cache HIT!
result2, err := sdk.NewGenerateBuilder(ctx, llm, logger, metrics).
    WithPrompt("Tell me about the Go programming language").
    WithCache(cache). // Returns cached result!
    Execute()
if err != nil {
    log.Fatalf("second request: %v", err)
}
// FromCache reports whether the response was served from the cache.
fmt.Printf("cached: %v\n", result2.FromCache)

How It Works

  1. Query embedding: Convert prompt to vector
  2. Similarity search: Find similar cached queries
  3. Threshold check: If similarity ≥ threshold, return cached response
  4. Cache miss: Execute request and cache result

Configuration

// Strict threshold: only prompts that are at least 95% similar to a cached
// query are served from the cache.
cache := sdk.NewSemanticCache(vectorStore, cacheStore, logger, metrics,
    sdk.SemanticCacheConfig{
        SimilarityThreshold: 0.95, // Strict: near-identical prompts only
        TTL:                 1 * time.Hour,
        MaxEntries:          10000,
        EmbeddingModel:      "text-embedding-3-small",
    },
)

// Relaxed threshold for more cache hits. A distinct variable name is used so
// both configurations can coexist in the same scope.
relaxedCache := sdk.NewSemanticCache(vectorStore, cacheStore, logger, metrics,
    sdk.SemanticCacheConfig{
        SimilarityThreshold: 0.85, // Relaxed: more cache hits
        TTL:                 24 * time.Hour,
    },
)

Manual Cache Operations

// Get from cache
entry, _ := cache.Get(ctx, query, embedding)
if entry != nil {
    fmt.Println("Cache hit!")
    return entry.Response
}

// Set cache
cache.Set(ctx, query, embedding, response)

// Delete
cache.Delete(ctx, cacheKey)

// Clear all
cache.Clear(ctx)

Cache Statistics

stats := cache.GetStats()

fmt.Printf("Hits: %d\n", stats.Hits)
fmt.Printf("Misses: %d\n", stats.Misses)
fmt.Printf("Hit rate: %.1f%%\n", stats.HitRate * 100)
fmt.Printf("Entries: %d\n", stats.Entries)
fmt.Printf("Savings: $%.2f\n", stats.CostSavings)

Batch Processing

Automatically batch multiple requests for efficiency:

// Create a batch processor. The first argument is the same LLM handle (llm)
// that the other examples on this page pass to the SDK.
processor := sdk.NewBatchProcessor(
    llm,
    logger,
    metrics,
    sdk.BatchConfig{
        MaxBatchSize: 10,                     // Max requests per batch
        MaxWaitTime:  100 * time.Millisecond, // Max time to wait
        WorkerCount:  5,                      // Concurrent workers
    },
)

// Submit requests
for _, prompt := range prompts {
    processor.Submit(ctx, sdk.BatchRequest{
        Prompt: prompt,
        Model:  "gpt-4",
    })
}

// Results are automatically batched and processed

With Callbacks

processor.Submit(ctx, sdk.BatchRequest{
    Prompt: "Hello",
    OnComplete: func(result *sdk.Result) {
        fmt.Printf("Got result: %s\n", result.Content)
    },
    OnError: func(err error) {
        fmt.Printf("Error: %v\n", err)
    },
})

Blocking Wait

// Wait for specific request
response, err := processor.SubmitAndWait(ctx, sdk.BatchRequest{
    Prompt: "What is Go?",
})

// Check the error before touching response: dereferencing response.Content
// after a failed call would panic on a nil result.
if err != nil {
    fmt.Printf("Error: %v\n", err)
} else {
    fmt.Println(response.Content)
}

Batch Statistics

stats := processor.GetStats()

fmt.Printf("Requests processed: %d\n", stats.TotalProcessed)
fmt.Printf("Batches: %d\n", stats.TotalBatches)
fmt.Printf("Avg batch size: %.1f\n", stats.AvgBatchSize)
fmt.Printf("Cost savings: $%.2f\n", stats.CostSavings)

Combined: Caching + Batching

For maximum efficiency, check the semantic cache first and send only cache misses through the batch processor:

cache := sdk.NewSemanticCache(vectorStore, cacheStore, logger, metrics,
    sdk.SemanticCacheConfig{SimilarityThreshold: 0.95},
)

processor := sdk.NewBatchProcessor(llm, logger, metrics,
    sdk.BatchConfig{MaxBatchSize: 10},
)

// generate returns a response for prompt, trying the semantic cache first and
// falling back to the batch processor on a miss.
//
// Caching is best-effort: if the prompt cannot be embedded, the cache is
// skipped entirely (no lookup and no write-back) and the request is still
// sent to the batch processor.
func generate(ctx context.Context, prompt string) (*sdk.Result, error) {
    req := sdk.BatchRequest{Prompt: prompt}

    embedding, embedErr := llm.Embed(ctx, prompt)
    if embedErr == nil {
        // Check cache. A lookup error is deliberately treated as a miss so a
        // cache outage never blocks generation.
        if entry, _ := cache.Get(ctx, prompt, embedding); entry != nil {
            return entry.Response, nil
        }

        // Only write back when the embedding is valid; storing a result
        // under a failed embedding would poison later similarity lookups.
        req.OnComplete = func(result *sdk.Result) {
            // Cache the result
            cache.Set(ctx, prompt, embedding, result)
        }
    }

    // Cache miss (or cache skipped) - batch request
    return processor.SubmitAndWait(ctx, req)
}

Real-World Examples

API Handler with Caching

// generateHandler serves a generation request for the "prompt" query
// parameter, routing it through the semantic cache.
//
// It responds with 400 when the prompt is missing, 500 when generation fails,
// and otherwise 200 with the content, cache status, cache age and token count.
func generateHandler(c forge.Context) error {
    prompt := c.Query("prompt")
    if prompt == "" {
        // Reject early so an empty prompt never reaches the LLM.
        return c.JSON(400, map[string]string{"error": "prompt is required"})
    }

    // Semantic cache
    cache := resolveCache(c)

    // NOTE(review): ctx, llm, logger and metrics are not defined in this
    // snippet - presumably they come from the enclosing scope; confirm.
    result, err := sdk.NewGenerateBuilder(ctx, llm, logger, metrics).
        WithPrompt(prompt).
        WithCache(cache).
        Execute()

    if err != nil {
        return c.JSON(500, map[string]string{"error": err.Error()})
    }

    // Return with cache status
    return c.JSON(200, map[string]interface{}{
        "content":   result.Content,
        "cached":    result.FromCache,
        "cache_age": result.CacheAge,
        "tokens":    result.Usage.TotalTokens,
    })
}

Bulk Processing

// processBulkRequests submits every prompt to a batch processor and waits for
// all of them to finish.
//
// The returned slice has one slot per prompt, in input order; the slot of a
// prompt that failed stays nil. The returned error is nil only when every
// prompt succeeded; otherwise it joins one error per failed prompt, each
// prefixed with the prompt's index.
func processBulkRequests(prompts []string) ([]*sdk.Result, error) {
    processor := sdk.NewBatchProcessor(llm, logger, metrics,
        sdk.BatchConfig{
            MaxBatchSize: 20,
            MaxWaitTime:  200 * time.Millisecond,
            WorkerCount:  10,
        },
    )

    // Each callback writes only to its own index, so the two slices need no
    // additional locking; wg.Wait orders those writes before the reads below.
    results := make([]*sdk.Result, len(prompts))
    errs := make([]error, len(prompts))

    var wg sync.WaitGroup
    wg.Add(len(prompts))

    for i, prompt := range prompts {
        i := i // Capture for the closures below

        // NOTE(review): if Submit can fail without invoking either callback,
        // wg.Wait below would block forever - confirm against the SDK.
        processor.Submit(ctx, sdk.BatchRequest{
            Prompt: prompt,
            OnComplete: func(result *sdk.Result) {
                defer wg.Done()
                results[i] = result
            },
            OnError: func(err error) {
                defer wg.Done()
                errs[i] = fmt.Errorf("prompt %d: %w", i, err)
            },
        })
    }

    wg.Wait()

    // errors.Join skips nil entries and returns nil when none failed.
    return results, errors.Join(errs...)
}

Tiered Caching

// TieredCache layers an in-memory cache in front of a semantic cache.
type TieredCache struct {
    l1 *sdk.InMemoryCache // Fast, small
    l2 *sdk.SemanticCache // Slower, large
}

// Get looks up prompt in L1 and then in L2. An L2 hit is copied into L1
// before it is returned. It returns sdk.ErrCacheMiss when neither tier has a
// usable entry, including when the prompt cannot be embedded for the L2
// lookup.
func (t *TieredCache) Get(ctx context.Context, prompt string) (*sdk.Result, error) {
    // Try L1 (in-memory)
    if result := t.l1.Get(prompt); result != nil {
        return result, nil
    }

    // Try L2 (semantic). Without a valid embedding there is nothing to
    // search with, so report a miss instead of querying L2 with a bad vector.
    embedding, err := llm.Embed(ctx, prompt)
    if err != nil {
        return nil, sdk.ErrCacheMiss
    }

    // A lookup error is treated as a miss, matching the best-effort contract.
    entry, _ := t.l2.Get(ctx, prompt, embedding)
    if entry == nil {
        return nil, sdk.ErrCacheMiss
    }

    // Promote to L1
    t.l1.Set(prompt, entry.Response)
    return entry.Response, nil
}

Performance Impact

Caching Savings

Without caching:
- 1000 requests/day
- $0.006 per request
- Cost: $6.00/day = $180/month

With 30% cache hit rate:
- 700 API calls
- 300 cached
- Cost: $4.20/day = $126/month
- Savings: $54/month (30%)

With 60% cache hit rate:
- 400 API calls
- 600 cached
- Cost: $2.40/day = $72/month
- Savings: $108/month (60%)

Batching Savings

Without batching:
- 100 requests = 100 API calls
- Average 20ms latency each
- Total time: 2000ms
- Cost: 100 × $0.006 = $0.60

With batching (10 per batch):
- 100 requests = 10 batches
- Average 50ms per batch
- Total time: 500ms (4x faster!)
- Cost: 10 × $0.06 = $0.60
- Benefit: 75% faster, same cost

Configuration Guidelines

For High Traffic

// Aggressive caching
cache := sdk.NewSemanticCache(vectorStore, cacheStore, logger, metrics,
    sdk.SemanticCacheConfig{
        SimilarityThreshold: 0.90,     // More lenient
        TTL:                 24 * time.Hour,
        MaxEntries:          100000,   // Large cache
    },
)

// Aggressive batching
processor := sdk.NewBatchProcessor(llm, logger, metrics,
    sdk.BatchConfig{
        MaxBatchSize:  50,              // Large batches
        MaxWaitTime:   200 * time.Millisecond,
        WorkerCount:   20,              // Many workers
    },
)

For Low Latency

// Quick cache checks
cache := sdk.NewSemanticCache(vectorStore, cacheStore, logger, metrics,
    sdk.SemanticCacheConfig{
        SimilarityThreshold: 0.98,     // Very strict
        TTL:                 1 * time.Hour,
        MaxEntries:          1000,     // Small, fast
    },
)

// Minimal batching
processor := sdk.NewBatchProcessor(llm, logger, metrics,
    sdk.BatchConfig{
        MaxBatchSize:  5,               // Small batches
        MaxWaitTime:   50 * time.Millisecond,  // Low wait
        WorkerCount:   10,
    },
)

Monitoring

// Cache metrics: publish cache statistics once a minute until ctx is
// cancelled, so the goroutine has a defined end.
go func() {
    ticker := time.NewTicker(1 * time.Minute)
    defer ticker.Stop() // Release the ticker when the reporter exits

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            stats := cache.GetStats()
            metrics.Gauge("cache.hit_rate").Set(stats.HitRate)
            metrics.Gauge("cache.entries").Set(float64(stats.Entries))
            metrics.Gauge("cache.savings").Set(stats.CostSavings)
        }
    }
}()

// Batch metrics: publish batch processor statistics once a minute until ctx
// is cancelled, so the goroutine has a defined end.
go func() {
    ticker := time.NewTicker(1 * time.Minute)
    defer ticker.Stop() // Release the ticker when the reporter exits

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            stats := processor.GetStats()
            metrics.Gauge("batch.avg_size").Set(stats.AvgBatchSize)
            metrics.Gauge("batch.queue_size").Set(float64(stats.QueueSize))
        }
    }
}()

Next Steps

How is this guide?

Last updated on