Workflows
DAG-based workflow engine with parallel execution and conditional branching
The workflow engine orchestrates multi-step AI pipelines as directed acyclic graphs (DAGs). Nodes can be agents, tools, conditions, transforms, or wait steps. Independent nodes execute in parallel automatically.
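Conceptually, the scheduler repeatedly finds the nodes whose dependencies have all completed and runs that whole batch concurrently. The standalone sketch below illustrates the idea in plain Go — it is not the SDK's actual scheduler, and `dag`, `readyNodes`, and `run` are illustrative names:

```go
package main

import (
	"fmt"
	"sync"
)

// dag maps each node name to the names of the nodes it depends on.
// The graph is assumed to be acyclic, as in the workflow engine.
type dag map[string][]string

// readyNodes returns every unfinished node whose dependencies are all done.
func readyNodes(d dag, done map[string]bool) []string {
	var ready []string
	for n, deps := range d {
		if done[n] {
			continue
		}
		ok := true
		for _, dep := range deps {
			if !done[dep] {
				ok = false
			}
		}
		if ok {
			ready = append(ready, n)
		}
	}
	return ready
}

// run executes the DAG level by level; nodes in the same level have no
// edges between them, so they run concurrently. It returns the size of
// each level so the batching is visible.
func run(d dag, exec func(string)) []int {
	done := map[string]bool{}
	var levelSizes []int
	for len(done) < len(d) {
		level := readyNodes(d, done)
		levelSizes = append(levelSizes, len(level))
		var wg sync.WaitGroup
		for _, n := range level {
			wg.Add(1)
			go func(n string) { defer wg.Done(); exec(n) }(n)
		}
		wg.Wait()
		for _, n := range level {
			done[n] = true
		}
	}
	return levelSizes
}

func main() {
	// "research" and "fetch" are independent, so they share a level.
	d := dag{
		"research": nil,
		"fetch":    nil,
		"write":    {"research", "fetch"},
	}
	sizes := run(d, func(n string) { fmt.Println("ran", n) })
	fmt.Println(sizes) // [2 1]: two independent nodes, then the dependent one
}
```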
Creating a Workflow
import sdk "github.com/xraph/ai-sdk"

workflow := sdk.NewWorkflow("wf-1", "content-pipeline", logger, metrics)

Node Types
| Type | Constant | Description |
|---|---|---|
| Agent | NodeTypeAgent | Runs an agent with the given input |
| Tool | NodeTypeTool | Executes a tool with parameters |
| Condition | NodeTypeCondition | Branches based on a condition |
| Transform | NodeTypeTransform | Transforms data between steps |
| Parallel | NodeTypeParallel | Runs child nodes in parallel |
| Sequence | NodeTypeSequence | Runs child nodes in order |
| Wait | NodeTypeWait | Waits for a duration or event |
Adding Nodes and Edges
// Add an agent node
workflow.AddNode(&sdk.WorkflowNode{
	ID:      "research",
	Type:    sdk.NodeTypeAgent,
	Name:    "Research",
	AgentID: "researcher",
	Timeout: 2 * time.Minute,
})
// Add a tool node
workflow.AddNode(&sdk.WorkflowNode{
	ID:       "format",
	Type:     sdk.NodeTypeTool,
	Name:     "Format Output",
	ToolName: "markdown_formatter",
	ToolParams: map[string]any{
		"style": "article",
	},
})
// Add a condition node
workflow.AddNode(&sdk.WorkflowNode{
	ID:   "quality-check",
	Type: sdk.NodeTypeCondition,
	Name: "Quality Check",
	ConditionHandler: func(ctx context.Context, data map[string]any) (bool, error) {
		score, ok := data["quality_score"].(float64)
		return ok && score > 0.8, nil
	},
})

// Define execution order
workflow.AddEdge("research", "format")
workflow.AddEdge("format", "quality-check")

Execution
execution, err := workflow.Execute(ctx, map[string]any{
	"topic": "AI in healthcare",
})
if err != nil {
	return err
}

fmt.Printf("Status: %s\n", execution.Status)
fmt.Printf("Duration: %s\n", execution.Duration)

// Access results from each node
for id, node := range execution.NodeResults {
	fmt.Printf("Node %s: %v\n", id, node.Result)
}

Workflow Builder
For a more fluent API:
workflow, err := sdk.NewWorkflowBuilder().
	WithName("content-pipeline").
	AddAgentNode("research", "Research", researchAgent).
	AddAgentNode("write", "Write Article", writerAgent).
	AddToolNode("format", "Format", formatterTool).
	AddEdge("research", "write").
	AddEdge("write", "format").
	Build()

Parallel Execution
Nodes without dependencies execute in parallel automatically. You can also explicitly create parallel groups:
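In plain Go, this fan-out/fan-in shape is a WaitGroup around the independent fetches plus a merge step that starts only once both have finished. The sketch below shows the pattern on its own — `fanIn` is an illustrative name, not SDK API:

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn runs every fetch concurrently and collects the results under a
// mutex — the same shape as two nodes feeding one "merge" edge.
func fanIn(fetches map[string]func() int) map[string]int {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		results = map[string]int{}
	)
	for name, fetch := range fetches {
		wg.Add(1)
		go func(name string, fetch func() int) {
			defer wg.Done()
			v := fetch() // runs concurrently with the other fetches
			mu.Lock()
			results[name] = v
			mu.Unlock()
		}(name, fetch)
	}
	wg.Wait() // the merge step proceeds only after every input is ready
	return results
}

func main() {
	out := fanIn(map[string]func() int{
		"fetch-data":  func() int { return 42 },
		"fetch-users": func() int { return 7 },
	})
	fmt.Println(out["fetch-data"] + out["fetch-users"]) // 49
}
```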
// These two nodes have no edges between them, so they run in parallel
workflow.AddNode(&sdk.WorkflowNode{ID: "fetch-data", Type: sdk.NodeTypeTool, ToolName: "fetch_sales"})
workflow.AddNode(&sdk.WorkflowNode{ID: "fetch-users", Type: sdk.NodeTypeTool, ToolName: "fetch_users"})
// Both feed into a merge step
workflow.AddEdge("fetch-data", "merge")
workflow.AddEdge("fetch-users", "merge")

Conditional Branching
workflow.AddNode(&sdk.WorkflowNode{
	ID:   "check-length",
	Type: sdk.NodeTypeCondition,
	ConditionHandler: func(ctx context.Context, data map[string]any) (bool, error) {
		// Use the comma-ok form so a missing or non-string value
		// routes to the false branch instead of panicking.
		text, ok := data["text"].(string)
		return ok && len(text) > 1000, nil
	},
})

// True branch
workflow.AddEdge("check-length", "summarize")
// False branch can be handled via the workflow's condition routing

Transform Nodes
Transform data between steps:
workflow.AddNode(&sdk.WorkflowNode{
	ID:   "extract-keywords",
	Type: sdk.NodeTypeTransform,
	TransformHandler: func(ctx context.Context, data map[string]any) (any, error) {
		// Check the assertion so bad input fails the node cleanly
		// instead of panicking the workflow.
		text, ok := data["text"].(string)
		if !ok {
			return nil, errors.New(`expected a string under "text"`)
		}
		return extractKeywords(text), nil
	},
})

Retry Configuration
workflow.AddNode(&sdk.WorkflowNode{
	ID:       "api-call",
	Type:     sdk.NodeTypeTool,
	ToolName: "external_api",
	Retry: &sdk.RetryConfig{
		MaxRetries: 3,
		Delay:      2 * time.Second,
	},
})

WorkflowExecution Result
| Field | Type | Description |
|---|---|---|
| ID | string | Execution ID |
| Status | string | completed, failed, or running |
| Duration | time.Duration | Total execution time |
| NodeResults | map[string]*WorkflowNode | Results per node |
| Error | error | First error encountered |