Workflows

Workflows compose multiple agents into deterministic execution patterns. Unlike a single agent, which decides what to do at each turn through LLM reasoning, a workflow agent follows a fixed structure — sequential, parallel, conditional, or iterative — while each sub-agent inside it still uses its own LLM.

For durable execution that survives process crashes, Sagewai provides DurableWorkflow with PostgreSQL-backed checkpointing, human-in-the-loop approval gates, and distributed workers.

Workflow Agents

Four built-in workflow agents handle the most common composition patterns. See the Agents page for full details on each.

Pattern           Description
SequentialAgent   Runs sub-agents in order, piping output to input
ParallelAgent     Runs sub-agents concurrently, merges outputs
ConditionalAgent  Routes input to a branch based on a condition
LoopAgent         Repeats a sub-agent until a stop condition
from sagewai import UniversalAgent, SequentialAgent, ParallelAgent

researcher = UniversalAgent(name="researcher", model="gpt-4o")
writer = UniversalAgent(name="writer", model="claude-sonnet-4-20250514")
fact_checker = UniversalAgent(name="fact-checker", model="gpt-4o-mini")
editor = UniversalAgent(name="editor", model="gpt-4o-mini")

# Stage 3 runs fact-check and edit concurrently
review = ParallelAgent(name="review", agents=[fact_checker, editor])

# Stages: 1) research, 2) write, 3) parallel review
pipeline = SequentialAgent(name="pipeline", agents=[researcher, writer, review])

result = await pipeline.chat("Write about quantum computing advances in 2025")
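ConditionalAgent and LoopAgent are not shown above. Their control flow can be sketched in plain Python — the predicate, stop condition, and stand-in "agents" below are hypothetical illustrations of the semantics, not the Sagewai API:

```python
import asyncio

# Hypothetical stand-ins for sub-agents: each maps input text to output text.
async def summarize(text: str) -> str:
    return f"summary({text})"

async def translate(text: str) -> str:
    return f"translation({text})"

async def refine(text: str) -> str:
    return text + "!"

# ConditionalAgent semantics: route the input to exactly one branch.
async def conditional(text: str) -> str:
    branch = summarize if len(text) > 20 else translate
    return await branch(text)

# LoopAgent semantics: repeat a sub-agent until a stop condition holds.
async def loop(text: str, max_iters: int = 5) -> str:
    for _ in range(max_iters):
        text = await refine(text)
        if text.endswith("!!!"):  # stop condition
            break
    return text

async def main():
    print(await conditional("hi"))  # short input takes the translate branch
    print(await loop("draft"))      # refines until the stop condition holds

asyncio.run(main())
```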

DurableWorkflow

DurableWorkflow provides step-level checkpointing and automatic recovery. If a process crashes mid-workflow, it resumes from the last completed step on restart.

from sagewai import DurableWorkflow, UniversalAgent
from sagewai.core.stores.postgres import PostgresStore

store = PostgresStore(database_url="postgresql://localhost/sagewai")
await store.initialize()

researcher = UniversalAgent(name="researcher", model="gpt-4o")
writer = UniversalAgent(name="writer", model="gpt-4o")

wf = DurableWorkflow(name="article-pipeline", store=store)

@wf.step("research")
async def research(topic: str) -> str:
    return await researcher.chat(topic)

@wf.step("draft", retries=3, retry_delay=2.0)
async def draft(research_output: str) -> str:
    return await writer.chat(f"Write an article based on: {research_output}")

@wf.step("publish", timeout=30.0)
async def publish(draft: str) -> str:
    # Publish to CMS, send notifications, etc.
    return f"Published: {draft[:100]}..."

result = await wf.run(topic="quantum computing")

Step Options

Parameter    Type          Default   Description
name         str           required  Step name (must be unique in the workflow)
retries      int           0         Number of retry attempts on failure
retry_delay  float         1.0       Seconds between retries
timeout      float | None  None      Step timeout in seconds
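The retry and timeout options behave roughly like this pure-Python sketch (a simplified model, not the actual implementation):

```python
import asyncio

async def run_step(fn, *args, retries=0, retry_delay=1.0, timeout=None):
    """Run fn with up to `retries` re-attempts and an optional per-attempt timeout."""
    last_exc = None
    for attempt in range(retries + 1):
        try:
            coro = fn(*args)
            if timeout is not None:
                return await asyncio.wait_for(coro, timeout)
            return await coro
        except Exception as exc:
            last_exc = exc
            if attempt < retries:
                await asyncio.sleep(retry_delay)
    raise last_exc

# Demo: a step that fails twice, then succeeds on the third attempt.
calls = {"n": 0}

async def flaky(x):
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return x.upper()

result = asyncio.run(run_step(flaky, "ok", retries=3, retry_delay=0.01))
print(result)  # succeeds after two retries
```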

Resuming After Crash

If the process crashes after the "research" step completes but before "draft" finishes, pass the same run_id to resume:

# On restart — skips "research", resumes from "draft"
result = await wf.run(run_id="article-pipeline-abc123")

The workflow reads checkpointed state from PostgreSQL and skips already-completed steps.
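The skip-completed-steps behavior amounts to the following minimal in-memory sketch (the real store is PostgreSQL, and the step functions here are placeholders):

```python
import asyncio

# checkpoints[run_id][step_name] = saved step output
checkpoints: dict[str, dict[str, str]] = {}

async def run_workflow(run_id, steps, state):
    """Execute steps in order, skipping any that are already checkpointed."""
    done = checkpoints.setdefault(run_id, {})
    for name, fn in steps:
        if name in done:          # already checkpointed: reuse saved output
            state = done[name]
            continue
        state = await fn(state)   # execute, then checkpoint the result
        done[name] = state
    return state

async def research(t): return f"findings({t})"
async def draft(f): return f"article({f})"

steps = [("research", research), ("draft", draft)]

# Simulate a crash after "research" completed but before "draft" checkpointed.
checkpoints["run-1"] = {"research": "findings(quantum)"}

# On restart: "research" is skipped, only "draft" executes.
result = asyncio.run(run_workflow("run-1", steps, "quantum"))
print(result)
```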

Stores

Store          Use Case
InMemoryStore  Testing and development
PostgresStore  Production (survives process restarts)

ApprovalGate

ApprovalGate pauses a workflow and waits for human approval before continuing. This enables human-in-the-loop patterns for sensitive operations.

from sagewai import DurableWorkflow, ApprovalGate, UniversalAgent
from sagewai.core.stores.postgres import PostgresStore

store = PostgresStore(database_url="postgresql://localhost/sagewai")
await store.initialize()

writer = UniversalAgent(name="writer", model="gpt-4o")
wf = DurableWorkflow(name="content-pipeline", store=store)
gate = ApprovalGate(workflow=wf)

@wf.step("draft")
async def draft(topic: str) -> str:
    return await writer.chat(f"Write about: {topic}")

@wf.step("review")
async def review(content: str) -> str:
    # This pauses the workflow until a human approves
    await gate.request_approval(
        prompt=f"Approve publication of: {content[:200]}...",
        context={"content": content},
    )
    return content  # Only reached after approval

@wf.step("publish")
async def publish(approved_content: str) -> str:
    return f"Published: {approved_content[:100]}..."

# Start the workflow — it will pause at the "review" step
run_id = "content-run-001"
try:
    result = await wf.run(run_id=run_id, topic="AI safety")
except Exception:
    pass  # Workflow is now waiting for approval

Approving or Rejecting

From an admin endpoint, CLI, or any external process:

# Approve — the workflow resumes from the review step
await gate.approve(run_id="content-run-001", reviewer="editor@acme.com")

# Or reject
await gate.reject(
    run_id="content-run-001",
    reason="Content needs more citations",
    reviewer="editor@acme.com",
)

After approval, resume the workflow:

result = await wf.run(run_id="content-run-001")
# Picks up from "review" step, then runs "publish"
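The gate mechanics reduce to a persisted decision flag checked inside the step. This sketch uses an in-memory dict instead of a PostgreSQL row, and `WorkflowPaused` is a hypothetical exception name, not the real Sagewai class:

```python
class WorkflowPaused(Exception):
    """Hypothetical signal: the step must wait for a human decision."""

approvals: dict[str, str] = {}  # run_id -> "approved" | "rejected"

def request_approval(run_id: str) -> None:
    status = approvals.get(run_id)
    if status == "approved":
        return                        # decision recorded: step may proceed
    if status == "rejected":
        raise RuntimeError("rejected")
    raise WorkflowPaused(run_id)      # no decision yet: pause the run

def approve(run_id: str) -> None:
    approvals[run_id] = "approved"

# First attempt pauses; after approval, a re-run proceeds past the gate.
try:
    request_approval("run-1")
except WorkflowPaused:
    pass                              # run is now waiting for a human
approve("run-1")
request_approval("run-1")             # returns normally this time
```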

WorkflowWorker

WorkflowWorker is a distributed execution consumer that polls for pending workflow runs, claims them atomically, and executes them with heartbeat emission. Multiple workers can run concurrently against the same database for horizontal scaling.

from sagewai import WorkflowWorker
from sagewai.core.stores.postgres import PostgresStore

store = PostgresStore(database_url="postgresql://localhost/sagewai")
await store.initialize()

worker = WorkflowWorker(
    store=store,
    workflow_registry={"article-pipeline": my_workflow},
    poll_interval=2.0,
    heartbeat_interval=10.0,
)

await worker.start()  # Blocks until shutdown
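The claim-and-execute loop reduces to roughly the following (an in-memory queue stands in for PostgreSQL, and a lock stands in for the atomic claim; workflow execution is elided):

```python
import asyncio

pending: list[dict] = [{"run_id": "run-1"}, {"run_id": "run-2"}]
claim_lock = asyncio.Lock()
completed: list[str] = []

async def claim_one():
    # In production this is a single SQL statement using FOR UPDATE SKIP LOCKED;
    # here a lock makes the pop() atomic across concurrent workers.
    async with claim_lock:
        return pending.pop(0) if pending else None

async def worker(name: str):
    while True:
        run = await claim_one()
        if run is None:
            break                        # real workers keep polling; we stop when drained
        completed.append(run["run_id"])  # execute the claimed workflow (elided)
        await asyncio.sleep(0)           # yield to other workers

async def main():
    # Two workers share the same queue; each run is claimed exactly once.
    await asyncio.gather(worker("w1"), worker("w2"))

asyncio.run(main())
print(sorted(completed))
```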

Pool and Label Routing

Workers support pool- and label-based routing, similar to Temporal task queues. This lets you route specific workflows to workers with the right resources:

from sagewai import WorkflowWorker, WorkerLoadBalancer
from sagewai.models.worker import WorkerCredentials
from sagewai.models.inference import InferenceParams

# Worker with local Ollama on GPU
worker = WorkflowWorker(
    store=store,
    workflow_registry={"article-pipeline": my_workflow},
    pool="local-ollama",
    labels={"zone": "local", "gpu": True},
    credentials=WorkerCredentials(
        model_overrides={"default": "ollama/llama3.2"},
        inference_overrides=InferenceParams(
            api_base="http://localhost:11434",
        ),
    ),
)

When a workflow run specifies routing constraints, the worker claim query uses PostgreSQL's FOR UPDATE SKIP LOCKED pattern with JSONB label containment for contention-free work distribution.
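The claim query might look roughly like this. The SQL is illustrative only — the table and column names are assumptions, not the real schema:

```sql
UPDATE workflow_runs
SET    status = 'claimed', worker_id = $1, claimed_at = now()
WHERE  id = (
    SELECT id
    FROM   workflow_runs
    WHERE  status = 'pending'
      AND  pool = $2
      AND  required_labels <@ $3::jsonb  -- run's labels contained in worker's labels
    ORDER BY created_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED               -- concurrent workers skip rows being claimed
)
RETURNING id;
```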

Worker Credentials

Per-worker credentials are injected into agents via ContextVar, so agents automatically use the worker's model, API base, and API key at LLM call time:

from sagewai import get_worker_credentials

# Inside an agent's LLM call (automatic — no user code needed)
creds = get_worker_credentials()
# Returns WorkerCredentials with model_overrides, inference_overrides

WorkflowMonitor

WorkflowMonitor provides Temporal-like visibility into workflow executions:

from sagewai import WorkflowMonitor
from sagewai.core.stores.postgres import PostgresStore

store = PostgresStore(database_url="postgresql://localhost/sagewai")
monitor = WorkflowMonitor(store=store)

# List running workflows
executions = await monitor.list_executions(status="running")

# Inspect a specific execution
detail = await monitor.get_execution("run-abc123")

# Get execution timeline (like Temporal's event history)
timeline = await monitor.get_execution_timeline("run-abc123")

# Retry a failed execution
await monitor.retry_execution("run-abc123")

DeadLetterQueue

Failed workflow runs that exhaust their retry attempts are moved to the Dead Letter Queue for manual inspection:

from sagewai import DeadLetterQueue
from sagewai.core.stores.postgres import PostgresStore

store = PostgresStore(database_url="postgresql://localhost/sagewai")
dlq = DeadLetterQueue(store=store)

# List failed runs
failed = await dlq.list_entries()

# Inspect a failed run
entry = await dlq.get_entry("run-abc123")

# Retry a failed run
await dlq.retry_entry("run-abc123")

# Discard a failed run
await dlq.discard_entry("run-abc123")

Complete Example

Here is a durable workflow that survives crashes, requires human approval, and runs on distributed workers:

import asyncio
from sagewai import (
    DurableWorkflow,
    ApprovalGate,
    UniversalAgent,
    WorkflowWorker,
)
from sagewai.core.stores.postgres import PostgresStore

async def main():
    store = PostgresStore(database_url="postgresql://localhost/sagewai")
    await store.initialize()

    researcher = UniversalAgent(name="researcher", model="gpt-4o")
    writer = UniversalAgent(name="writer", model="gpt-4o")

    wf = DurableWorkflow(name="article-pipeline", store=store)
    gate = ApprovalGate(workflow=wf)

    @wf.step("research")
    async def research(topic: str) -> str:
        return await researcher.chat(f"Research: {topic}")

    @wf.step("draft", retries=3)
    async def draft(findings: str) -> str:
        return await writer.chat(f"Write article from: {findings}")

    @wf.step("review")
    async def review(content: str) -> str:
        await gate.request_approval(prompt=f"Approve: {content[:200]}...")
        return content

    # Submit the workflow — a worker will pick it up
    result = await wf.run(topic="quantum computing advances")
    print(result)

asyncio.run(main())

What's Next

  • Agents — Agent types and composition patterns
  • Context Engine — Document ingestion and retrieval for workflow context
  • Safety — Budget enforcement to prevent runaway costs in long workflows