AI SDK

Tools & Workflows

Dynamic tools and DAG workflow orchestration

Tools & Workflows

Dynamic tool system and DAG-based workflow engine for multi-agent orchestration.

Tools

Register a Tool

// Create the registry that stores every tool the SDK can invoke.
registry := sdk.NewToolRegistry(logger, metrics)

// Register a tool together with the JSON Schema that describes its arguments.
registry.RegisterTool(&sdk.Tool{
    Name:        "get_weather",
    Description: "Get current weather for a location",
    Version:     "1.0.0",
    Parameters: map[string]interface{}{
        "type": "object",
        "properties": map[string]interface{}{
            "location": map[string]interface{}{
                "type":        "string",
                "description": "City name",
            },
            "units": map[string]interface{}{
                "type": "string",
                "enum": []string{"celsius", "fahrenheit"},
            },
        },
        "required": []string{"location"},
    },
    Handler: func(ctx context.Context, args map[string]interface{}) (interface{}, error) {
        // Use checked assertions: a bare args["x"].(string) panics when the
        // key is missing or holds a value of another type.
        location, ok := args["location"].(string)
        if !ok {
            return nil, fmt.Errorf("location must be a string")
        }

        // "units" is not listed under "required", so it may be absent.
        // This example falls back to celsius in that case.
        units := "celsius"
        if raw, present := args["units"]; present {
            if units, ok = raw.(string); !ok {
                return nil, fmt.Errorf("units must be a string")
            }
        }

        weather, err := weatherAPI.Get(location, units)
        if err != nil {
            // Never hand back a partial result alongside an error.
            return nil, fmt.Errorf("getting weather for %q: %w", location, err)
        }
        return weather, nil
    },
})

Execute Tool

result, err := registry.ExecuteTool(ctx, "get_weather", map[string]interface{}{
    "location": "San Francisco",
    "units":    "celsius",
})

Auto-Discovery

// Automatically discover tools from a directory
registry.DiscoverTools("./tools")

// List all tools
tools := registry.ListTools()
for _, tool := range tools {
    fmt.Printf("%s v%s: %s\n", tool.Name, tool.Version, tool.Description)
}

Search Tools

// Semantic search for tools
tools, _ := registry.SearchTools(ctx, "weather and temperature")

// Category-based search
tools, _ := registry.GetToolsByCategory(ctx, "api")

// Tag-based search
tools, _ := registry.GetToolsByTags(ctx, []string{"external", "api"})

Versioning

// Register multiple versions of the same tool; the name is shared and the
// Version field tells them apart.
registry.RegisterTool(&sdk.Tool{
    Name:    "calculator",
    Version: "1.0.0",
    Handler: calculatorV1,
})

registry.RegisterTool(&sdk.Tool{
    Name:    "calculator",
    Version: "2.0.0",
    Handler: calculatorV2,
})

// Execute specific version ("name@version").
// Distinct result variables are used because a second `result, _ :=` in the
// same scope does not compile, and errors are checked rather than dropped.
pinned, err := registry.ExecuteTool(ctx, "calculator@2.0.0", args)
if err != nil {
    log.Fatalf("calculator@2.0.0 failed: %v", err)
}

// Execute latest (no version suffix)
latest, err := registry.ExecuteTool(ctx, "calculator", args)
if err != nil {
    log.Fatalf("calculator failed: %v", err)
}

fmt.Printf("pinned: %v\nlatest: %v\n", pinned, latest)

Workflows

Create a Workflow

workflow := sdk.NewWorkflowEngine(logger, metrics)

// Add nodes
workflow.AddNode("research", researchAgent, nil)
workflow.AddNode("write", writeAgent, []string{"research"})
workflow.AddNode("review", reviewAgent, []string{"write"})
workflow.AddNode("publish", publishAgent, []string{"review"})

Execute Workflow

result, err := workflow.Execute(ctx, map[string]interface{}{
    "topic": "Go concurrency patterns",
})

fmt.Printf("Final result: %v\n", result)

Parallel Execution

workflow := sdk.NewWorkflowEngine(logger, metrics)

// These can run in parallel
workflow.AddNode("fetch_data", fetchAgent, nil)
workflow.AddNode("fetch_images", imageAgent, nil)
workflow.AddNode("fetch_videos", videoAgent, nil)

// This depends on all three
workflow.AddNode("combine", combineAgent, []string{
    "fetch_data", "fetch_images", "fetch_videos",
})

Conditional Nodes

workflow.AddNode("analyze", analyzeAgent, nil,
    sdk.WithCondition(func(ctx context.Context, result interface{}) bool {
        // Only run if analysis is needed. Checked assertions keep a nil or
        // unexpectedly shaped result from panicking; such a result simply
        // makes the condition false.
        data, ok := result.(map[string]interface{})
        if !ok {
            return false
        }
        needed, ok := data["needs_analysis"].(bool)
        return ok && needed
    }),
)

Node Retry

workflow.AddNode("api_call", apiAgent, nil,
    sdk.WithRetry(sdk.RetryConfig{
        MaxRetries:   3,
        InitialDelay: 1 * time.Second,
    }),
)

Real-World Examples

Content Creation Pipeline

// createContentPipeline builds the content-creation workflow:
// research -> outline -> draft -> {grammar, fact_check} -> finalize.
func createContentPipeline() *sdk.WorkflowEngine {
    pipeline := sdk.NewWorkflowEngine(logger, metrics)

    // Entry node: research has no dependencies.
    pipeline.AddNode("research", createResearchAgent(), nil)

    // Writing: the outline needs the research, the draft needs the outline.
    outlineDeps := []string{"research"}
    pipeline.AddNode("outline", createOutlineAgent(), outlineDeps)
    draftDeps := []string{"outline"}
    pipeline.AddNode("draft", createDraftAgent(), draftDeps)

    // Review: both checks depend only on the draft, not on each other.
    grammarDeps := []string{"draft"}
    pipeline.AddNode("grammar", createGrammarAgent(), grammarDeps)
    factCheckDeps := []string{"draft"}
    pipeline.AddNode("fact_check", createFactCheckAgent(), factCheckDeps)

    // Finalize: waits for every review node.
    finalizeDeps := []string{"grammar", "fact_check"}
    pipeline.AddNode("finalize", createFinalizeAgent(), finalizeDeps)

    return pipeline
}

// Execute the pipeline and check the error instead of discarding it.
workflow := createContentPipeline()
result, err := workflow.Execute(ctx, map[string]interface{}{
    "topic":  "AI in Healthcare",
    "length": "2000 words",
})
if err != nil {
    log.Fatalf("content pipeline failed: %v", err)
}

fmt.Printf("Final result: %v\n", result)

Data Processing Pipeline

// createDataPipeline builds an extract -> transform -> validate -> load
// workflow over three input formats.
func createDataPipeline() *sdk.WorkflowEngine {
    etl := sdk.NewWorkflowEngine(logger, metrics)

    // Extract: one node per source format, none with dependencies.
    etl.AddNode("extract_csv", extractCSVAgent, nil)
    etl.AddNode("extract_json", extractJSONAgent, nil)
    etl.AddNode("extract_xml", extractXMLAgent, nil)

    // Transform: depends on every extractor.
    extractors := []string{"extract_csv", "extract_json", "extract_xml"}
    etl.AddNode("transform", transformAgent, extractors)

    // Load: validation sits between the transform and the final load.
    validateDeps := []string{"transform"}
    etl.AddNode("validate", validateAgent, validateDeps)
    loadDeps := []string{"validate"}
    etl.AddNode("load", loadAgent, loadDeps)

    return etl
}

Customer Support Workflow

// categoryIs returns a node condition that passes only when the incoming
// result is a map whose "category" entry equals want. A nil or differently
// shaped result yields false instead of panicking on the type assertion.
func categoryIs(want string) func(ctx context.Context, result interface{}) bool {
    return func(ctx context.Context, result interface{}) bool {
        data, ok := result.(map[string]interface{})
        if !ok {
            return false
        }
        return data["category"] == want
    }
}

// createSupportWorkflow builds a support flow: a triage node, two specialist
// nodes gated on the ticket category, and a follow-up node.
func createSupportWorkflow() *sdk.WorkflowEngine {
    workflow := sdk.NewWorkflowEngine(logger, metrics)

    // Triage
    workflow.AddNode("triage", triageAgent, nil)

    // Route to specialists: each node is conditional on the category.
    workflow.AddNode("technical", technicalAgent, []string{"triage"},
        sdk.WithCondition(categoryIs("technical")),
    )

    workflow.AddNode("billing", billingAgent, []string{"triage"},
        sdk.WithCondition(categoryIs("billing")),
    )

    // Follow-up
    // NOTE(review): followup lists both conditional branches as dependencies,
    // yet at most one of them matches a given category. Confirm the engine
    // treats a skipped node as satisfied; otherwise followup never runs.
    workflow.AddNode("followup", followupAgent, []string{"technical", "billing"})

    return workflow
}

Advanced Features

Dynamic Workflow Modification

// Add node dynamically
workflow.AddNode("new_step", newAgent, []string{"existing_step"})

// Remove node
workflow.RemoveNode("old_step")

// Update dependencies
workflow.UpdateDependencies("node", []string{"new_dep1", "new_dep2"})

Workflow State

// Get current state
state := workflow.GetState()
fmt.Printf("Status: %s\n", state.Status)
fmt.Printf("Progress: %.1f%%\n", state.Progress * 100)

// Get node results
for nodeID, result := range state.NodeResults {
    fmt.Printf("%s: %v\n", nodeID, result)
}

Error Handling

workflow.OnNodeError(func(nodeID string, err error) {
    log.Printf("Node %s failed: %v", nodeID, err)
    
    // Notify admin
    notifyAdmin(nodeID, err)
})

workflow.OnComplete(func(result interface{}) {
    log.Printf("Workflow completed: %v", result)
})

Timeout

// Global timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

result, err := workflow.Execute(ctx, input)

// Per-node timeout
workflow.AddNode("slow_node", slowAgent, nil,
    sdk.WithTimeout(30 * time.Second),
)

Tool Best Practices

Parameter Validation

// Declare the constraints for each argument in the tool's JSON Schema.
registry.RegisterTool(&sdk.Tool{
    Name: "send_email",
    Parameters: map[string]interface{}{
        "type": "object",
        "properties": map[string]interface{}{
            "to": map[string]interface{}{
                "type":   "string",
                "format": "email",
            },
            // minLength/maxLength are integer keywords in JSON Schema, so the
            // property map must be map[string]interface{} holding numbers; a
            // map[string]string would emit them as the strings "1" and "200".
            "subject": map[string]interface{}{
                "type":      "string",
                "minLength": 1,
                "maxLength": 200,
            },
        },
        "required": []string{"to", "subject"},
    },
    Handler: sendEmailHandler,
})

Error Handling

// Handler validates its input and wraps downstream failures with context.
Handler: func(ctx context.Context, args map[string]interface{}) (interface{}, error) {
    // The two-value assertion reports a missing or non-string "location"
    // through ok instead of panicking.
    location, ok := args["location"].(string)
    if !ok {
        return nil, fmt.Errorf("location must be a string")
    }
    
    result, err := externalAPI.Call(location)
    if err != nil {
        // %w keeps the original error in the chain for errors.Is / errors.As.
        return nil, fmt.Errorf("api call failed: %w", err)
    }
    
    return result, nil
}

Timeouts

// Handler bounds the slow operation with its own 10-second deadline.
Handler: func(ctx context.Context, args map[string]interface{}) (interface{}, error) {
    // Derive the deadline from the caller's ctx so an earlier cancellation
    // or deadline on the parent still applies.
    ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
    // Always call cancel to release the timer associated with the context.
    defer cancel()
    
    return slowOperation(ctx, args)
}

Workflow Best Practices

Avoid Cycles

// BAD - Creates a cycle
workflow.AddNode("A", agentA, []string{"B"})
workflow.AddNode("B", agentB, []string{"A"})  // Cycle!

// The workflow engine detects the cycle and returns an error.

Parallel When Possible

// GOOD - Parallel execution
workflow.AddNode("fetch1", agent1, nil)
workflow.AddNode("fetch2", agent2, nil)
workflow.AddNode("fetch3", agent3, nil)
workflow.AddNode("combine", combineAgent, []string{"fetch1", "fetch2", "fetch3"})

// BAD - Unnecessary sequential
workflow.AddNode("fetch1", agent1, nil)
workflow.AddNode("fetch2", agent2, []string{"fetch1"})  // No need to depend
workflow.AddNode("fetch3", agent3, []string{"fetch2"})  // on previous

Handle Failures

workflow.AddNode("critical", agent, deps,
    sdk.WithRetry(sdk.RetryConfig{
        MaxRetries: 3,
    }),
    sdk.WithFallback(fallbackAgent),
)

Next Steps

How is this guide?

Last updated on