yaab.agent¶
The Agent class: the typed unit of work.
The typed Agent class is YAAB's primary developer-facing abstraction.
Agent[Deps, Output] is generic over a dependency-injection type and an
output type, combining type safety with a clean agent/runner split.
The three-line "hello agent" works with zero ceremony; every layer underneath
(runner, sessions, governance, graph) can be opened up when you need it::

    agent = Agent("assistant", model="openai/gpt-4o", instructions="Be helpful.")
    result = agent.run_sync("Hello!")
    print(result.output)

Agent ¶
Bases: Generic[Deps, Output]
A type-safe agent: a model + instructions + tools + an output contract.
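To make the Generic[Deps, Output] parameterization concrete, the following is a minimal, self-contained sketch of a class that is generic over a dependency type and an output type. MiniAgent, AppDeps, Weather, and fake_forecast are hypothetical names invented for illustration; none of this is YAAB's implementation, and the real Agent constructor and run methods take the arguments documented on this page.

```python
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

Deps = TypeVar("Deps")
Output = TypeVar("Output")


@dataclass
class AppDeps:
    """Hypothetical dependency bundle injected into each run."""
    default_city: str


@dataclass
class Weather:
    """Hypothetical typed output contract."""
    city: str
    summary: str


class MiniAgent(Generic[Deps, Output]):
    """Toy stand-in: pairs a name with a handler typed over Deps and Output."""

    def __init__(self, name: str, handler: Callable[[str, Deps], Output]) -> None:
        self.name = name
        self._handler = handler

    def run_sync(self, prompt: str, deps: Deps) -> Output:
        # A real agent would call a model here; the sketch calls a plain function.
        return self._handler(prompt, deps)


def fake_forecast(prompt: str, deps: AppDeps) -> Weather:
    # Stands in for a model turn that produces typed output.
    return Weather(city=deps.default_city, summary=f"sunny ({prompt})")


# The annotation lets a type checker verify both the deps and the result type.
mini: MiniAgent[AppDeps, Weather] = MiniAgent("forecaster", fake_forecast)
report = mini.run_sync("today", AppDeps(default_city="Oslo"))
print(report.city, report.summary)
```

With the two type parameters fixed at the annotation, a static checker can flag a call that passes the wrong dependency object or reads a field the output type does not have.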
model property ¶
model: ModelProvider
Resolve (and cache) the model provider, wrapping it for tracing.
tool ¶
Register a tool on this agent (decorator form).
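As an illustration of what decorator-form registration generally looks like, here is a small sketch using a toy registry. ToolRegistry is a hypothetical stand-in; whether YAAB's tool decorator accepts arguments, returns the function unchanged, or stores tools by function name is not stated here, so treat those details as assumptions.

```python
from typing import Any, Callable, Dict


class ToolRegistry:
    """Toy stand-in for an agent that collects tools by name."""

    def __init__(self) -> None:
        self.tools: Dict[str, Callable[..., Any]] = {}

    def tool(self, func: Callable[..., Any]) -> Callable[..., Any]:
        # Record the function under its own name, then return it unchanged
        # so it can still be called directly.
        self.tools[func.__name__] = func
        return func


registry = ToolRegistry()


@registry.tool
def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b


print(sorted(registry.tools))
print(registry.tools["add"](2, 3))
```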
as_tool ¶
Expose this agent as a tool for another agent (Agent-as-Tool).
reset ¶
reset() -> Agent[Deps, Output]
Reset per-agent run state so the instance is clean for reuse.
Clears the cached (lazily resolved, tracing-wrapped) model provider so the
next run re-resolves it. Conversation history is not held on the Agent; it
lives in the session service. To clear a conversation, start a new
session_id or clear it via your SessionManager. Returns self for chaining.
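The cache-then-reset behavior described above can be sketched with a lazily resolved property. CachedProviderHolder and its resolve_count counter are hypothetical and exist only to make the caching visible; the placeholder object() stands in for a real provider.

```python
from typing import Optional


class CachedProviderHolder:
    """Toy stand-in: lazily resolves a provider, caches it, and can reset."""

    def __init__(self, model_name: str) -> None:
        self.model_name = model_name
        self._provider: Optional[object] = None
        self.resolve_count = 0  # counts how often resolution actually ran

    @property
    def model(self) -> object:
        # Resolve on first access only; later accesses reuse the cached object.
        if self._provider is None:
            self.resolve_count += 1
            self._provider = object()  # placeholder for a real provider
        return self._provider

    def reset(self) -> "CachedProviderHolder":
        # Drop the cache so the next access resolves again; return self to chain.
        self._provider = None
        return self


holder = CachedProviderHolder("openai/gpt-4o")
first = holder.model
same = holder.model           # cached: no second resolution
second = holder.reset().model  # chained reset forces a fresh resolution
print(holder.resolve_count)
```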
run async ¶
run(prompt: str = '', *, resume: Any | None = None, deps: Deps = None, session_id: str | None = None, identity: str | None = None, state: State | None = None, usage_limits: Any | None = None, cancellation: Any | None = None, timeout: float | None = None, resume_id: str | None = None) -> RunResult[Output]
Run the agent's model-driven loop and return a typed result.
usage_limits (yaab.limits.UsageLimits) caps tokens, requests, and tool
calls; cancellation (yaab.limits.CancellationToken) and timeout (seconds)
stop the run cooperatively between steps.
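"Cooperatively between steps" means the limits are checked at step boundaries, so a step that has already started runs to completion. The sketch below shows that pattern with plain Python. StopFlag, run_steps, max_steps, and cancel_after_step are hypothetical names, not the UsageLimits or CancellationToken API.

```python
import time
from typing import List, Optional


class StopFlag:
    """Hypothetical cancellation token: a flag checked between steps."""

    def __init__(self) -> None:
        self.cancelled = False

    def cancel(self) -> None:
        self.cancelled = True


def run_steps(total_steps: int, flag: StopFlag,
              timeout: Optional[float] = None,
              max_steps: Optional[int] = None,
              cancel_after_step: Optional[int] = None) -> List[str]:
    """Run up to total_steps, checking limits only at step boundaries."""
    started = time.monotonic()
    log: List[str] = []
    for step in range(total_steps):
        # Cooperative checks: a step that has started is never interrupted.
        if flag.cancelled:
            log.append("stopped: cancelled")
            break
        if timeout is not None and time.monotonic() - started > timeout:
            log.append("stopped: timeout")
            break
        if max_steps is not None and step >= max_steps:
            log.append("stopped: limit")
            break
        log.append(f"step {step}")
        if step == cancel_after_step:
            flag.cancel()  # simulate an external cancel arriving during the run
    return log


cancel_log = run_steps(5, StopFlag(), cancel_after_step=1)
limit_log = run_steps(5, StopFlag(), max_steps=1)
print(cancel_log)
print(limit_log)
```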
state is the run's shared yaab.state.State. Leave it None for a standalone
run (the runner builds one over the session); a workflow agent or a
delegation passes its own State so every participant shares one object:
values written in one step are read in the next by key.
resume_id makes a run fault-tolerant: when the runner has a
checkpointer, loop progress is persisted under this key after every
completed step, so a crashed or paused run resumes from where it left
off if re-invoked with the same resume_id — without re-requesting the
model turns already captured. It is inert (zero overhead) when the runner
has no checkpointer.
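The resume_id behavior can be pictured as a loop that saves progress under a key after each completed step and skips saved steps on re-entry. This is a minimal sketch under that assumption; the in-memory checkpoints dict, fake_model_turn, and run_with_checkpoints are hypothetical and do not reflect how YAAB's checkpointer stores data.

```python
from typing import Dict, List

# Hypothetical in-memory checkpointer: resume_id -> completed step results.
checkpoints: Dict[str, List[str]] = {}
model_calls: List[int] = []  # records which steps actually hit the "model"


def fake_model_turn(step: int) -> str:
    model_calls.append(step)
    return f"turn-{step}"


def run_with_checkpoints(resume_id: str, total_steps: int,
                         crash_before: int = -1) -> List[str]:
    """Persist progress after every completed step; skip steps already saved."""
    done = checkpoints.setdefault(resume_id, [])
    for step in range(len(done), total_steps):
        if step == crash_before:
            raise RuntimeError(f"simulated crash before step {step}")
        done.append(fake_model_turn(step))  # saved as soon as the step completes
    return list(done)


try:
    run_with_checkpoints("job-1", total_steps=4, crash_before=2)
except RuntimeError as exc:
    print(exc)

# Re-invoking with the same key continues at step 2 instead of starting over.
final_turns = run_with_checkpoints("job-1", total_steps=4)
print(final_turns)
print(model_calls)
```

Each step appears exactly once in model_calls, which mirrors the point that turns already captured are not requested again.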
resume is the one way to continue a paused run: pass the
yaab.governance.approvals_decide.Decision (or
yaab.governance.approvals_decide.ResumeBundle) a human produced.
It carries the approval_id and the resume_id (the checkpoint key),
so prompt and session_id are not needed on resume; correlation is
by the decision, never by session. The decided value flows back to the held
tool as a typed value (an edit's corrected args, a denial's reason, an
ask_user answer).
stream ¶
stream(prompt: Any, *, deps: Deps = None, session_id: str | None = None, identity: str | None = None) -> Any
Stream the answer token-by-token (single turn, no tool loop).
Returns an async iterator of text deltas::

    async for token in agent.stream("tell me a joke"):
        print(token, end="")

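For readers who want to try the consumption pattern without a model, here is a runnable sketch that drives an async iterator of text deltas from synchronous code. fake_stream is a hypothetical stand-in for the real stream method, and the token list is invented for the example.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_stream(prompt: str) -> AsyncIterator[str]:
    """Hypothetical stand-in for a token stream: yields text deltas in order."""
    for token in ["Why ", "did ", "the ", "agent ", "wait?"]:
        await asyncio.sleep(0)  # hand control back to the event loop, as real I/O would
        yield token


async def collect(prompt: str) -> str:
    parts: List[str] = []
    async for token in fake_stream(prompt):
        print(token, end="", flush=True)  # render each delta as it arrives
        parts.append(token)
    print()
    return "".join(parts)


joke = asyncio.run(collect("tell me a joke"))
```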
stream_structured ¶
stream_structured(prompt: Any, *, output_type: type | None = None, deps: Deps = None, identity: str | None = None) -> Any
Stream partial typed objects as the model generates JSON.
Yields successive partial instances of output_type (defaults to the
agent's output_type); the final yield is the fully validated object::

    async for partial in agent.stream_structured("...", output_type=Weather):
        render(partial)

stream_events ¶
stream_events(prompt: str, *, deps: Deps = None, session_id: str | None = None, identity: str | None = None, state: State | None = None, usage_limits: Any | None = None, cancellation: Any | None = None, timeout: float | None = None) -> Any
Stream the full multi-step run as typed events, tokens included.
Yields yaab.types.Event objects: TEXT_DELTA token deltas as
the model generates, TOOL_CALL/TOOL_RESULT as tools run mid-run,
and a terminal FINAL_OUTPUT + RUN_END (which carries the
yaab.types.RunResult). Unlike stream(), this drives the
whole tool loop, not just the answering turn::

    async for event in agent.stream_events("..."):
        if event.type is EventType.TEXT_DELTA:
            print(event.payload["delta"], end="")

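A consumer usually branches on the event type to handle tokens, tool activity, and the final output separately. The sketch below shows one way to do that against a fake event source. The EventType and Event classes here are local stand-ins defined for the example, and every payload key other than "delta" ("name", "result", "output") is an assumption, not the documented yaab.types.Event schema.

```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any, AsyncIterator, Dict, List


class EventType(Enum):
    TEXT_DELTA = auto()
    TOOL_CALL = auto()
    TOOL_RESULT = auto()
    FINAL_OUTPUT = auto()
    RUN_END = auto()


@dataclass
class Event:
    type: EventType
    payload: Dict[str, Any] = field(default_factory=dict)


async def fake_events() -> AsyncIterator[Event]:
    """Hypothetical event sequence for one run with a single tool call."""
    yield Event(EventType.TOOL_CALL, {"name": "lookup"})
    yield Event(EventType.TOOL_RESULT, {"name": "lookup", "result": "42"})
    yield Event(EventType.TEXT_DELTA, {"delta": "The answer "})
    yield Event(EventType.TEXT_DELTA, {"delta": "is 42."})
    yield Event(EventType.FINAL_OUTPUT, {"output": "The answer is 42."})
    yield Event(EventType.RUN_END, {})


async def consume() -> Dict[str, Any]:
    text: List[str] = []
    tools: List[str] = []
    final_output = None
    async for event in fake_events():
        if event.type is EventType.TEXT_DELTA:
            text.append(event.payload["delta"])   # accumulate streamed tokens
        elif event.type is EventType.TOOL_CALL:
            tools.append(event.payload["name"])   # note which tools ran
        elif event.type is EventType.FINAL_OUTPUT:
            final_output = event.payload["output"]
    return {"text": "".join(text), "tools": tools, "final": final_output}


summary = asyncio.run(consume())
print(summary)
```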
run_sync ¶
run_sync(prompt: str = '', *, resume: Any | None = None, deps: Deps = None, session_id: str | None = None, identity: str | None = None, state: State | None = None, usage_limits: Any | None = None, cancellation: Any | None = None, timeout: float | None = None, resume_id: str | None = None) -> RunResult[Output]
Synchronous convenience wrapper around run().
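One common way to build a synchronous wrapper over an async method is to hand the coroutine to asyncio.run. The sketch below shows that approach with a toy class; AsyncWorker is hypothetical, and how YAAB actually implements run_sync (including its behavior inside an already running event loop) is not stated here.

```python
import asyncio


class AsyncWorker:
    """Toy stand-in with an async entry point and a blocking wrapper."""

    async def run(self, prompt: str = "") -> str:
        await asyncio.sleep(0)  # placeholder for awaiting a model call
        return f"echo: {prompt}"

    def run_sync(self, prompt: str = "") -> str:
        # asyncio.run starts a fresh event loop, so this must not be called
        # from code that is already running inside one.
        return asyncio.run(self.run(prompt))


reply = AsyncWorker().run_sync("Hello!")
print(reply)
```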