Workflows

DAG-based workflow engine with parallel execution and conditional branching

The workflow engine orchestrates multi-step AI pipelines as directed acyclic graphs (DAGs). Nodes can be agents, tools, conditions, transforms, or wait steps. Independent nodes execute in parallel automatically.

Creating a Workflow

import sdk "github.com/xraph/ai-sdk"

workflow := sdk.NewWorkflow("wf-1", "content-pipeline", logger, metrics)

Node Types

TypeConstantDescription
AgentNodeTypeAgentRuns an agent with the given input
ToolNodeTypeToolExecutes a tool with parameters
ConditionNodeTypeConditionBranches based on a condition
TransformNodeTypeTransformTransforms data between steps
ParallelNodeTypeParallelRuns child nodes in parallel
SequenceNodeTypeSequenceRuns child nodes in order
WaitNodeTypeWaitWaits for a duration or event

Adding Nodes and Edges

// Add an agent node
workflow.AddNode(&sdk.WorkflowNode{
    ID:      "research",
    Type:    sdk.NodeTypeAgent,
    Name:    "Research",
    AgentID: "researcher",
    Timeout: 2 * time.Minute,
})

// Add a tool node
workflow.AddNode(&sdk.WorkflowNode{
    ID:       "format",
    Type:     sdk.NodeTypeTool,
    Name:     "Format Output",
    ToolName: "markdown_formatter",
    ToolParams: map[string]any{
        "style": "article",
    },
})

// Add a condition node
workflow.AddNode(&sdk.WorkflowNode{
    ID:   "quality-check",
    Type: sdk.NodeTypeCondition,
    Name: "Quality Check",
    ConditionHandler: func(ctx context.Context, data map[string]any) (bool, error) {
        score, ok := data["quality_score"].(float64)
        return ok && score > 0.8, nil
    },
})

// Define execution order
workflow.AddEdge("research", "format")
workflow.AddEdge("format", "quality-check")

Execution

execution, err := workflow.Execute(ctx, map[string]any{
    "topic": "AI in healthcare",
})
if err != nil {
    return err
}

fmt.Printf("Status: %s\n", execution.Status)
fmt.Printf("Duration: %s\n", execution.Duration)

// Access results from each node
for id, node := range execution.NodeResults {
    fmt.Printf("Node %s: %v\n", id, node.Result)
}

Workflow Builder

For a more fluent API:

workflow, err := sdk.NewWorkflowBuilder().
    WithName("content-pipeline").
    AddAgentNode("research", "Research", researchAgent).
    AddAgentNode("write", "Write Article", writerAgent).
    AddToolNode("format", "Format", formatterTool).
    AddEdge("research", "write").
    AddEdge("write", "format").
    Build()

Parallel Execution

Nodes without dependencies execute in parallel automatically. You can also explicitly create parallel groups:

// These two nodes have no edges between them, so they run in parallel
workflow.AddNode(&sdk.WorkflowNode{ID: "fetch-data", Type: sdk.NodeTypeTool, ToolName: "fetch_sales"})
workflow.AddNode(&sdk.WorkflowNode{ID: "fetch-users", Type: sdk.NodeTypeTool, ToolName: "fetch_users"})

// Both feed into a merge step
workflow.AddEdge("fetch-data", "merge")
workflow.AddEdge("fetch-users", "merge")

Conditional Branching

workflow.AddNode(&sdk.WorkflowNode{
    ID:   "check-length",
    Type: sdk.NodeTypeCondition,
    ConditionHandler: func(ctx context.Context, data map[string]any) (bool, error) {
        text := data["text"].(string)
        return len(text) > 1000, nil
    },
})

// True branch
workflow.AddEdge("check-length", "summarize")
// False branch can be handled via the workflow's condition routing

Transform Nodes

Transform data between steps:

workflow.AddNode(&sdk.WorkflowNode{
    ID:   "extract-keywords",
    Type: sdk.NodeTypeTransform,
    TransformHandler: func(ctx context.Context, data map[string]any) (any, error) {
        text := data["text"].(string)
        return extractKeywords(text), nil
    },
})

Retry Configuration

workflow.AddNode(&sdk.WorkflowNode{
    ID:       "api-call",
    Type:     sdk.NodeTypeTool,
    ToolName: "external_api",
    Retry: &sdk.RetryConfig{
        MaxRetries: 3,
        Delay:      2 * time.Second,
    },
})

WorkflowExecution Result

FieldTypeDescription
IDstringExecution ID
Statusstringcompleted, failed, running
Durationtime.DurationTotal execution time
NodeResultsmap[string]*WorkflowNodeResults per node
ErrorerrorFirst error encountered

How is this guide?

On this page