Caching & Batching
Reduce costs and improve performance with semantic caching and automatic batching.
Semantic Caching
Vector-based similarity matching for intelligent cache hits:
cache := sdk.NewSemanticCache(
vectorStore,
cacheStore,
logger,
metrics,
sdk.SemanticCacheConfig{
SimilarityThreshold: 0.95, // 95% similarity for cache hit
TTL: 1 * time.Hour,
},
)
// First request - cache miss
result1, _ := sdk.NewGenerateBuilder(ctx, llm, logger, metrics).
WithPrompt("What is Go?").
WithCache(cache).
Execute()
// Similar request - cache HIT!
result2, _ := sdk.NewGenerateBuilder(ctx, llm, logger, metrics).
WithPrompt("Tell me about the Go programming language").
WithCache(cache). // Returns cached result!
Execute()
How It Works
- Query embedding: Convert prompt to vector
- Similarity search: Find similar cached queries
- Threshold check: If similarity ≥ threshold, return cached response
- Cache miss: Execute the request and cache the result (see the sketch below)
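The same flow spelled out as code — a minimal sketch, assuming the llm.Embed, cache.Get, cache.Set, and builder calls shown on this page, with lookupOrGenerate as a hypothetical helper name:
// lookupOrGenerate illustrates the four steps above: embed the prompt,
// search for a similar cached entry, and fall back to a real generation
// (plus a cache write) on a miss.
func lookupOrGenerate(ctx context.Context, prompt string) (*sdk.Result, error) {
    // 1. Query embedding: convert the prompt to a vector
    embedding, err := llm.Embed(ctx, prompt)
    if err != nil {
        return nil, err
    }
    // 2 & 3. Similarity search and threshold check happen inside cache.Get
    if entry, _ := cache.Get(ctx, prompt, embedding); entry != nil {
        return entry.Response, nil // cache hit
    }
    // 4. Cache miss: execute the request and cache the result
    result, err := sdk.NewGenerateBuilder(ctx, llm, logger, metrics).
        WithPrompt(prompt).
        Execute()
    if err != nil {
        return nil, err
    }
    cache.Set(ctx, prompt, embedding, result)
    return result, nil
}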
Configuration
cache := sdk.NewSemanticCache(vectorStore, cacheStore, logger, metrics,
sdk.SemanticCacheConfig{
SimilarityThreshold: 0.95, // Strict: exact matches only
TTL: 1 * time.Hour,
MaxEntries: 10000,
EmbeddingModel: "text-embedding-3-small",
},
)
// Relaxed threshold for more cache hits
relaxedCache := sdk.NewSemanticCache(vectorStore, cacheStore, logger, metrics,
sdk.SemanticCacheConfig{
SimilarityThreshold: 0.85, // Relaxed: more cache hits
TTL: 24 * time.Hour,
},
)
Manual Cache Operations
// Get from cache
entry, _ := cache.Get(ctx, query, embedding)
if entry != nil {
fmt.Println("Cache hit!")
return entry.Response
}
// Set cache
cache.Set(ctx, query, embedding, response)
// Delete
cache.Delete(ctx, cacheKey)
// Clear all
cache.Clear(ctx)
Cache Statistics
stats := cache.GetStats()
fmt.Printf("Hits: %d\n", stats.Hits)
fmt.Printf("Misses: %d\n", stats.Misses)
fmt.Printf("Hit rate: %.1f%%\n", stats.HitRate * 100)
fmt.Printf("Entries: %d\n", stats.Entries)
fmt.Printf("Savings: $%.2f\n", stats.CostSavings)Batch Processing
Automatically batch multiple requests for efficiency:
processor := sdk.NewBatchProcessor(
llmManager,
logger,
metrics,
sdk.BatchConfig{
MaxBatchSize: 10, // Max requests per batch
MaxWaitTime: 100 * time.Millisecond, // Max time to wait
WorkerCount: 5, // Concurrent workers
},
)
// Submit requests
for _, prompt := range prompts {
processor.Submit(ctx, sdk.BatchRequest{
Prompt: prompt,
Model: "gpt-4",
})
}
// Results are automatically batched and processed
With Callbacks
processor.Submit(ctx, sdk.BatchRequest{
Prompt: "Hello",
OnComplete: func(result *sdk.Result) {
fmt.Printf("Got result: %s\n", result.Content)
},
OnError: func(err error) {
fmt.Printf("Error: %v\n", err)
},
})
Blocking Wait
// Wait for specific request
response, err := processor.SubmitAndWait(ctx, sdk.BatchRequest{
Prompt: "What is Go?",
})
fmt.Println(response.Content)
Batch Statistics
stats := processor.GetStats()
fmt.Printf("Requests processed: %d\n", stats.TotalProcessed)
fmt.Printf("Batches: %d\n", stats.TotalBatches)
fmt.Printf("Avg batch size: %.1f\n", stats.AvgBatchSize)
fmt.Printf("Cost savings: $%.2f\n", stats.CostSavings)Combined: Caching + Batching
Maximum efficiency:
cache := sdk.NewSemanticCache(vectorStore, cacheStore, logger, metrics,
sdk.SemanticCacheConfig{SimilarityThreshold: 0.95},
)
processor := sdk.NewBatchProcessor(llm, logger, metrics,
sdk.BatchConfig{MaxBatchSize: 10},
)
// Try cache first, batch if miss
func generate(ctx context.Context, prompt string) (*sdk.Result, error) {
// Check cache
embedding, _ := llm.Embed(ctx, prompt)
if entry, _ := cache.Get(ctx, prompt, embedding); entry != nil {
return entry.Response, nil
}
// Cache miss - batch request
return processor.SubmitAndWait(ctx, sdk.BatchRequest{
Prompt: prompt,
OnComplete: func(result *sdk.Result) {
// Cache the result
cache.Set(ctx, prompt, embedding, result)
},
})
}
Real-World Examples
API Handler with Caching
func generateHandler(c forge.Context) error {
prompt := c.Query("prompt")
// Semantic cache
cache := resolveCache(c)
result, err := sdk.NewGenerateBuilder(ctx, llm, logger, metrics).
WithPrompt(prompt).
WithCache(cache).
Execute()
if err != nil {
return c.JSON(500, map[string]string{"error": err.Error()})
}
// Return with cache status
return c.JSON(200, map[string]interface{}{
"content": result.Content,
"cached": result.FromCache,
"cache_age": result.CacheAge,
"tokens": result.Usage.TotalTokens,
})
}
Bulk Processing
func processBulkRequests(ctx context.Context, prompts []string) ([]*sdk.Result, error) {
processor := sdk.NewBatchProcessor(llm, logger, metrics,
sdk.BatchConfig{
MaxBatchSize: 20,
MaxWaitTime: 200 * time.Millisecond,
WorkerCount: 10,
},
)
results := make([]*sdk.Result, len(prompts))
var wg sync.WaitGroup
for i, prompt := range prompts {
wg.Add(1)
i, prompt := i, prompt // Capture
processor.Submit(ctx, sdk.BatchRequest{
Prompt: prompt,
OnComplete: func(result *sdk.Result) {
results[i] = result
wg.Done()
},
OnError: func(err error) {
log.Printf("Error for prompt %d: %v", i, err)
wg.Done()
},
})
}
wg.Wait()
return results, nil
}
Tiered Caching
type TieredCache struct {
l1 *sdk.InMemoryCache // Fast, small
l2 *sdk.SemanticCache // Slower, large
}
func (t *TieredCache) Get(ctx context.Context, prompt string) (*sdk.Result, error) {
// Try L1 (in-memory)
if result := t.l1.Get(prompt); result != nil {
return result, nil
}
// Try L2 (semantic)
embedding, _ := llm.Embed(ctx, prompt)
if entry, _ := t.l2.Get(ctx, prompt, embedding); entry != nil {
// Promote to L1
t.l1.Set(prompt, entry.Response)
return entry.Response, nil
}
return nil, sdk.ErrCacheMiss
}
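A write path that populates both tiers completes the picture. This is a minimal sketch, assuming the same Set signatures used above (the in-memory Set(prompt, result) and the semantic Set(ctx, prompt, embedding, result)):
// Set writes a result to both tiers so future lookups can be served from
// the fast in-memory cache or, failing that, the semantic cache.
func (t *TieredCache) Set(ctx context.Context, prompt string, result *sdk.Result) {
    // L1: exact-match, in-memory
    t.l1.Set(prompt, result)
    // L2: semantic, keyed by the prompt's embedding
    if embedding, err := llm.Embed(ctx, prompt); err == nil {
        t.l2.Set(ctx, prompt, embedding, result)
    }
}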
Performance Impact
Caching Savings
Without caching:
- 1000 requests/day
- $0.006 per request
- Cost: $6.00/day = $180/month
With 30% cache hit rate:
- 700 API calls
- 300 cached
- Cost: $4.20/day = $126/month
- Savings: $54/month (30%)
With 60% cache hit rate:
- 400 API calls
- 600 cached
- Cost: $2.40/day = $72/month
- Savings: $108/month (60%)
Batching Savings
Without batching:
- 100 requests = 100 API calls
- Average 20ms latency each
- Total time: 2000ms
- Cost: 100 × $0.006 = $0.60
With batching (10 per batch):
- 100 requests = 10 batches
- Average 50ms per batch
- Total time: 500ms (4x faster!)
- Cost: 10 × $0.06 = $0.60
- Benefit: 75% faster, same cost (see the sketch below)
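For reference, the arithmetic behind these estimates is simple enough to sketch in code (illustrative prices and volumes only, using a hypothetical estimateCacheSavings helper):
// estimateCacheSavings returns daily cost and savings for a given cache hit
// rate, assuming a flat per-request price.
func estimateCacheSavings(requestsPerDay int, costPerRequest, hitRate float64) (cost, savings float64) {
    apiCalls := float64(requestsPerDay) * (1 - hitRate)
    cost = apiCalls * costPerRequest
    savings = float64(requestsPerDay)*costPerRequest - cost
    return cost, savings
}

// 30% hit rate: $4.20/day cost, $1.80/day saved (~$54/month)
// 60% hit rate: $2.40/day cost, $3.60/day saved (~$108/month)
cost, savings := estimateCacheSavings(1000, 0.006, 0.30)
fmt.Printf("cost $%.2f/day, savings $%.2f/day\n", cost, savings)

// Batching does not change the per-request price here; it cuts round trips:
// 100 requests in batches of 10 means 10 calls instead of 100 (~4x faster
// wall-clock time in the example above).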
Configuration Guidelines
For High Traffic
// Aggressive caching
cache := sdk.NewSemanticCache(vectorStore, cacheStore, logger, metrics,
sdk.SemanticCacheConfig{
SimilarityThreshold: 0.90, // More lenient
TTL: 24 * time.Hour,
MaxEntries: 100000, // Large cache
},
)
// Aggressive batching
processor := sdk.NewBatchProcessor(llm, logger, metrics,
sdk.BatchConfig{
MaxBatchSize: 50, // Large batches
MaxWaitTime: 200 * time.Millisecond,
WorkerCount: 20, // Many workers
},
)
For Low Latency
// Quick cache checks
cache := sdk.NewSemanticCache(vectorStore, cacheStore, logger, metrics,
sdk.SemanticCacheConfig{
SimilarityThreshold: 0.98, // Very strict
TTL: 1 * time.Hour,
MaxEntries: 1000, // Small, fast
},
)
// Minimal batching
processor := sdk.NewBatchProcessor(llm, logger, metrics,
sdk.BatchConfig{
MaxBatchSize: 5, // Small batches
MaxWaitTime: 50 * time.Millisecond, // Low wait
WorkerCount: 10,
},
)
Monitoring
// Cache metrics
go func() {
ticker := time.NewTicker(1 * time.Minute)
for range ticker.C {
stats := cache.GetStats()
metrics.Gauge("cache.hit_rate").Set(stats.HitRate)
metrics.Gauge("cache.entries").Set(float64(stats.Entries))
metrics.Gauge("cache.savings").Set(stats.CostSavings)
}
}()
// Batch metrics
go func() {
ticker := time.NewTicker(1 * time.Minute)
for range ticker.C {
stats := processor.GetStats()
metrics.Gauge("batch.avg_size").Set(stats.AvgBatchSize)
metrics.Gauge("batch.queue_size").Set(float64(stats.QueueSize))
}
}()
Next Steps
- Cost Management - Track savings
- Resilience - Add reliability
- Examples - Caching & batching examples