Skip to content

Graph orchestration

When you need explicit, deterministic, durable control flow — cycles, fan-out/fan-in, retries, and human-in-the-loop — use StateGraph. It executes in Bulk-Synchronous-Parallel supersteps (planned by the Rust core), checkpoints state at every step, and resumes by thread_id after a crash or an interrupt.

Nodes, edges, state

A node is a function (state) -> updates (or (state, ctx) -> updates). Edges wire nodes together; START and END are sentinels.

from yaab.graph import StateGraph, START, END

g = StateGraph()
g.add_node("fetch", lambda s: {"docs": fetch(s["query"])})
g.add_node("answer", lambda s: {"answer": summarize(s["docs"])})
g.add_edge(START, "fetch")
g.add_edge("fetch", "answer")
g.set_finish_point("answer")

result = g.compile().invoke({"query": "yaab"})
print(result.state["answer"])

Channels & reducers

A Channel declares how writes to a state key are merged:

from yaab.graph import StateGraph, Channel

g = StateGraph(channels={
    "count": Channel("add", default=0),       # numeric sum
    "logs":  Channel("append", default=[]),   # accumulate into a list
    "answer": Channel("last_value"),          # overwrite (default)
})

Reducers run in Rust (last_value, append, add).

Conditional edges & cycles

Route dynamically and loop:

g.add_conditional_edges(
    "inc",
    lambda s: "inc" if s["count"] < 3 else END,
    {"inc": "inc", END: END},
)

Durable execution & checkpoints

Pass a checkpointer; state is persisted at every superstep, enabling crash recovery and time-travel:

from yaab.graph import MemorySaver, SQLiteSaver

app = g.compile(checkpointer=SQLiteSaver("checkpoints.db"))
app.invoke({"query": "x"}, thread_id="job-42")

# Inspect the full history (time-travel debugging):
for step, snapshot in app.checkpointer.history("job-42"):
    print(step, snapshot["state"])

Human-in-the-loop

A node calls ctx.interrupt(value) to pause. The runtime checkpoints, returns interrupted=True with the value; you resume the same thread with the human's decision and the call returns it.

def approve(state, ctx):
    decision = ctx.interrupt({"review": state["draft"]})   # pauses here
    return {"approved": decision}

app = g.compile(checkpointer=MemorySaver())
paused = app.invoke({...}, thread_id="t1")
assert paused.interrupted
done = app.invoke(thread_id="t1", resume=True)    # human approved

Choosing the engine (Python vs Rust)

A compiled graph advances each superstep's state with one of two engines — your choice:

app = g.compile(engine="auto")     # rust if yaab-core is built, else python (default)
app = g.compile(engine="rust")     # force the native whole-superstep fold
app = g.compile(engine="python")   # force the pure-Python engine
print(app.engine)                   # "rust" | "python"

Both produce identical results. The Rust engine folds an entire superstep's node updates in a single native call (one cross-language hop per superstep instead of one per state key), which helps wide fan-outs and large state; the Python engine has zero native dependency. engine="rust" raises if the yaab-core extension isn't built — use "auto" to degrade gracefully.

What is not in Rust: the graph's control flow — routing, conditional edges, HITL interrupts, checkpoint orchestration, and your node functions — all run in Python regardless of engine. Rust only does the deterministic state fold.

Mixing with agents

Nodes are plain functions, so call agents inside them:

async def research(state, ctx):
    result = await researcher.run(state["question"])
    return {"findings": result.output}

g.add_node("research", research)   # async nodes are awaited automatically

This is the deterministic counterpart to the model-driven fast path and the multi-agent workflows — reach for it when an auditor or SLA needs inspectable, resumable control flow.