AI SDK
Tools & Workflows
Dynamic tools and DAG workflow orchestration
Tools & Workflows
Dynamic tool system and DAG-based workflow engine for multi-agent orchestration.
Tools
Register a Tool
registry := sdk.NewToolRegistry(logger, metrics)
registry.RegisterTool(&sdk.Tool{
Name: "get_weather",
Description: "Get current weather for a location",
Version: "1.0.0",
Parameters: map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"location": map[string]interface{}{
"type": "string",
"description": "City name",
},
"units": map[string]interface{}{
"type": "string",
"enum": []string{"celsius", "fahrenheit"},
},
},
"required": []string{"location"},
},
Handler: func(ctx context.Context, args map[string]interface{}) (interface{}, error) {
location := args["location"].(string)
units := args["units"].(string)
weather, err := weatherAPI.Get(location, units)
return weather, err
},
})Execute Tool
result, err := registry.ExecuteTool(ctx, "get_weather", map[string]interface{}{
"location": "San Francisco",
"units": "celsius",
})Auto-Discovery
// Automatically discover tools from a directory
registry.DiscoverTools("./tools")
// List all tools
tools := registry.ListTools()
for _, tool := range tools {
fmt.Printf("%s v%s: %s\n", tool.Name, tool.Version, tool.Description)
}Search Tools
// Semantic search for tools
tools, _ := registry.SearchTools(ctx, "weather and temperature")
// Category-based search
tools, _ := registry.GetToolsByCategory(ctx, "api")
// Tag-based search
tools, _ := registry.GetToolsByTags(ctx, []string{"external", "api"})Versioning
// Register multiple versions
registry.RegisterTool(&sdk.Tool{
Name: "calculator",
Version: "1.0.0",
Handler: calculatorV1,
})
registry.RegisterTool(&sdk.Tool{
Name: "calculator",
Version: "2.0.0",
Handler: calculatorV2,
})
// Execute specific version
result, _ := registry.ExecuteTool(ctx, "calculator@2.0.0", args)
// Execute latest
result, _ := registry.ExecuteTool(ctx, "calculator", args)Workflows
Create a Workflow
workflow := sdk.NewWorkflowEngine(logger, metrics)
// Add nodes
workflow.AddNode("research", researchAgent, nil)
workflow.AddNode("write", writeAgent, []string{"research"})
workflow.AddNode("review", reviewAgent, []string{"write"})
workflow.AddNode("publish", publishAgent, []string{"review"})Execute Workflow
result, err := workflow.Execute(ctx, map[string]interface{}{
"topic": "Go concurrency patterns",
})
fmt.Printf("Final result: %v\n", result)Parallel Execution
workflow := sdk.NewWorkflowEngine(logger, metrics)
// These can run in parallel
workflow.AddNode("fetch_data", fetchAgent, nil)
workflow.AddNode("fetch_images", imageAgent, nil)
workflow.AddNode("fetch_videos", videoAgent, nil)
// This depends on all three
workflow.AddNode("combine", combineAgent, []string{
"fetch_data", "fetch_images", "fetch_videos",
})Conditional Nodes
workflow.AddNode("analyze", analyzeAgent, nil,
sdk.WithCondition(func(ctx context.Context, result interface{}) bool {
// Only run if analysis is needed
return result.(map[string]interface{})["needs_analysis"].(bool)
}),
)Node Retry
workflow.AddNode("api_call", apiAgent, nil,
sdk.WithRetry(sdk.RetryConfig{
MaxRetries: 3,
InitialDelay: 1 * time.Second,
}),
)Real-World Examples
Content Creation Pipeline
func createContentPipeline() *sdk.WorkflowEngine {
workflow := sdk.NewWorkflowEngine(logger, metrics)
// Research phase
workflow.AddNode("research", createResearchAgent(), nil)
// Writing phase (depends on research)
workflow.AddNode("outline", createOutlineAgent(), []string{"research"})
workflow.AddNode("draft", createDraftAgent(), []string{"outline"})
// Review phase (depends on draft)
workflow.AddNode("grammar", createGrammarAgent(), []string{"draft"})
workflow.AddNode("fact_check", createFactCheckAgent(), []string{"draft"})
// Final phase (depends on all reviews)
workflow.AddNode("finalize", createFinalizeAgent(), []string{
"grammar", "fact_check",
})
return workflow
}
// Execute
workflow := createContentPipeline()
result, _ := workflow.Execute(ctx, map[string]interface{}{
"topic": "AI in Healthcare",
"length": "2000 words",
})Data Processing Pipeline
func createDataPipeline() *sdk.WorkflowEngine {
workflow := sdk.NewWorkflowEngine(logger, metrics)
// Extract phase
workflow.AddNode("extract_csv", extractCSVAgent, nil)
workflow.AddNode("extract_json", extractJSONAgent, nil)
workflow.AddNode("extract_xml", extractXMLAgent, nil)
// Transform phase
workflow.AddNode("transform", transformAgent, []string{
"extract_csv", "extract_json", "extract_xml",
})
// Load phase
workflow.AddNode("validate", validateAgent, []string{"transform"})
workflow.AddNode("load", loadAgent, []string{"validate"})
return workflow
}Customer Support Workflow
func createSupportWorkflow() *sdk.WorkflowEngine {
workflow := sdk.NewWorkflowEngine(logger, metrics)
// Triage
workflow.AddNode("triage", triageAgent, nil)
// Route to specialists
workflow.AddNode("technical", technicalAgent, []string{"triage"},
sdk.WithCondition(func(ctx context.Context, result interface{}) bool {
return result.(map[string]interface{})["category"] == "technical"
}),
)
workflow.AddNode("billing", billingAgent, []string{"triage"},
sdk.WithCondition(func(ctx context.Context, result interface{}) bool {
return result.(map[string]interface{})["category"] == "billing"
}),
)
// Follow-up
workflow.AddNode("followup", followupAgent, []string{"technical", "billing"})
return workflow
}Advanced Features
Dynamic Workflow Modification
// Add node dynamically
workflow.AddNode("new_step", newAgent, []string{"existing_step"})
// Remove node
workflow.RemoveNode("old_step")
// Update dependencies
workflow.UpdateDependencies("node", []string{"new_dep1", "new_dep2"})Workflow State
// Get current state
state := workflow.GetState()
fmt.Printf("Status: %s\n", state.Status)
fmt.Printf("Progress: %.1f%%\n", state.Progress * 100)
// Get node results
for nodeID, result := range state.NodeResults {
fmt.Printf("%s: %v\n", nodeID, result)
}Error Handling
workflow.OnNodeError(func(nodeID string, err error) {
log.Printf("Node %s failed: %v", nodeID, err)
// Notify admin
notifyAdmin(nodeID, err)
})
workflow.OnComplete(func(result interface{}) {
log.Printf("Workflow completed: %v", result)
})Timeout
// Global timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
result, err := workflow.Execute(ctx, input)
// Per-node timeout
workflow.AddNode("slow_node", slowAgent, nil,
sdk.WithTimeout(30 * time.Second),
)Tool Best Practices
Parameter Validation
registry.RegisterTool(&sdk.Tool{
Name: "send_email",
Parameters: map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"to": map[string]interface{}{
"type": "string",
"format": "email",
},
"subject": map[string]string{
"type": "string",
"minLength": "1",
"maxLength": "200",
},
},
"required": []string{"to", "subject"},
},
Handler: sendEmailHandler,
})Error Handling
Handler: func(ctx context.Context, args map[string]interface{}) (interface{}, error) {
location, ok := args["location"].(string)
if !ok {
return nil, fmt.Errorf("location must be a string")
}
result, err := externalAPI.Call(location)
if err != nil {
return nil, fmt.Errorf("api call failed: %w", err)
}
return result, nil
}Timeouts
Handler: func(ctx context.Context, args map[string]interface{}) (interface{}, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
return slowOperation(ctx, args)
}Workflow Best Practices
Avoid Cycles
// BAD - Creates a cycle
workflow.AddNode("A", agentA, []string{"B"})
workflow.AddNode("B", agentB, []string{"A"}) // Cycle!
// Workflow engine will detect and return errorParallel When Possible
// GOOD - Parallel execution
workflow.AddNode("fetch1", agent1, nil)
workflow.AddNode("fetch2", agent2, nil)
workflow.AddNode("fetch3", agent3, nil)
workflow.AddNode("combine", combineAgent, []string{"fetch1", "fetch2", "fetch3"})
// BAD - Unnecessary sequential
workflow.AddNode("fetch1", agent1, nil)
workflow.AddNode("fetch2", agent2, []string{"fetch1"}) // No need to depend
workflow.AddNode("fetch3", agent3, []string{"fetch2"}) // on previousHandle Failures
workflow.AddNode("critical", agent, deps,
sdk.WithRetry(sdk.RetryConfig{
MaxRetries: 3,
}),
sdk.WithFallback(fallbackAgent),
)Next Steps
- Agents - Use tools with agents
- Resilience - Add resilience patterns
- Examples - Workflow examples
How is this guide?
Last updated on