Workflows
Workflows compose multiple agents into a deterministic execution structure. Each sub-agent uses its own LLM; the workflow controls the order and state hand-off. For tasks that must survive process crashes, Sagewai adds DurableWorkflow with PostgreSQL-backed step checkpointing, human-in-the-loop approval gates, and distributed workers.
Prerequisites: Agents · Next: Context Engine · Safety
Workflow Agents
Four built-in workflow agents cover the most common composition patterns. See the Agents page for full constructor details.
| Pattern | Description |
|---|---|
| SequentialAgent | Runs sub-agents in order, piping output to input |
| ParallelAgent | Runs sub-agents concurrently, merges outputs |
| ConditionalAgent | Routes input to a branch based on a condition |
| LoopAgent | Repeats a sub-agent until a stop condition |
from sagewai import UniversalAgent, SequentialAgent, ParallelAgent
researcher = UniversalAgent(name="researcher", model="gpt-4o")
writer = UniversalAgent(name="writer", model="claude-sonnet-4-20250514")
fact_checker = UniversalAgent(name="fact-checker", model="gpt-4o-mini")
editor = UniversalAgent(name="editor", model="gpt-4o-mini")
# Stages 1 and 2 (research, then write) use the researcher and writer agents defined above.
# Stage 3: fact-check and edit in parallel
review = ParallelAgent(name="review", agents=[fact_checker, editor])
pipeline = SequentialAgent(name="pipeline", agents=[researcher, writer, review])
result = await pipeline.chat("Write about quantum computing advances in 2025")
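The four patterns in the table reduce to ordinary control flow over async callables. The following is a minimal stdlib-only sketch of that idea, not Sagewai's implementation: `sequential`, `parallel`, `conditional`, and `loop` are hypothetical helper names, and plain coroutines stand in for agents.

```python
import asyncio
from typing import Awaitable, Callable, List

# Stand-in for an agent: any async function from text to text (assumption).
Step = Callable[[str], Awaitable[str]]

async def sequential(steps: List[Step], text: str) -> str:
    # Pipe each step's output into the next step's input.
    for step in steps:
        text = await step(text)
    return text

async def parallel(steps: List[Step], text: str) -> List[str]:
    # Run every step on the same input concurrently; outputs keep step order.
    return list(await asyncio.gather(*(step(text) for step in steps)))

async def conditional(predicate: Callable[[str], bool],
                      if_true: Step, if_false: Step, text: str) -> str:
    # Route the input to exactly one branch.
    branch = if_true if predicate(text) else if_false
    return await branch(text)

async def loop(step: Step, should_stop: Callable[[str], bool],
               text: str, max_iterations: int = 10) -> str:
    # Repeat the step until the stop condition holds (or the cap is hit).
    for _ in range(max_iterations):
        text = await step(text)
        if should_stop(text):
            break
    return text

async def upper(text: str) -> str:
    return text.upper()

async def exclaim(text: str) -> str:
    return text + "!"

async def demo():
    piped = await sequential([upper, exclaim], "hi")
    merged = await parallel([upper, exclaim], "hi")
    routed = await conditional(lambda t: len(t) > 3, upper, exclaim, "hi")
    looped = await loop(exclaim, lambda t: t.endswith("!!!"), "hi")
    return piped, merged, routed, looped

piped, merged, routed, looped = asyncio.run(demo())
```

The `max_iterations` cap in `loop` is a design choice of this sketch: a stop condition that never fires would otherwise spin forever.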
DurableWorkflow
DurableWorkflow checkpoints each step to PostgreSQL as it completes. If the process crashes mid-run, pass the same run_id on restart and the workflow picks up from where it left off — skipping completed steps automatically.
from sagewai import DurableWorkflow, UniversalAgent
from sagewai.core.stores.postgres import PostgresStore
store = PostgresStore(database_url="postgresql://localhost/sagewai")
await store.initialize()
researcher = UniversalAgent(name="researcher", model="gpt-4o")
writer = UniversalAgent(name="writer", model="gpt-4o")
wf = DurableWorkflow(name="article-pipeline", store=store)
@wf.step("research")
async def research(topic: str) -> str:
    return await researcher.chat(topic)
@wf.step("draft", retries=3, retry_delay=2.0)
async def draft(research_output: str) -> str:
    return await writer.chat(f"Write an article based on: {research_output}")
@wf.step("publish", timeout=30.0)
async def publish(draft: str) -> str:
    # Publish to CMS, send notifications, etc.
    return f"Published: {draft[:100]}..."
result = await wf.run(topic="quantum computing")
Step Options
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | required | Step name (must be unique in the workflow) |
| retries | int | 0 | Number of retry attempts on failure |
| retry_delay | float | 1.0 | Seconds between retries |
| timeout | float \| None | None | Step timeout in seconds |
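One plausible reading of these options can be sketched with `asyncio` alone. This is an illustration under assumptions, not Sagewai's code: `run_step` is a hypothetical helper, and it assumes that `retries` counts extra attempts after the first and that `timeout` applies to each attempt separately. The library's actual semantics may differ.

```python
import asyncio

async def run_step(func, *args, retries=0, retry_delay=1.0, timeout=None):
    """Call func, retrying up to `retries` extra times (assumed semantics)."""
    attempts = retries + 1
    for attempt in range(1, attempts + 1):
        try:
            # wait_for with timeout=None simply waits for completion.
            return await asyncio.wait_for(func(*args), timeout=timeout)
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(retry_delay)

calls = 0

async def flaky(text):
    # Fails twice, then succeeds, to exercise the retry path.
    global calls
    calls += 1
    if calls < 3:
        raise RuntimeError("transient failure")
    return f"ok: {text}"

result = asyncio.run(run_step(flaky, "draft", retries=3, retry_delay=0.01))
```

Here the third attempt succeeds, so one of the three allowed retries goes unused.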
Resuming After Crash
If the process crashes after "research" completes but before "draft" finishes, pass the same run_id to resume:
# Skips "research", resumes from "draft"
result = await wf.run(run_id="article-pipeline-abc123")
The workflow reads checkpointed state from PostgreSQL and skips steps that already completed.
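The skip-on-resume behavior can be illustrated with an in-memory stand-in for the checkpoint table. This is a sketch of the general technique only: `DictCheckpointStore` and `run_steps` are hypothetical names, and nothing here reflects how Sagewai lays out its PostgreSQL state.

```python
import asyncio

class DictCheckpointStore:
    """In-memory stand-in for a checkpoint table keyed by (run_id, step)."""

    def __init__(self):
        self._results = {}

    def load(self, run_id, step):
        return self._results.get((run_id, step))

    def save(self, run_id, step, value):
        self._results[(run_id, step)] = value

async def run_steps(store, run_id, steps, initial):
    executed = []
    value = initial
    for name, func in steps:
        cached = store.load(run_id, name)
        if cached is not None:   # simplification: a None result is never cached
            value = cached       # step already completed: reuse its output
            continue
        value = await func(value)
        store.save(run_id, name, value)  # checkpoint before moving on
        executed.append(name)
    return value, executed

crash = True

async def research(topic):
    return f"notes on {topic}"

async def draft(notes):
    if crash:
        raise RuntimeError("simulated crash")
    return f"article from {notes}"

store = DictCheckpointStore()
steps = [("research", research), ("draft", draft)]

try:
    asyncio.run(run_steps(store, "run-1", steps, "quantum computing"))
except RuntimeError:
    pass  # "research" is checkpointed, "draft" is not

crash = False
final, executed = asyncio.run(run_steps(store, "run-1", steps, "quantum computing"))
```

On the second call only `draft` executes; `research` is served from the stored result.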
Stores
| Store | Use Case |
|---|---|
| InMemoryStore | Testing and development |
| PostgresStore | Production (survives process restarts) |
ApprovalGate
ApprovalGate pauses a workflow at a named step and waits for explicit human approval before continuing. Use it when a step involves an irreversible action — publishing, sending, deploying — and you need a human sign-off in the loop.
from sagewai import DurableWorkflow, ApprovalGate, UniversalAgent
from sagewai.core.stores.postgres import PostgresStore
store = PostgresStore(database_url="postgresql://localhost/sagewai")
await store.initialize()
writer = UniversalAgent(name="writer", model="gpt-4o")
wf = DurableWorkflow(name="content-pipeline", store=store)
gate = ApprovalGate(workflow=wf)
@wf.step("draft")
async def draft(topic: str) -> str:
    return await writer.chat(f"Write about: {topic}")
@wf.step("review")
async def review(content: str) -> str:
    # Pauses the workflow here until a human approves
    await gate.request_approval(
        prompt=f"Approve publication of: {content[:200]}...",
        context={"content": content},
    )
    return content  # Only reached after approval
@wf.step("publish")
async def publish(approved_content: str) -> str:
    return f"Published: {approved_content[:100]}..."
# Start the workflow — it will pause at the "review" step
run_id = "content-run-001"
try:
    result = await wf.run(run_id=run_id, topic="AI safety")
except Exception:
    pass  # Workflow is now waiting for approval
Approving or Rejecting
From an admin endpoint, CLI, or any other process:
# Approve — the workflow resumes from the review step
await gate.approve(run_id="content-run-001", reviewer="editor@acme.com")
# Or reject
await gate.reject(
    run_id="content-run-001",
    reason="Content needs more citations",
    reviewer="editor@acme.com",
)
After approval, resume with the same run_id:
result = await wf.run(run_id="content-run-001")
# Picks up from "review", then runs "publish"
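The pause, approve, resume cycle can be modeled in a few lines. The sketch below assumes the pause surfaces as an exception, which is consistent with the `try`/`except` around `wf.run` above but is not confirmed as Sagewai's mechanism. `SimpleGate` and `ApprovalPending` are hypothetical names, and the code is synchronous for brevity.

```python
class ApprovalPending(Exception):
    """Raised to stop a run until someone records a decision (hypothetical)."""

class SimpleGate:
    def __init__(self):
        self._decisions = {}  # run_id -> "approved" or "rejected"

    def request_approval(self, run_id):
        decision = self._decisions.get(run_id)
        if decision == "approved":
            return  # let the step continue
        if decision == "rejected":
            raise PermissionError(f"run {run_id} was rejected")
        raise ApprovalPending(run_id)  # no decision yet: pause the run

    def approve(self, run_id):
        self._decisions[run_id] = "approved"

    def reject(self, run_id):
        self._decisions[run_id] = "rejected"

def review(gate, run_id, content):
    gate.request_approval(run_id)  # raises until a human approves
    return content

gate = SimpleGate()

# First attempt: nobody has decided yet, so the step pauses.
try:
    review(gate, "content-run-001", "draft text")
    paused = False
except ApprovalPending:
    paused = True

# A reviewer approves from elsewhere; re-running the step now succeeds.
gate.approve("content-run-001")
resumed = review(gate, "content-run-001", "draft text")
```

Catching a dedicated exception type such as `ApprovalPending`, rather than a bare `Exception`, keeps genuine failures from being mistaken for a pending approval.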
WorkflowWorker
WorkflowWorker is a distributed execution consumer. It polls the database for pending runs, claims them atomically with FOR UPDATE SKIP LOCKED, executes them, and emits heartbeats. Run multiple workers against the same database for horizontal scaling.
from sagewai import WorkflowWorker
from sagewai.core.stores.postgres import PostgresStore
store = PostgresStore(database_url="postgresql://localhost/sagewai")
await store.initialize()
worker = WorkflowWorker(
    store=store,
    workflow_registry={"article-pipeline": my_workflow},
    poll_interval=2.0,
    heartbeat_interval=10.0,
)
await worker.start() # Blocks until shutdown
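The point of an atomic claim is that two workers polling at once never pick up the same run. The sketch below shows the typical shape of a `FOR UPDATE SKIP LOCKED` claim query and then simulates the guarantee in memory. The table name, column names, and `$1` placeholder style are hypothetical, and `InMemoryQueue` is an illustration, not Sagewai's store.

```python
import asyncio

# Typical shape of a PostgreSQL claim query. Table and column names are
# hypothetical; this string is shown for reference and is not executed.
CLAIM_SQL = """
UPDATE workflow_runs SET status = 'running', worker_id = $1
WHERE id = (
    SELECT id FROM workflow_runs
    WHERE status = 'pending'
    ORDER BY created_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id
"""

class InMemoryQueue:
    """Stand-in for the runs table; the lock plays the role of row locking."""

    def __init__(self, run_ids):
        self._pending = list(run_ids)
        self._lock = asyncio.Lock()

    async def claim(self):
        async with self._lock:
            return self._pending.pop(0) if self._pending else None

async def worker(name, queue, claimed):
    # Poll until the queue is empty, recording which worker took which run.
    while (run_id := await queue.claim()) is not None:
        claimed.append((name, run_id))
        await asyncio.sleep(0)  # yield so other workers get a turn

async def main():
    queue = InMemoryQueue([f"run-{i}" for i in range(6)])
    claimed = []
    await asyncio.gather(*(worker(f"w{i}", queue, claimed) for i in range(3)))
    return claimed

claimed = asyncio.run(main())
```

Every run appears exactly once in `claimed`, regardless of how the three workers interleave.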
Pool and Label Routing
Workers support pool and label-based routing to direct specific workflows to workers that have the right resources — local GPU, specific cloud region, etc.:
from sagewai import WorkflowWorker, WorkerLoadBalancer
from sagewai.models.worker import WorkerCredentials
from sagewai.models.inference import InferenceParams
# Worker with local Ollama on GPU
worker = WorkflowWorker(
    store=store,
    workflow_registry={"article-pipeline": my_workflow},
    pool="local-ollama",
    labels={"zone": "local", "gpu": True},
    credentials=WorkerCredentials(
        model_overrides={"default": "ollama/llama3.2"},
        inference_overrides=InferenceParams(
            api_base="http://localhost:11434",
        ),
    ),
)
The claim query uses JSONB label containment so routing decisions happen in the database, not in application code.
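For flat label objects, JSONB containment (`@>` in PostgreSQL) amounts to "every required key is present with an equal value". The sketch below approximates that rule in Python. It assumes the worker's labels must contain the labels a run requires, a direction this page does not spell out, and `labels_contain` is a hypothetical helper.

```python
def labels_contain(worker_labels: dict, required: dict) -> bool:
    """Approximate `worker_labels @> required` for flat JSON objects."""
    for key, value in required.items():
        if key not in worker_labels:
            return False
        candidate = worker_labels[key]
        # JSON distinguishes true from 1, while Python's True == 1,
        # so compare types as well as values.
        if type(candidate) is not type(value) or candidate != value:
            return False
    return True

worker_labels = {"zone": "local", "gpu": True}

matches_gpu = labels_contain(worker_labels, {"gpu": True})
matches_region = labels_contain(worker_labels, {"zone": "eu-west"})
matches_anything = labels_contain(worker_labels, {})
```

An empty requirement matches every worker, which mirrors how `@>` treats an empty object on the right-hand side.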
Worker Credentials
Per-worker credentials are injected into agents via ContextVar, so an agent automatically uses the worker's model and API configuration at LLM call time without any changes to your agent code:
from sagewai import get_worker_credentials
# Inside an agent's LLM call (automatic — no user code needed)
creds = get_worker_credentials()
# Returns WorkerCredentials with model_overrides, inference_overrides
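The mechanism behind this kind of injection is Python's `contextvars` module: a value set in one task is visible to everything that task calls, without being passed as an argument and without leaking into other tasks. The sketch below is a stdlib-only illustration. `Credentials`, `resolve_model`, and `run_as_worker` are hypothetical names, and the way the `"default"` override is applied is an assumption.

```python
import asyncio
from contextvars import ContextVar
from dataclasses import dataclass, field

@dataclass
class Credentials:
    model_overrides: dict = field(default_factory=dict)

# Holds the credentials of whichever worker is running the current task.
_current: ContextVar = ContextVar("worker_credentials", default=None)

def resolve_model(requested: str) -> str:
    # Agent-side lookup: no credentials argument is threaded through.
    creds = _current.get()
    if creds is None:
        return requested
    return creds.model_overrides.get("default", requested)

async def run_as_worker(creds, requested):
    token = _current.set(creds)
    try:
        return resolve_model(requested)
    finally:
        _current.reset(token)  # restore the previous value

async def main():
    local = Credentials(model_overrides={"default": "ollama/llama3.2"})
    # gather() wraps each coroutine in a task with its own context copy,
    # so the two runs cannot see each other's credentials.
    return await asyncio.gather(
        run_as_worker(local, "gpt-4o"),
        run_as_worker(None, "gpt-4o"),
    )

overridden, untouched = asyncio.run(main())
outside = resolve_model("gpt-4o")
```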
WorkflowMonitor
WorkflowMonitor provides visibility into workflow executions — running, completed, and failed:
from sagewai import WorkflowMonitor
from sagewai.core.stores.postgres import PostgresStore
store = PostgresStore(database_url="postgresql://localhost/sagewai")
monitor = WorkflowMonitor(store=store)
# List running workflows
executions = await monitor.list_executions(status="running")
# Inspect a specific execution
detail = await monitor.get_execution("run-abc123")
# Get execution timeline (step-by-step event history)
timeline = await monitor.get_execution_timeline("run-abc123")
# Retry a failed execution
await monitor.retry_execution("run-abc123")
DeadLetterQueue
Runs that exhaust all retry attempts move to the Dead Letter Queue. From there you can inspect the failure, retry manually, or discard:
from sagewai import DeadLetterQueue
from sagewai.core.stores.postgres import PostgresStore
store = PostgresStore(database_url="postgresql://localhost/sagewai")
dlq = DeadLetterQueue(store=store)
# List failed runs
failed = await dlq.list_entries()
# Inspect a failed run
entry = await dlq.get_entry("run-abc123")
# Retry a failed run
await dlq.retry_entry("run-abc123")
# Discard a failed run
await dlq.discard_entry("run-abc123")
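The hand-off from retries to a dead letter queue can be sketched as follows. This is a generic illustration of the pattern with a plain dict standing in for the queue. `run_with_dlq` is a hypothetical helper, and what Sagewai records for each entry is not shown on this page.

```python
import asyncio

async def run_with_dlq(run_id, func, retries, dead_letters):
    """Try func up to retries + 1 times; park the run if all attempts fail."""
    last_error = None
    for _ in range(retries + 1):
        try:
            return await func()
        except Exception as exc:
            last_error = exc
    # Retries exhausted: record the failure for manual inspection.
    dead_letters[run_id] = repr(last_error)
    return None

async def always_fails():
    raise ValueError("CMS unavailable")

dead_letters = {}
outcome = asyncio.run(run_with_dlq("run-abc123", always_fails, 2, dead_letters))
```

Keeping the last error alongside the run ID is what makes the later "inspect, retry, or discard" decision possible.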
Complete Example
A durable workflow that survives crashes, requires human approval before publishing, and runs on distributed workers:
import asyncio
from sagewai import (
    DurableWorkflow,
    ApprovalGate,
    UniversalAgent,
    WorkflowWorker,
)
from sagewai.core.stores.postgres import PostgresStore
async def main():
    store = PostgresStore(database_url="postgresql://localhost/sagewai")
    await store.initialize()
    researcher = UniversalAgent(name="researcher", model="gpt-4o")
    writer = UniversalAgent(name="writer", model="gpt-4o")
    wf = DurableWorkflow(name="article-pipeline", store=store)
    gate = ApprovalGate(workflow=wf)
    @wf.step("research")
    async def research(topic: str) -> str:
        return await researcher.chat(f"Research: {topic}")
    @wf.step("draft", retries=3)
    async def draft(findings: str) -> str:
        return await writer.chat(f"Write article from: {findings}")
    @wf.step("review")
    async def review(content: str) -> str:
        await gate.request_approval(prompt=f"Approve: {content[:200]}...")
        return content
    # Submit the workflow; a worker will pick it up
    result = await wf.run(topic="quantum computing advances")
    print(result)
asyncio.run(main())
What's Next
- Agents — Agent types and composition patterns
- Context Engine — Document ingestion and retrieval for workflow context
- Safety — Budget enforcement to prevent runaway costs in long workflows