# Workflows

Durable workflow execution with checkpointing, distributed workers, dead letter queues, and load balancing. Build Temporal-like workflows with Python decorators.

```python
from sagewai import DurableWorkflow

wf = DurableWorkflow(name="article-pipeline", store=postgres_store)

@wf.step("research")
async def research(topic: str) -> str:
    return await researcher.chat(topic)

@wf.step("draft", retries=3)
async def draft(research_output: str) -> str:
    return await writer.chat(research_output)

result = await wf.run(topic="quantum computing")
```

## DurableWorkflow

Step-level checkpointing and automatic retry. Workflows survive process restarts and can be resumed from the last successful checkpoint.

```python
from sagewai import DurableWorkflow
from sagewai.core.stores.postgres import PostgresStore

store = PostgresStore(database_url="postgresql://localhost/sagewai")
await store.initialize()

wf = DurableWorkflow(name="my-workflow", store=store)

@wf.step("step1")
async def step1(input_data: str) -> str:
    return "processed: " + input_data

@wf.step("step2", retries=3)
async def step2(prev: str) -> str:
    return "finalized: " + prev

result = await wf.run(input_data="hello")
```

### Key Methods

| Method | Description |
|---|---|
| `@wf.step(name, retries=0)` | Decorator to register a workflow step |
| `await wf.run(**kwargs)` | Execute the workflow end-to-end |
| `await wf.resume(run_id)` | Resume from the last checkpoint |
| `await wf.signal(run_id, name, value)` | Send a signal to a running workflow |
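The checkpoint-and-resume behaviour can be sketched in plain Python. This is an illustrative toy, not the library's implementation: the real `DurableWorkflow` persists checkpoints to a store such as `PostgresStore` so they survive process restarts, whereas this sketch keeps them in a dict.

```python
# Minimal sketch of step-level checkpointing: each step's result is saved
# after it succeeds, and already-completed steps are skipped on re-run.
import asyncio


class CheckpointingWorkflow:
    def __init__(self):
        self.steps = []        # ordered (name, fn) pairs
        self.checkpoints = {}  # step name -> saved result

    def step(self, name):
        def decorator(fn):
            self.steps.append((name, fn))
            return fn
        return decorator

    async def run(self, value):
        for name, fn in self.steps:
            if name in self.checkpoints:
                value = self.checkpoints[name]  # skip completed step
                continue
            value = await fn(value)
            self.checkpoints[name] = value      # checkpoint after success
        return value


wf = CheckpointingWorkflow()

@wf.step("step1")
async def step1(x):
    return "processed: " + x

@wf.step("step2")
async def step2(x):
    return "finalized: " + x

result = asyncio.run(wf.run("hello"))
# A second run reuses the saved checkpoints instead of re-executing steps.
```

With persisted checkpoints, `resume(run_id)` amounts to re-running this loop after a restart: every step whose checkpoint exists is skipped, and execution continues from the first incomplete step.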

## ApprovalGate

Pause a workflow until human approval is received via signal.

```python
from sagewai import ApprovalGate, DurableWorkflow

wf = DurableWorkflow(name="review-pipeline", store=store)

@wf.step("draft")
async def draft(topic: str) -> str:
    return await writer.chat(topic)

@wf.step("approve")
async def approve(draft: str) -> str:
    gate = ApprovalGate(timeout_seconds=3600)
    return await gate.wait(wf, "approval_signal")

# From another process:
await wf.signal(run_id, "approval_signal", {"approved": True})
```
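The wait-with-timeout semantics can be sketched with `asyncio` primitives. This is a standalone illustration of the pattern, not `ApprovalGate` itself: `wait()` blocks until a signal delivers a payload, and raises a timeout error if none arrives within `timeout_seconds`.

```python
# Illustrative approval gate: an asyncio.Event bridges the signal sender
# and the waiting step; asyncio.wait_for enforces the timeout.
import asyncio


class MiniApprovalGate:
    def __init__(self, timeout_seconds):
        self.timeout_seconds = timeout_seconds
        self._event = asyncio.Event()
        self._payload = None

    def signal(self, payload):
        self._payload = payload
        self._event.set()

    async def wait(self):
        # Raises TimeoutError if no signal arrives in time.
        await asyncio.wait_for(self._event.wait(), self.timeout_seconds)
        return self._payload


async def main():
    gate = MiniApprovalGate(timeout_seconds=5)
    # Simulate a human approving from "another process" shortly afterwards.
    asyncio.get_running_loop().call_later(0.01, gate.signal, {"approved": True})
    return await gate.wait()

decision = asyncio.run(main())
```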

## WorkflowWorker

Distributed workflow execution consumer. Polls a `PostgresStore` for pending runs, claims them atomically via `FOR UPDATE SKIP LOCKED`, and executes them with heartbeats. Multiple workers can run concurrently for horizontal scaling.

```python
from sagewai import WorkflowWorker

worker = WorkflowWorker(
    store=postgres_store,
    workflow_registry={"article-pipeline": my_workflow},
    pool="cloud-gpt4",
    labels={"zone": "us-east", "gpu": True},
    credentials=my_credentials,
)
await worker.start()  # Blocks until shutdown
```
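The point of claiming atomically is that no two workers can ever execute the same run. A pure-Python analogue of what `FOR UPDATE SKIP LOCKED` provides in PostgreSQL (illustrative only; the real worker does this with a SQL query, not a thread lock):

```python
# Many "workers" pull from a shared pending queue; holding a lock across the
# check-and-remove makes each claim atomic, so every run is claimed exactly
# once -- the same guarantee FOR UPDATE SKIP LOCKED gives at the database.
import threading

pending = [f"run-{i}" for i in range(100)]
lock = threading.Lock()
claimed = {"w1": [], "w2": [], "w3": []}

def worker(name):
    while True:
        with lock:                    # atomic claim: check and pop together
            if not pending:
                return
            run_id = pending.pop(0)
        claimed[name].append(run_id)  # "execute" outside the critical section

threads = [threading.Thread(target=worker, args=(n,)) for n in claimed]
for t in threads:
    t.start()
for t in threads:
    t.join()

total = sum(len(runs) for runs in claimed.values())
```

`SKIP LOCKED` goes one step further than a plain lock: a worker that finds a row already claimed skips it and moves on instead of blocking, which is what lets many workers poll the same table without contending.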

### Constructor

| Parameter | Type | Default | Description |
|---|---|---|---|
| `store` | `PostgresStore` | required | Database store with queue operations |
| `workflow_registry` | `dict[str, DurableWorkflow]` | required | Maps workflow names to instances |
| `max_concurrent` | `int` | `5` | Max concurrent workflow executions |
| `poll_interval` | `float` | `1.0` | Seconds between poll cycles |
| `heartbeat_interval` | `float` | `30.0` | Seconds between heartbeats |
| `shutdown_timeout` | `float` | `30.0` | Max seconds to wait on shutdown |
| `project_id` | `str \| None` | `None` | Project scope (workers only claim matching runs) |
| `pool` | `str \| None` | `None` | Worker pool name (like a Temporal task queue) |
| `labels` | `dict[str, Any] \| None` | `None` | Key-value metadata for label-based routing |
| `credentials` | `WorkerCredentials \| None` | `None` | Per-worker LLM credentials (injected via ContextVar) |

### Methods

| Method | Signature | Description |
|---|---|---|
| `start` | `async start()` | Start the worker (blocks until shutdown) |
| `shutdown` | `async shutdown()` | Gracefully shut down |

## WorkflowMonitor

Temporal-like visibility into workflow executions. For CLI and admin panel integration.

```python
from sagewai import WorkflowMonitor

monitor = WorkflowMonitor(store=postgres_store)

executions = await monitor.list_executions(status="running")
detail = await monitor.get_execution("run-abc123")
timeline = await monitor.get_execution_timeline("run-abc123")
await monitor.retry_execution("run-abc123")
```

### Constructor

| Parameter | Type | Default | Description |
|---|---|---|---|
| `store` | `WorkflowStore` | required | Workflow state store |

### Methods

| Method | Returns | Description |
|---|---|---|
| `list_executions(status=None)` | `list[ExecutionSummary]` | List workflow executions |
| `get_execution(run_id)` | `ExecutionDetail` | Full execution detail |
| `get_execution_timeline(run_id)` | `list[TimelineEvent]` | Event history |
| `retry_execution(run_id)` | `str` | Retry a failed execution (returns the new `run_id`) |

## WorkerLoadBalancer

Assigns workflow runs to workers based on a routing strategy. Queries the `workers` table to pick the best worker.

```python
from sagewai import WorkerLoadBalancer
from sagewai.models.worker import RoutingConstraints, RoutingStrategy

balancer = WorkerLoadBalancer(store=postgres_store)

worker_id = await balancer.assign(
    RoutingConstraints(
        worker_pool="cloud-gpt4",
        strategy=RoutingStrategy.ROUND_ROBIN,
    )
)
```

### Constructor

| Parameter | Type | Default | Description |
|---|---|---|---|
| `store` | `PostgresStore` | required | Store with access to the `workers` and `workflow_runs` tables |

### Methods

| Method | Signature | Returns | Description |
|---|---|---|---|
| `assign` | `async assign(constraints=None, project_id=None)` | `str \| None` | Pick a target `worker_id` |

## DeadLetterQueue

Failed workflows are moved to the DLQ for inspection, manual retry, or discard.

```python
from sagewai import DeadLetterQueue

dlq = DeadLetterQueue(store=postgres_store)

await dlq.move_to_dlq("my-workflow", "run-123", "Step failed: timeout")
entries = await dlq.list_entries()
new_run_id = await dlq.retry("run-123")
await dlq.purge(older_than_days=30)
```

### Constructor

| Parameter | Type | Default | Description |
|---|---|---|---|
| `store` | `Any` | required | Workflow state store |

### Methods

| Method | Returns | Description |
|---|---|---|
| `move_to_dlq(workflow_name, run_id, error)` | `None` | Move a failed run to the DLQ |
| `list_entries(workflow_name=None)` | `list[DLQEntry]` | List DLQ entries |
| `retry(run_id)` | `str` | Retry from the DLQ (returns the new `run_id`) |
| `purge(older_than_days=30)` | `int` | Purge old entries (returns the count) |
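The DLQ semantics described above can be sketched with an in-memory store. This is illustrative only; the real `DeadLetterQueue` persists entries in the store, and the entry fields here are assumptions for the sketch.

```python
# In-memory sketch of DLQ behaviour: failed runs are parked with their error,
# retry removes the entry and resubmits under a fresh run_id, purge drops
# entries older than the cutoff and reports how many were removed.
import time
import uuid


class MiniDLQ:
    def __init__(self):
        self.entries = {}  # run_id -> entry dict

    def move_to_dlq(self, workflow_name, run_id, error):
        self.entries[run_id] = {
            "workflow": workflow_name,
            "error": error,
            "failed_at": time.time(),
        }

    def list_entries(self, workflow_name=None):
        return [e for e in self.entries.values()
                if workflow_name is None or e["workflow"] == workflow_name]

    def retry(self, run_id):
        self.entries.pop(run_id)   # remove from the DLQ...
        return str(uuid.uuid4())   # ...and resubmit under a new run_id

    def purge(self, older_than_days=30):
        cutoff = time.time() - older_than_days * 86400
        old = [rid for rid, e in self.entries.items()
               if e["failed_at"] < cutoff]
        for rid in old:
            del self.entries[rid]
        return len(old)


dlq = MiniDLQ()
dlq.move_to_dlq("my-workflow", "run-123", "Step failed: timeout")
new_run_id = dlq.retry("run-123")
```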

## WorkerCredentials

Credentials and model overrides injected by a worker at execution time. Set via `ContextVar`, read by `UniversalAgent` at LLM call time. Credentials never touch the database.

```python
from sagewai import InferenceParams, WorkerCredentials

creds = WorkerCredentials(
    model_overrides={"default": "ollama/llama3.2"},
    inference_overrides=InferenceParams(
        api_base="http://localhost:11434",
    ),
)
```

### Fields

| Field | Type | Default | Description |
|---|---|---|---|
| `model_overrides` | `dict[str, str]` | `{}` | Maps logical names to model strings; `"default"` replaces `config.model` |
| `inference_overrides` | `InferenceParams \| None` | `None` | LLM provider overrides (`api_base`, `api_key`, etc.) |
| `env_overrides` | `dict[str, str]` | `{}` | Extra env vars (in-process only, never persisted) |

## get_worker_credentials

Get the current worker's credentials from the `ContextVar`. Returns `None` when not running inside a `WorkflowWorker`.

```python
from sagewai import get_worker_credentials

creds = get_worker_credentials()
if creds:
    print(f"Running on worker with model: {creds.model_overrides}")
```

## RoutingConstraints

Routing constraints attached to a workflow submission. Determines which workers can claim the run.

```python
from sagewai.models.worker import RoutingConstraints, RoutingStrategy

constraints = RoutingConstraints(
    worker_pool="local-ollama",
    worker_labels={"gpu": True},
    strategy=RoutingStrategy.LEAST_LOADED,
)
```

### Fields

| Field | Type | Default | Description |
|---|---|---|---|
| `worker_pool` | `str \| None` | `None` | Target worker pool name |
| `worker_labels` | `dict[str, Any] \| None` | `None` | Required labels (JSONB containment match) |
| `worker_id` | `str \| None` | `None` | Target a specific worker by ID |
| `strategy` | `RoutingStrategy` | `LEAST_LOADED` | Load-balancing strategy |
| `capacity_threshold` | `float` | `0.9` | For the `THRESHOLD` strategy: skip workers above this load ratio |

## RoutingStrategy Enum

| Value | Description |
|---|---|
| `DIRECT` | Workers self-select at claim time via pool/labels/ID filters |
| `ROUND_ROBIN` | Rotate across eligible workers |
| `LEAST_LOADED` | Pick the worker with the lowest load ratio |
| `THRESHOLD` | Like `LEAST_LOADED`, but skip workers above the capacity threshold |
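The eligibility filtering and the `LEAST_LOADED`/`THRESHOLD` strategies can be sketched as plain selection logic. This is an illustration under assumed worker-row fields (`pool`, `labels`, `active`, `max`); the real balancer runs equivalent filtering as queries against the `workers` table.

```python
# Sketch of worker selection: first filter by pool and label containment
# (the Python analogue of a JSONB containment match), then pick the worker
# with the lowest load ratio, optionally skipping overloaded ones.
def eligible(workers, pool=None, labels=None):
    out = []
    for w in workers:
        if pool is not None and w["pool"] != pool:
            continue
        # Containment: every required label must be present with its value.
        if labels is not None and not (labels.items() <= w["labels"].items()):
            continue
        out.append(w)
    return out

def least_loaded(workers, capacity_threshold=None):
    if capacity_threshold is not None:  # THRESHOLD: drop overloaded workers
        workers = [w for w in workers
                   if w["active"] / w["max"] <= capacity_threshold]
    if not workers:
        return None
    return min(workers, key=lambda w: w["active"] / w["max"])["id"]

workers = [
    {"id": "w1", "pool": "cloud-gpt4", "labels": {"gpu": True},  "active": 9, "max": 10},
    {"id": "w2", "pool": "cloud-gpt4", "labels": {"gpu": True},  "active": 2, "max": 10},
    {"id": "w3", "pool": "local",      "labels": {"gpu": False}, "active": 0, "max": 10},
]

pick = least_loaded(eligible(workers, pool="cloud-gpt4", labels={"gpu": True}))
```

`DIRECT` inverts this flow: the balancer does not pick at all, and workers apply the pool/label/ID filters themselves when claiming.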