AI SDK

Streaming

Real-time token-by-token streaming with reasoning and tool calls

Streaming

Stream AI responses token-by-token with reasoning steps and tool calls.

Basic Streaming

result, err := sdk.NewStreamBuilder(ctx, llmManager, logger, metrics).
    WithPrompt("Write a haiku about Go").
    OnToken(func(token string) {
        fmt.Print(token)
    }).
    Stream()

With Reasoning

result, err := sdk.NewStreamBuilder(ctx, llmManager, logger, metrics).
    WithPrompt("Solve: What is the square root of 144?").
    WithReasoning(true).
    OnReasoning(func(step string) {
        fmt.Printf("[💭 %s]\n", step)
    }).
    OnToken(func(token string) {
        fmt.Print(token)
    }).
    Stream()

Complete Lifecycle

var buffer strings.Builder

result, err := sdk.NewStreamBuilder(ctx, llm, logger, metrics).
    WithPrompt("Explain recursion").
    
    OnStart(func() {
        log.Println("🚀 Started")
    }).
    
    OnToken(func(token string) {
        buffer.WriteString(token)
        fmt.Print(token)
    }).
    
    OnReasoning(func(step string) {
        log.Printf("💭 Thinking: %s", step)
    }).
    
    OnToolCall(func(tc *sdk.ToolCallResult) {
        log.Printf("🔧 Tool: %s", tc.Name)
    }).
    
    OnComplete(func(result *sdk.Result) {
        log.Printf("✅ Done! Tokens: %d", result.Usage.TotalTokens)
    }).
    
    OnError(func(err error) {
        log.Printf("❌ Error: %v", err)
    }).
    
    Stream()

WebSocket Streaming

func streamHandler(c forge.Context) error {
    ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
    if err != nil {
        return err
    }
    defer ws.Close()
    
    result, err := sdk.NewStreamBuilder(ctx, llm, logger, metrics).
        WithPrompt(c.Query("prompt")).
        OnToken(func(token string) {
            ws.WriteJSON(map[string]string{
                "type": "token",
                "data": token,
            })
        }).
        OnComplete(func(result *sdk.Result) {
            ws.WriteJSON(map[string]interface{}{
                "type": "complete",
                "tokens": result.Usage.TotalTokens,
            })
        }).
        Stream()
    
    return err
}

SSE (Server-Sent Events)

func sseHandler(c forge.Context) error {
    c.Response().Header().Set("Content-Type", "text/event-stream")
    c.Response().Header().Set("Cache-Control", "no-cache")
    c.Response().Header().Set("Connection", "keep-alive")
    
    flusher := c.Response().(http.Flusher)
    
    result, err := sdk.NewStreamBuilder(ctx, llm, logger, metrics).
        WithPrompt(c.Query("prompt")).
        OnToken(func(token string) {
            fmt.Fprintf(c.Response(), "data: %s\n\n", token)
            flusher.Flush()
        }).
        Stream()
    
    return err
}

Next Steps

How is this guide?

Last updated on