# Workflows
Durable workflow execution with checkpointing, distributed workers, dead letter queues, and load balancing. Build Temporal-like workflows with Python decorators.
```python
from sagewai import DurableWorkflow

wf = DurableWorkflow(name="article-pipeline", store=postgres_store)

@wf.step("research")
async def research(topic: str) -> str:
    return await researcher.chat(topic)

@wf.step("draft", retries=3)
async def draft(research_output: str) -> str:
    return await writer.chat(research_output)

result = await wf.run(topic="quantum computing")
```
## DurableWorkflow
Step-level checkpointing and automatic retry. Workflows survive process restarts and can be resumed from the last successful checkpoint.
```python
from sagewai import DurableWorkflow
from sagewai.core.stores.postgres import PostgresStore

store = PostgresStore(database_url="postgresql://localhost/sagewai")
await store.initialize()

wf = DurableWorkflow(name="my-workflow", store=store)

@wf.step("step1")
async def step1(input_data: str) -> str:
    return "processed: " + input_data

@wf.step("step2", retries=3)
async def step2(prev: str) -> str:
    return "finalized: " + prev

result = await wf.run(input_data="hello")
```
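The checkpoint-and-resume behavior can be illustrated with a minimal, library-free sketch: each completed step is recorded in a store, and a second run skips anything already checkpointed, resuming from the first incomplete step. This models the idea only, not sagewai's internals.

```python
# Minimal model of step-level checkpointing (illustrative, not sagewai code).
import asyncio

class MiniCheckpointRunner:
    def __init__(self, store: dict):
        self.store = store   # step name -> checkpointed result
        self.steps = []      # ordered (name, async fn) pairs

    def step(self, name):
        def register(fn):
            self.steps.append((name, fn))
            return fn
        return register

    async def run(self, value):
        for name, fn in self.steps:
            if name in self.store:       # checkpoint hit: skip re-execution
                value = self.store[name]
                continue
            value = await fn(value)
            self.store[name] = value     # checkpoint after each success
        return value

store = {}
wf = MiniCheckpointRunner(store)

@wf.step("step1")
async def step1(x):
    return "processed: " + x

@wf.step("step2")
async def step2(x):
    return "finalized: " + x

print(asyncio.run(wf.run("hello")))  # finalized: processed: hello
```

If the process dies between steps, a fresh runner pointed at the same store re-runs only the steps that never checkpointed.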
### Key Methods

| Method | Description |
|---|---|
| `@wf.step(name, retries=0)` | Decorator to register a workflow step |
| `await wf.run(**kwargs)` | Execute the workflow end-to-end |
| `await wf.resume(run_id)` | Resume from the last checkpoint |
| `await wf.signal(run_id, name, value)` | Send a signal to a running workflow |
## ApprovalGate
Pause a workflow until human approval is received via signal.
```python
from sagewai import ApprovalGate, DurableWorkflow

wf = DurableWorkflow(name="review-pipeline", store=store)

@wf.step("draft")
async def draft(topic: str) -> str:
    return await writer.chat(topic)

@wf.step("approve")
async def approve(draft: str) -> str:
    gate = ApprovalGate(timeout_seconds=3600)
    return await gate.wait(wf, "approval_signal")

# From another process:
await wf.signal(run_id, "approval_signal", {"approved": True})
```
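The gate's wait/signal control flow can be sketched with a plain `asyncio.Event` (illustrative only; `ApprovalGate` layers durability and a workflow store on top of this pattern):

```python
# Minimal model of a wait-until-signaled gate with a timeout.
import asyncio

class MiniGate:
    def __init__(self, timeout_seconds: float):
        self.timeout = timeout_seconds
        self._event = asyncio.Event()
        self._value = None

    def signal(self, value):
        self._value = value
        self._event.set()

    async def wait(self):
        # Raises asyncio.TimeoutError if no signal arrives in time.
        await asyncio.wait_for(self._event.wait(), timeout=self.timeout)
        return self._value

async def demo():
    gate = MiniGate(timeout_seconds=5)
    # Simulate an approval arriving from "another process" shortly after.
    asyncio.get_running_loop().call_later(0.01, gate.signal, {"approved": True})
    return await gate.wait()

print(asyncio.run(demo()))  # {'approved': True}
```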
## WorkflowWorker
A consumer for distributed workflow execution. It polls a `PostgresStore` for pending runs, claims them atomically via `FOR UPDATE SKIP LOCKED`, and executes them with heartbeats. Multiple workers can run concurrently for horizontal scaling.
```python
from sagewai import WorkflowWorker

worker = WorkflowWorker(
    store=postgres_store,
    workflow_registry={"article-pipeline": my_workflow},
    pool="cloud-gpt4",
    labels={"zone": "us-east", "gpu": True},
    credentials=my_credentials,
)
await worker.start()  # Blocks until shutdown
```
### Constructor

| Parameter | Type | Default | Description |
|---|---|---|---|
| `store` | `PostgresStore` | required | Database store with queue operations |
| `workflow_registry` | `dict[str, DurableWorkflow]` | required | Maps workflow names to instances |
| `max_concurrent` | `int` | `5` | Max concurrent workflow executions |
| `poll_interval` | `float` | `1.0` | Seconds between poll cycles |
| `heartbeat_interval` | `float` | `30.0` | Seconds between heartbeats |
| `shutdown_timeout` | `float` | `30.0` | Max seconds to wait on shutdown |
| `project_id` | `str \| None` | `None` | Project scope (workers only claim matching runs) |
| `pool` | `str \| None` | `None` | Worker pool name (like a Temporal task queue) |
| `labels` | `dict[str, Any] \| None` | `None` | Key-value metadata for label-based routing |
| `credentials` | `WorkerCredentials \| None` | `None` | Per-worker LLM credentials (injected via `ContextVar`) |
### Methods

| Method | Signature | Description |
|---|---|---|
| `start` | `async start()` | Start the worker (blocks until shutdown) |
| `shutdown` | `async shutdown()` | Gracefully shut down |
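The atomic-claim behavior described above can be modeled without a database: several workers race for pending runs, but each run is handed to exactly one worker. Here a lock-guarded dict stands in for the `workflow_runs` table and `FOR UPDATE SKIP LOCKED`; this is an illustrative sketch, not sagewai's implementation.

```python
# Each pending run is claimed exactly once, even with many concurrent workers.
import asyncio

async def claim_next(queue: dict, lock: asyncio.Lock, worker_id: str):
    async with lock:
        for run_id, state in queue.items():
            if state["status"] == "pending":
                state["status"] = "running"      # atomic claim under the lock
                state["worker_id"] = worker_id
                return run_id
    return None  # nothing pending: the worker goes back to polling

async def demo():
    queue = {f"run-{i}": {"status": "pending"} for i in range(4)}
    lock = asyncio.Lock()
    # Six workers compete for four runs; two come away empty-handed.
    claims = await asyncio.gather(
        *(claim_next(queue, lock, f"worker-{w}") for w in range(6))
    )
    return claims, queue

claims, queue = asyncio.run(demo())
print(claims)  # four distinct run ids plus two None values
```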
## WorkflowMonitor
Temporal-like visibility into workflow executions. For CLI and admin panel integration.
```python
from sagewai import WorkflowMonitor

monitor = WorkflowMonitor(store=postgres_store)

executions = await monitor.list_executions(status="running")
detail = await monitor.get_execution("run-abc123")
timeline = await monitor.get_execution_timeline("run-abc123")
await monitor.retry_execution("run-abc123")
```
### Constructor

| Parameter | Type | Default | Description |
|---|---|---|---|
| `store` | `WorkflowStore` | required | Workflow state store |
### Methods

| Method | Returns | Description |
|---|---|---|
| `list_executions(status=None)` | `list[ExecutionSummary]` | List workflow executions |
| `get_execution(run_id)` | `ExecutionDetail` | Full execution detail |
| `get_execution_timeline(run_id)` | `list[TimelineEvent]` | Event history |
| `retry_execution(run_id)` | `str` | Retry a failed execution (returns a new `run_id`) |
## WorkerLoadBalancer
Assigns workflow runs to workers based on a routing strategy. Queries the `workers` table to pick the best worker.
```python
from sagewai import WorkerLoadBalancer
from sagewai.models.worker import RoutingConstraints, RoutingStrategy

balancer = WorkerLoadBalancer(store=postgres_store)

worker_id = await balancer.assign(
    RoutingConstraints(
        worker_pool="cloud-gpt4",
        strategy=RoutingStrategy.ROUND_ROBIN,
    )
)
```
### Constructor

| Parameter | Type | Default | Description |
|---|---|---|---|
| `store` | `PostgresStore` | required | Store with access to the `workers` and `workflow_runs` tables |
### Methods

| Method | Signature | Returns | Description |
|---|---|---|---|
| `assign` | `async assign(constraints=None, project_id=None)` | `str \| None` | Pick a target `worker_id` |
## DeadLetterQueue
Failed workflows are moved to the DLQ, where they can be inspected, manually retried, or discarded.
```python
from sagewai import DeadLetterQueue

dlq = DeadLetterQueue(store=postgres_store)

await dlq.move_to_dlq("my-workflow", "run-123", "Step failed: timeout")
entries = await dlq.list_entries()
new_run_id = await dlq.retry("run-123")
await dlq.purge(older_than_days=30)
```
### Constructor

| Parameter | Type | Default | Description |
|---|---|---|---|
| `store` | `Any` | required | Workflow state store |
### Methods

| Method | Returns | Description |
|---|---|---|
| `move_to_dlq(workflow_name, run_id, error)` | `None` | Move a failed run to the DLQ |
| `list_entries(workflow_name=None)` | `list[DLQEntry]` | List DLQ entries |
| `retry(run_id)` | `str` | Retry from DLQ (returns a new `run_id`) |
| `purge(older_than_days=30)` | `int` | Purge old entries (returns count) |
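The `purge(older_than_days=...)` semantics can be sketched with an in-memory entry list (illustrative only; the real method operates on the store, and the `failed_at` field name is an assumption):

```python
# Drop DLQ entries older than the cutoff and report how many were removed.
from datetime import datetime, timedelta, timezone

def purge(entries: list[dict], older_than_days: int = 30) -> int:
    cutoff = datetime.now(timezone.utc) - timedelta(days=older_than_days)
    before = len(entries)
    entries[:] = [e for e in entries if e["failed_at"] >= cutoff]
    return before - len(entries)

now = datetime.now(timezone.utc)
entries = [
    {"run_id": "run-1", "failed_at": now - timedelta(days=45)},  # too old
    {"run_id": "run-2", "failed_at": now - timedelta(days=2)},   # kept
]
print(purge(entries, older_than_days=30))  # 1
```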
## WorkerCredentials
Credentials and model overrides injected by a worker at execution time. Set via `ContextVar`, read by `UniversalAgent` at LLM call time. Credentials never touch the database.
```python
from sagewai import InferenceParams, WorkerCredentials

creds = WorkerCredentials(
    model_overrides={"default": "ollama/llama3.2"},
    inference_overrides=InferenceParams(
        api_base="http://localhost:11434",
    ),
)
```
### Fields

| Field | Type | Default | Description |
|---|---|---|---|
| `model_overrides` | `dict[str, str]` | `{}` | Maps logical names to model strings; `"default"` replaces `config.model` |
| `inference_overrides` | `InferenceParams \| None` | `None` | LLM provider overrides (`api_base`, `api_key`, etc.) |
| `env_overrides` | `dict[str, str]` | `{}` | Extra env vars (in-process only, never persisted) |
## get_worker_credentials
Get the current worker's credentials from the `ContextVar`. Returns `None` when not running inside a `WorkflowWorker`.
```python
from sagewai import get_worker_credentials

creds = get_worker_credentials()
if creds:
    print(f"Running on worker with model: {creds.model_overrides}")
```
## RoutingConstraints
Routing constraints attached to a workflow submission. Determines which workers can claim the run.
```python
from sagewai.models.worker import RoutingConstraints, RoutingStrategy

constraints = RoutingConstraints(
    worker_pool="local-ollama",
    worker_labels={"gpu": True},
    strategy=RoutingStrategy.LEAST_LOADED,
)
```
### Fields

| Field | Type | Default | Description |
|---|---|---|---|
| `worker_pool` | `str \| None` | `None` | Target worker pool name |
| `worker_labels` | `dict[str, Any] \| None` | `None` | Required labels (JSONB containment match) |
| `worker_id` | `str \| None` | `None` | Target a specific worker by ID |
| `strategy` | `RoutingStrategy` | `LEAST_LOADED` | Load-balancing strategy |
| `capacity_threshold` | `float` | `0.9` | For `THRESHOLD` strategy: skip workers above this load ratio |
## RoutingStrategy Enum

| Value | Description |
|---|---|
| `DIRECT` | Workers self-select at claim time via pool/labels/id filters |
| `ROUND_ROBIN` | Rotate across eligible workers |
| `LEAST_LOADED` | Pick the worker with the lowest load ratio |
| `THRESHOLD` | Like `LEAST_LOADED`, but skip workers above the capacity threshold |