Workflows

Workflows compose multiple agents into a deterministic execution structure. Each sub-agent uses its own LLM; the workflow controls the order and state hand-off. For tasks that must survive process crashes, Sagewai adds DurableWorkflow with PostgreSQL-backed step checkpointing, human-in-the-loop approval gates, and distributed workers.

Prerequisites: Agents · Next: Context Engine · Safety

Workflow Agents

Four built-in workflow agents cover the most common composition patterns. See the Agents page for full constructor details.

Pattern            Description
SequentialAgent    Runs sub-agents in order, piping each agent's output into the next agent's input
ParallelAgent      Runs sub-agents concurrently and merges their outputs
ConditionalAgent   Routes input to a branch based on a condition
LoopAgent          Repeats a sub-agent until a stop condition is met

from sagewai import UniversalAgent, SequentialAgent, ParallelAgent

researcher = UniversalAgent(name="researcher", model="gpt-4o")
writer = UniversalAgent(name="writer", model="claude-sonnet-4-20250514")
fact_checker = UniversalAgent(name="fact-checker", model="gpt-4o-mini")
editor = UniversalAgent(name="editor", model="gpt-4o-mini")

# Stages 1 and 2 run in order: the researcher's output feeds the writer.
# Stage 3 fact-checks and edits the draft concurrently.
review = ParallelAgent(name="review", agents=[fact_checker, editor])
pipeline = SequentialAgent(name="pipeline", agents=[researcher, writer, review])

result = await pipeline.chat("Write about quantum computing advances in 2025")
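
To make the data flow concrete, here is a minimal sketch of the two composition patterns in plain asyncio. It does not use Sagewai: `sequential`, `parallel`, and the stub agents are hypothetical stand-ins, and joining parallel outputs with a newline is an assumed merge strategy, not necessarily what ParallelAgent does.

```python
import asyncio
from typing import Awaitable, Callable, List

# Hypothetical stand-in for an agent: any async callable from text to text.
Agent = Callable[[str], Awaitable[str]]


def sequential(agents: List[Agent]) -> Agent:
    """Run agents in order, feeding each output into the next agent."""
    async def run(text: str) -> str:
        for agent in agents:
            text = await agent(text)
        return text
    return run


def parallel(agents: List[Agent], sep: str = "\n") -> Agent:
    """Run agents concurrently on the same input and join their outputs."""
    async def run(text: str) -> str:
        # gather returns results in the order the agents were passed
        outputs = await asyncio.gather(*(agent(text) for agent in agents))
        return sep.join(outputs)  # assumed merge strategy
    return run


def make_stub(tag: str) -> Agent:
    """Hypothetical agent that just wraps its input in a tag."""
    async def stub(text: str) -> str:
        await asyncio.sleep(0)  # yield control, as a real LLM call would
        return f"{tag}({text})"
    return stub


pipeline = sequential([
    make_stub("research"),
    make_stub("write"),
    parallel([make_stub("fact_check"), make_stub("edit")]),
])

result = asyncio.run(pipeline("quantum computing"))
print(result)
```

Because the parallel stage receives the writer's output, both reviewers see the same draft, which mirrors how `review` sits after `writer` in the SequentialAgent above.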

DurableWorkflow

DurableWorkflow checkpoints each step to PostgreSQL as it completes. If the process crashes mid-run, pass the same run_id on restart and the workflow picks up from where it left off — skipping completed steps automatically.

from sagewai import DurableWorkflow, UniversalAgent
from sagewai.core.stores.postgres import PostgresStore

store = PostgresStore(database_url="postgresql://localhost/sagewai")
await store.initialize()

researcher = UniversalAgent(name="researcher", model="gpt-4o")
writer = UniversalAgent(name="writer", model="gpt-4o")

wf = DurableWorkflow(name="article-pipeline", store=store)

@wf.step("research")
async def research(topic: str) -> str:
    return await researcher.chat(topic)

@wf.step("draft", retries=3, retry_delay=2.0)
async def draft(research_output: str) -> str:
    return await writer.chat(f"Write an article based on: {research_output}")

@wf.step("publish", timeout=30.0)
async def publish(draft: str) -> str:
    # Publish to CMS, send notifications, etc.
    return f"Published: {draft[:100]}..."

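# An explicit run_id lets you resume this exact run after a crash
result = await wf.run(run_id="article-pipeline-abc123", topic="quantum computing")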

Step Options

Parameter     Type           Default    Description
name          str            required   Step name (must be unique in the workflow)
retries       int            0          Number of retry attempts on failure
retry_delay   float          1.0        Seconds between retries
timeout       float | None   None       Step timeout in seconds
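
Taken together, the three options describe a retry loop wrapped around a per-attempt timeout. The following is a minimal sketch in plain asyncio. It assumes that `retries=N` means up to N attempts after the first one and that a timeout counts as a failure that can be retried. `run_step` is hypothetical, not Sagewai's code.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def run_step(
    fn: Callable[[], Awaitable[T]],
    retries: int = 0,
    retry_delay: float = 1.0,
    timeout: Optional[float] = None,
) -> T:
    """Await fn(); on an exception or timeout, retry up to `retries` more times."""
    last_error: Optional[Exception] = None
    for attempt in range(retries + 1):
        try:
            # wait_for with timeout=None waits indefinitely
            return await asyncio.wait_for(fn(), timeout=timeout)
        except Exception as exc:  # also catches asyncio.TimeoutError
            last_error = exc
            if attempt < retries:
                await asyncio.sleep(retry_delay)
    assert last_error is not None  # with retries >= 0 the loop runs at least once
    raise last_error


attempts = 0


async def flaky() -> str:
    """Fails twice with a transient error, then succeeds."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("transient failure")
    return "ok"


async def slow() -> str:
    await asyncio.sleep(1.0)  # always exceeds the 0.05 s timeout below
    return "too late"


async def main() -> tuple:
    result = await run_step(flaky, retries=3, retry_delay=0.01)
    try:
        await run_step(slow, retries=1, retry_delay=0.01, timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return result, timed_out


result, timed_out = asyncio.run(main())
print(result, attempts, timed_out)  # ok 3 True
```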

Resuming After Crash

If the process crashes after "research" completes but before "draft" finishes, pass the same run_id to resume:

# Skips "research", resumes from "draft"
result = await wf.run(run_id="article-pipeline-abc123")

The workflow reads checkpointed state from PostgreSQL and skips steps that already completed.
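
The resume behavior can be modeled in a few lines. This sketch shows the general technique: checkpoint each step's output under a (run ID, step name) key, then reuse it on the next call. It is an illustration under that assumption, not Sagewai's implementation. `CheckpointedRun` is hypothetical, and its dictionary stands in for the PostgreSQL checkpoint table. Unlike a real store, the dictionary does not survive a process restart.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

Step = Tuple[str, Callable[[Any], Awaitable[Any]]]


class CheckpointedRun:
    """Toy durable runner: saves each step's output and skips saved steps on rerun."""

    def __init__(self, steps: List[Step]) -> None:
        self.steps = steps
        # (run_id, step_name) -> output; a stand-in for a PostgreSQL table
        self.checkpoints: Dict[Tuple[str, str], Any] = {}

    async def run(self, run_id: str, value: Any) -> Any:
        for name, fn in self.steps:
            key = (run_id, name)
            if key in self.checkpoints:
                value = self.checkpoints[key]  # completed earlier: reuse output
                continue
            value = await fn(value)
            self.checkpoints[key] = value  # saved only after the step succeeds
        return value


calls: List[str] = []
crash_pending = {"draft": True}


async def research(topic: str) -> str:
    calls.append("research")
    return f"notes on {topic}"


async def draft(notes: str) -> str:
    calls.append("draft")
    if crash_pending.pop("draft", False):  # fail on the first call only
        raise RuntimeError("simulated crash")
    return f"article from {notes}"


runner = CheckpointedRun([("research", research), ("draft", draft)])


async def main() -> str:
    try:
        await runner.run("run-1", "qubits")
    except RuntimeError:
        pass  # the "crash": research is checkpointed, draft is not
    return await runner.run("run-1", "qubits")  # resume with the same run_id


result = asyncio.run(main())
print(result)  # article from notes on qubits
print(calls)   # ['research', 'draft', 'draft']
```

The research step runs only once across both calls. This is why each step should be safe to re-run only up to its last checkpoint, not beyond it.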

Stores

Store           Use case
InMemoryStore   Testing and development
PostgresStore   Production (survives process restarts)

ApprovalGate

ApprovalGate pauses a workflow at a named step and waits for explicit human approval before continuing. Use it when a step involves an irreversible action — publishing, sending, deploying — and you need a human sign-off in the loop.

from sagewai import DurableWorkflow, ApprovalGate, UniversalAgent
from sagewai.core.stores.postgres import PostgresStore

store = PostgresStore(database_url="postgresql://localhost/sagewai")
await store.initialize()

writer = UniversalAgent(name="writer", model="gpt-4o")
wf = DurableWorkflow(name="content-pipeline", store=store)
gate = ApprovalGate(workflow=wf)

@wf.step("draft")
async def draft(topic: str) -> str:
    return await writer.chat(f"Write about: {topic}")

@wf.step("review")
async def review(content: str) -> str:
    # Pauses the workflow here until a human approves
    await gate.request_approval(
        prompt=f"Approve publication of: {content[:200]}...",
        context={"content": content},
    )
    return content  # Only reached after approval

@wf.step("publish")
async def publish(approved_content: str) -> str:
    return f"Published: {approved_content[:100]}..."

# Start the workflow — it will pause at the "review" step
run_id = "content-run-001"
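try:
    result = await wf.run(run_id=run_id, topic="AI safety")
except Exception as exc:
    # The run stops at the "review" gate and waits for a decision. Log the
    # exception instead of discarding it so that genuine failures stay visible.
    print(f"Run {run_id} stopped: {exc!r}")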

Approving or Rejecting

From an admin endpoint, CLI, or any other process:

# Approve — the workflow resumes from the review step
await gate.approve(run_id="content-run-001", reviewer="editor@acme.com")

# Or reject
await gate.reject(
    run_id="content-run-001",
    reason="Content needs more citations",
    reviewer="editor@acme.com",
)

After approval, resume with the same run_id:

result = await wf.run(run_id="content-run-001")
# Picks up from "review", then runs "publish"
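
The mechanics of a gate can be pictured with an in-process analogy: the step awaits an event that another caller sets. Treat this only as a sketch. Sagewai's ApprovalGate works across processes, as the approve/reject calls above show. By contrast, `InProcessGate`, `ApprovalRejected`, and the event-based design here are hypothetical and live only in memory.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple


class ApprovalRejected(Exception):
    """Raised inside the waiting step when a reviewer rejects the request."""


@dataclass
class _Decision:
    event: asyncio.Event = field(default_factory=asyncio.Event)
    approved: Optional[bool] = None
    reason: str = ""


class InProcessGate:
    """Toy gate: a step awaits a decision that another task records."""

    def __init__(self) -> None:
        self._decisions: Dict[str, _Decision] = {}

    def _get(self, run_id: str) -> _Decision:
        if run_id not in self._decisions:
            self._decisions[run_id] = _Decision()
        return self._decisions[run_id]

    async def request_approval(self, run_id: str) -> None:
        decision = self._get(run_id)
        await decision.event.wait()  # suspends until approve() or reject()
        if not decision.approved:
            raise ApprovalRejected(decision.reason)

    def approve(self, run_id: str) -> None:
        decision = self._get(run_id)
        decision.approved = True
        decision.event.set()

    def reject(self, run_id: str, reason: str) -> None:
        decision = self._get(run_id)
        decision.approved, decision.reason = False, reason
        decision.event.set()


async def main() -> Tuple[bool, str]:
    gate = InProcessGate()

    waiting = asyncio.create_task(gate.request_approval("run-1"))
    await asyncio.sleep(0)  # let the step start waiting
    gate.approve("run-1")
    await waiting  # returns normally once approved
    approved_ok = True

    rejected = asyncio.create_task(gate.request_approval("run-2"))
    await asyncio.sleep(0)
    gate.reject("run-2", reason="Content needs more citations")
    try:
        await rejected
        reason = ""
    except ApprovalRejected as exc:
        reason = str(exc)
    return approved_ok, reason


approved_ok, reason = asyncio.run(main())
print(approved_ok, reason)  # True Content needs more citations
```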

WorkflowWorker

WorkflowWorker is a distributed execution consumer. It polls the database for pending runs, claims them atomically with FOR UPDATE SKIP LOCKED, executes them, and emits heartbeats. Run multiple workers against the same database for horizontal scaling.

from sagewai import WorkflowWorker
from sagewai.core.stores.postgres import PostgresStore

store = PostgresStore(database_url="postgresql://localhost/sagewai")
await store.initialize()

worker = WorkflowWorker(
    store=store,
    # Maps workflow names to DurableWorkflow instances (e.g. wf from the examples above)
    workflow_registry={"article-pipeline": my_workflow},
    poll_interval=2.0,
    heartbeat_interval=10.0,
)

await worker.start()  # Blocks until shutdown
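
The guarantee that matters here is that each pending run is claimed by exactly one worker. Below is a thread-based analogy in plain Python. It is neither Sagewai code nor SQL: `RunTable`, `claim`, and `worker_loop` are hypothetical, and a single lock stands in for the row lock. The sketch models only the exactly-once claim. With FOR UPDATE SKIP LOCKED, a worker that hits a locked row skips it and tries the next one instead of waiting.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional


class RunTable:
    """Toy stand-in for the runs table; claim() hands each pending run to one worker."""

    def __init__(self, run_ids: List[str]) -> None:
        self.status: Dict[str, str] = {run_id: "pending" for run_id in run_ids}
        self.owner: Dict[str, str] = {}
        self._lock = threading.Lock()

    def claim(self, worker: str) -> Optional[str]:
        # Check-and-mark happens under one lock, like a row locked FOR UPDATE;
        # unlike SKIP LOCKED, other workers wait briefly instead of skipping.
        with self._lock:
            for run_id, state in self.status.items():
                if state == "pending":
                    self.status[run_id] = "running"
                    self.owner[run_id] = worker
                    return run_id
        return None

    def complete(self, run_id: str) -> None:
        with self._lock:
            self.status[run_id] = "completed"


def worker_loop(table: RunTable, name: str) -> int:
    """Claim and 'execute' runs until none are pending; return how many ran."""
    done = 0
    while (run_id := table.claim(name)) is not None:
        table.complete(run_id)  # real execution and heartbeats would happen here
        done += 1
    return done


table = RunTable([f"run-{i}" for i in range(20)])
with ThreadPoolExecutor(max_workers=4) as pool:
    counts = list(pool.map(lambda i: worker_loop(table, f"worker-{i}"), range(4)))

print(sum(counts))  # 20: every run executed exactly once
```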

Pool and Label Routing

Workers support pool- and label-based routing, which directs specific workflows to workers that have the right resources, such as a local GPU or a particular cloud region:

from sagewai import WorkflowWorker
from sagewai.models.worker import WorkerCredentials
from sagewai.models.inference import InferenceParams

# Worker with local Ollama on GPU
worker = WorkflowWorker(
    store=store,
    workflow_registry={"article-pipeline": my_workflow},
    pool="local-ollama",
    labels={"zone": "local", "gpu": True},
    credentials=WorkerCredentials(
        model_overrides={"default": "ollama/llama3.2"},
        inference_overrides=InferenceParams(
            api_base="http://localhost:11434",
        ),
    ),
)

The claim query uses JSONB label containment so routing decisions happen in the database, not in application code.
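
In PostgreSQL, `a @> b` is true when every key/value pair in `b` also appears in `a`. The function below is a rough Python approximation of that rule for JSON objects and scalars, intended only to illustrate label matching. It is not Sagewai's claim query. `jsonb_contains` is hypothetical, and arrays are compared by plain equality, which is stricter than PostgreSQL's array containment. The original text does not say which side of the query contains which. The usage below assumes that the worker's labels must contain the labels the run requires.

```python
from typing import Any


def _scalar_equal(a: Any, b: Any) -> bool:
    # JSON booleans are distinct from numbers, unlike Python where True == 1.
    if isinstance(a, bool) or isinstance(b, bool):
        return isinstance(a, bool) and isinstance(b, bool) and a == b
    if isinstance(a, (int, float)) and isinstance(b, (int, float)):
        return a == b  # JSON numbers compare by value, so 1 matches 1.0
    return type(a) is type(b) and a == b


def jsonb_contains(container: Any, contained: Any) -> bool:
    """Approximate PostgreSQL `container @> contained` for objects and scalars."""
    if isinstance(contained, dict):
        return isinstance(container, dict) and all(
            key in container and jsonb_contains(container[key], value)
            for key, value in contained.items()
        )
    return _scalar_equal(container, contained)


worker_labels = {"zone": "local", "gpu": True}
print(jsonb_contains(worker_labels, {"gpu": True}))        # True
print(jsonb_contains(worker_labels, {"zone": "eu-west"}))  # False
```

An empty requirement `{}` is contained by any label set. In practice, this means a run with no labels can be claimed by any worker in its pool.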

Worker Credentials

Per-worker credentials are injected into agents via ContextVar, so an agent automatically uses the worker's model and API configuration at LLM call time without any changes to your agent code:

from sagewai import get_worker_credentials

# Inside an agent's LLM call (automatic — no user code needed)
creds = get_worker_credentials()
# Returns WorkerCredentials with model_overrides, inference_overrides
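
The ContextVar pattern is what lets this work without changes to agent code. The worker sets a context variable before running a workflow, and code deeper in the call stack reads it. Each asyncio task gets its own copy of the context, so concurrent runs on one worker process do not see each other's values. Below is a sketch using the standard `contextvars` module. `Creds`, `_current_creds`, and `resolve_model` are hypothetical names, not Sagewai's internals.

```python
import asyncio
from contextvars import ContextVar
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Creds:
    """Hypothetical per-worker settings (not Sagewai's WorkerCredentials)."""
    model: str
    api_base: Optional[str] = None


_current_creds: ContextVar[Optional[Creds]] = ContextVar("current_creds", default=None)


def resolve_model(agent_model: str) -> str:
    """What an agent might do at LLM-call time: prefer the worker's override."""
    creds = _current_creds.get()
    return creds.model if creds is not None else agent_model


async def run_on_worker(creds: Creds, agent_model: str) -> str:
    _current_creds.set(creds)  # visible only inside this task's context
    await asyncio.sleep(0)  # interleave with the other task
    return resolve_model(agent_model)


async def main() -> List[str]:
    # gather wraps each coroutine in its own Task with a copied context
    return await asyncio.gather(
        run_on_worker(Creds("ollama/llama3.2", "http://localhost:11434"), "gpt-4o"),
        run_on_worker(Creds("gpt-4o-mini"), "gpt-4o"),
    )


models = asyncio.run(main())
print(models)                   # ['ollama/llama3.2', 'gpt-4o-mini']
print(resolve_model("gpt-4o"))  # gpt-4o: no worker context outside the tasks
```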

WorkflowMonitor

WorkflowMonitor provides visibility into workflow executions — running, completed, and failed:

from sagewai import WorkflowMonitor
from sagewai.core.stores.postgres import PostgresStore

store = PostgresStore(database_url="postgresql://localhost/sagewai")
await store.initialize()
monitor = WorkflowMonitor(store=store)

# List running workflows
executions = await monitor.list_executions(status="running")

# Inspect a specific execution
detail = await monitor.get_execution("run-abc123")

# Get execution timeline (step-by-step event history)
timeline = await monitor.get_execution_timeline("run-abc123")

# Retry a failed execution
await monitor.retry_execution("run-abc123")

DeadLetterQueue

Runs that exhaust all retry attempts move to the Dead Letter Queue. From there you can inspect the failure, retry manually, or discard:

from sagewai import DeadLetterQueue
from sagewai.core.stores.postgres import PostgresStore

store = PostgresStore(database_url="postgresql://localhost/sagewai")
await store.initialize()
dlq = DeadLetterQueue(store=store)

# List failed runs
failed = await dlq.list_entries()

# Inspect a failed run
entry = await dlq.get_entry("run-abc123")

# Retry a failed run
await dlq.retry_entry("run-abc123")

# Discard a failed run
await dlq.discard_entry("run-abc123")
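
Conceptually, the queue is a holding area, keyed by run ID, where runs land once their retries are exhausted. Below is a minimal in-memory sketch that assumes a fixed attempt limit per run. `ToyDLQ` and `DeadLetter` are hypothetical. Their method names echo the calls above only for readability, whereas the real DeadLetterQueue is backed by the store you pass in.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class DeadLetter:
    run_id: str
    attempts: int
    error: str


class ToyDLQ:
    """Hypothetical in-memory dead letter queue, for illustration only."""

    def __init__(self, max_attempts: int) -> None:
        self.max_attempts = max_attempts
        self.entries: Dict[str, DeadLetter] = {}

    def execute(self, run_id: str, fn: Callable[[], str]) -> Optional[str]:
        """Try fn up to max_attempts times; park the run here if all attempts fail."""
        error = ""
        for _ in range(self.max_attempts):
            try:
                return fn()
            except Exception as exc:
                error = repr(exc)
        self.entries[run_id] = DeadLetter(run_id, self.max_attempts, error)
        return None

    def retry_entry(self, run_id: str, fn: Callable[[], str]) -> Optional[str]:
        """Manual retry: take the entry out and run again (it may land back here)."""
        del self.entries[run_id]
        return self.execute(run_id, fn)

    def discard_entry(self, run_id: str) -> None:
        del self.entries[run_id]


def always_fails() -> str:
    raise RuntimeError("upstream API down")


dlq = ToyDLQ(max_attempts=3)
print(dlq.execute("run-abc123", always_fails))  # None: all 3 attempts failed
print(dlq.entries["run-abc123"].error)          # RuntimeError('upstream API down')

# After the cause is fixed, a manual retry succeeds and clears the entry.
print(dlq.retry_entry("run-abc123", lambda: "published"))  # published
print("run-abc123" in dlq.entries)                         # False
```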

Complete Example

A durable workflow that survives crashes and requires human approval before publishing. It runs in-process here. To spread runs across machines, register wf in a WorkflowWorker's workflow_registry as described under WorkflowWorker:

import asyncio
from sagewai import (
    DurableWorkflow,
    ApprovalGate,
    UniversalAgent,
)
from sagewai.core.stores.postgres import PostgresStore

async def main():
    store = PostgresStore(database_url="postgresql://localhost/sagewai")
    await store.initialize()

    researcher = UniversalAgent(name="researcher", model="gpt-4o")
    writer = UniversalAgent(name="writer", model="gpt-4o")

    wf = DurableWorkflow(name="article-pipeline", store=store)
    gate = ApprovalGate(workflow=wf)

    @wf.step("research")
    async def research(topic: str) -> str:
        return await researcher.chat(f"Research: {topic}")

    @wf.step("draft", retries=3)
    async def draft(findings: str) -> str:
        return await writer.chat(f"Write article from: {findings}")

    @wf.step("review")
    async def review(content: str) -> str:
        # Pauses here until a reviewer calls gate.approve() for this run_id
        await gate.request_approval(prompt=f"Approve: {content[:200]}...")
        return content

    @wf.step("publish", timeout=30.0)
    async def publish(approved_content: str) -> str:
        return f"Published: {approved_content[:100]}..."

    # A fixed run_id lets you resume this run after a crash or after approval
    run_id = "article-run-001"
    try:
        result = await wf.run(run_id=run_id, topic="quantum computing advances")
        print(result)
    except Exception as exc:
        # The run stops at the "review" gate; log rather than swallow the exception
        print(f"Run {run_id} stopped: {exc!r}")

asyncio.run(main())

What's Next

  • Agents — Agent types and composition patterns
  • Context Engine — Document ingestion and retrieval for workflow context
  • Safety — Budget enforcement to prevent runaway costs in long workflows