Graph API
Programmatic access to the workflow DAG for analysis, visualization, and test coverage.
Quick start
from agentloom import WorkflowGraph
from agentloom.core.parser import WorkflowParser
workflow = WorkflowParser.from_yaml("examples/03_router_workflow.yaml")
graph = WorkflowGraph.from_workflow(workflow)
You can also build a graph from a bare DAG object (without a full workflow definition):
from agentloom.core.dag import DAG
from agentloom.core.graph import WorkflowGraph
dag = DAG()
dag.add_node("a")
dag.add_node("b")
dag.add_edge("a", "b")
graph = WorkflowGraph.from_dag(dag)
Data models
GraphNode
Frozen Pydantic model representing a step in the graph:
| Field | Type | Description |
|---|---|---|
| `id` | `str` | Step identifier |
| `type` | `str` | Step type (`llm_call`, `tool`, `router`, `subworkflow`) |
| `depends_on` | `list[str]` | IDs of predecessor steps |
| `label` | `str` | Display label |
GraphEdge
Frozen Pydantic model representing a dependency edge:
| Field | Type | Description |
|---|---|---|
| `source` | `str` | Source step ID |
| `target` | `str` | Target step ID |
| `label` | `str` | Edge label (e.g., router condition) |
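To illustrate what "frozen" means for these models, here is a minimal sketch that uses stdlib frozen dataclasses as a stand-in for Pydantic, so it runs without extra dependencies. The class names `NodeSketch` and `EdgeSketch` are hypothetical, and the default values are assumptions; only the field names and types come from the tables above.

```python
from dataclasses import dataclass, field, FrozenInstanceError


@dataclass(frozen=True)
class NodeSketch:
    """Stand-in for GraphNode (hypothetical; not the agentloom class)."""
    id: str
    type: str
    depends_on: list[str] = field(default_factory=list)  # default is an assumption
    label: str = ""                                       # default is an assumption


@dataclass(frozen=True)
class EdgeSketch:
    """Stand-in for GraphEdge (hypothetical; not the agentloom class)."""
    source: str
    target: str
    label: str = ""  # default is an assumption


node = NodeSketch(id="b", type="tool", depends_on=["a"], label="b")
edge = EdgeSketch(source="a", target="b")

# A frozen object rejects attribute assignment after construction.
try:
    node.label = "renamed"
    mutated = True
except FrozenInstanceError:
    mutated = False
```

In this stand-in, freezing only blocks attribute reassignment; the list held in `depends_on` could still be modified in place, so treat graph objects as read-only.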
Properties
| Property | Type | Description |
|---|---|---|
| `graph.nodes` | `list[GraphNode]` | Nodes in topological order |
| `graph.edges` | `list[GraphEdge]` | Edges sorted by (source, target) |
| `graph.roots` | `list[str]` | Entry-point step IDs (no predecessors) |
| `graph.leaves` | `list[str]` | Terminal step IDs (no successors) |
| `graph.layers` | `list[list[str]]` | Steps grouped into parallel-execution layers |
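The roots, leaves, and layers described above can be derived from a plain edge list. The sketch below is one way to do it (Kahn-style layering) and is not taken from agentloom's source; the function name `analyze_sketch` and the diamond-shaped example graph are assumptions made for illustration.

```python
from collections import defaultdict


def analyze_sketch(nodes, edges):
    """Return (roots, leaves, layers) for a DAG given node IDs and (source, target) pairs."""
    preds = defaultdict(set)
    succs = defaultdict(set)
    for src, dst in edges:
        preds[dst].add(src)
        succs[src].add(dst)

    roots = sorted(n for n in nodes if not preds[n])
    leaves = sorted(n for n in nodes if not succs[n])

    # A node joins a layer once every one of its predecessors has been placed.
    remaining = {n: len(preds[n]) for n in nodes}
    layers = []
    current = roots
    while current:
        layers.append(current)
        next_layer = []
        for n in current:
            for m in succs[n]:
                remaining[m] -= 1
                if remaining[m] == 0:
                    next_layer.append(m)
        current = sorted(next_layer)
    return roots, leaves, layers


# Hypothetical diamond: a fans out to b and c, which both feed d.
nodes = ["a", "b", "c", "d"]
edges = [("a", "b"), ("a", "c"), ("b", "d"), ("c", "d")]
roots, leaves, layers = analyze_sketch(nodes, edges)
print(layers)
```

Steps that share a layer have no dependency on one another, which is why `b` and `c` end up together here and could run in parallel.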
Analysis methods
critical_path()
Returns the longest path through the graph, measured in hops. This path is the latency bottleneck when steps take comparable time.
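A longest path by hop count can be found with a single pass over the nodes in topological order. The following is a minimal sketch of that idea, not agentloom's implementation; the name `critical_path_sketch` is hypothetical, and the example graph reuses the step names that appear in the `all_paths()` output on this page.

```python
from collections import defaultdict


def critical_path_sketch(nodes, edges):
    """Longest path by hop count in a DAG; `nodes` must be in topological order."""
    if not nodes:
        return []

    preds = defaultdict(list)
    for src, dst in edges:
        preds[dst].append(src)

    best_len = {}   # node -> hops on the longest path ending at that node
    best_prev = {}  # node -> predecessor on that path (None for a root)
    for n in nodes:
        best_len[n], best_prev[n] = 0, None
        for p in preds[n]:
            if best_len[p] + 1 > best_len[n]:
                best_len[n], best_prev[n] = best_len[p] + 1, p

    # Walk backwards from the node with the longest incoming path.
    end = max(nodes, key=lambda n: best_len[n])
    path = []
    while end is not None:
        path.append(end)
        end = best_prev[end]
    return path[::-1]


nodes = ["fetch_data", "analyze", "summarize", "report"]
edges = [
    ("fetch_data", "analyze"),
    ("analyze", "report"),
    ("analyze", "summarize"),
    ("summarize", "report"),
]
longest = critical_path_sketch(nodes, edges)
print(" -> ".join(longest))
```

Because the sketch counts hops rather than measured durations, a short path containing one very slow step could still dominate real latency.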
all_paths()
Returns every simple path from a root to a leaf:
for path in graph.all_paths():
    print(" -> ".join(path))
# fetch_data -> analyze -> report
# fetch_data -> analyze -> summarize -> report
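For intuition, root-to-leaf enumeration can be sketched as a depth-first walk over a successor map. This is an illustrative stand-in and not agentloom's code; `all_paths_sketch` is a hypothetical name, and the edge list is inferred from the two paths printed above.

```python
from collections import defaultdict


def all_paths_sketch(nodes, edges):
    """Enumerate every root-to-leaf path in a DAG via depth-first search."""
    succs = defaultdict(list)
    has_pred = set()
    for src, dst in edges:
        succs[src].append(dst)
        has_pred.add(dst)

    paths = []

    def walk(node, prefix):
        prefix = prefix + [node]
        nexts = sorted(succs.get(node, []))
        if not nexts:  # no successors: the path ends at a leaf
            paths.append(prefix)
        for nxt in nexts:
            walk(nxt, prefix)

    # Start one walk from each root (a node that is never an edge target).
    for root in (n for n in nodes if n not in has_pred):
        walk(root, [])
    return paths


nodes = ["fetch_data", "analyze", "summarize", "report"]
edges = [
    ("fetch_data", "analyze"),
    ("analyze", "report"),
    ("analyze", "summarize"),
    ("summarize", "report"),
]
paths = all_paths_sketch(nodes, edges)
for path in paths:
    print(" -> ".join(path))
```

The number of root-to-leaf paths can grow exponentially with the number of branching steps, so full enumeration is best kept to small and medium workflows.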
prime_paths()
Returns the maximal simple paths, intended for test coverage. A path is maximal when it cannot be extended without repeating a node.
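The definition of maximality can be checked by brute force: enumerate every simple path, then keep those that are not a contiguous subpath of any other. The sketch below does this for a small graph; it is an illustration under that definition, not agentloom's algorithm, and all function names in it are hypothetical.

```python
def simple_paths(succs, nodes):
    """Yield every simple path (as a tuple of one or more nodes)."""
    stack = [(n,) for n in nodes]
    while stack:
        path = stack.pop()
        yield path
        for nxt in succs.get(path[-1], ()):
            if nxt not in path:  # never revisit a node on the same path
                stack.append(path + (nxt,))


def is_subpath(short, long):
    """True if `short` occurs as a contiguous run inside `long`."""
    k = len(short)
    return any(long[i:i + k] == short for i in range(len(long) - k + 1))


def prime_paths_sketch(nodes, edges):
    """Keep the simple paths that are not contained in any other simple path."""
    succs = {}
    for src, dst in edges:
        succs.setdefault(src, []).append(dst)
    paths = list(simple_paths(succs, nodes))
    prime = [
        p for p in paths
        if not any(p != q and is_subpath(p, q) for q in paths)
    ]
    return sorted(prime)


nodes = ["fetch_data", "analyze", "summarize", "report"]
edges = [
    ("fetch_data", "analyze"),
    ("analyze", "report"),
    ("analyze", "summarize"),
    ("summarize", "report"),
]
prime = prime_paths_sketch(nodes, edges)
for path in prime:
    print(" -> ".join(path))
```

In an acyclic graph like this one, a path can always be extended until it starts at a root and ends at a leaf, so the maximal paths found by the sketch are exactly the root-to-leaf paths.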
get_step_definition(node_id)
Retrieves the original step config for a node.