Fleet Architecture
A deep dive into the design of the Sagewai Enterprise Fleet system: enrollment flow, task dispatch, heartbeat management, and security layers.
System Overview
┌─────────────────────────────────────────────────────────────────┐
│                          Sagewai Cloud                          │
│                                                                 │
│  ┌──────────────┐    ┌─────────────────┐    ┌───────────────┐   │
│  │ Admin Panel  │    │ Fleet REST API  │    │  Task Queue   │   │
│  │ (approval,   │◄──►│ /api/v1/fleet/  │◄──►│  (Postgres    │   │
│  │  monitoring) │    │                 │    │  SKIP LOCKED) │   │
│  └──────────────┘    └────────┬────────┘    └───────┬───────┘   │
│                               │                     │           │
│                    ┌──────────▼──────────┐          │           │
│                    │    FleetRegistry    │          │           │
│                    │  (worker state,     │          │           │
│                    │   enrollment keys,  │          │           │
│                    │   approval FSM)     │          │           │
│                    └──────────┬──────────┘          │           │
│                               │                     │           │
│                    ┌──────────▼──────────┐ ┌────────▼───────┐   │
│                    │   WRTTokenManager   │ │ FleetDispatcher│   │
│                    │  (JWT issuance,     │ │ (long-poll,    │   │
│                    │   JTI revocation)   │ │  encryption,   │   │
│                    └─────────────────────┘ │  audit)        │   │
│                                            └────────┬───────┘   │
└─────────────────────────────────────────────────────┼───────────┘
                                                      │ HTTPS / WRT
                             ┌────────────────────────┤
                             │                        │
               ┌─────────────▼──────┐   ┌─────────────▼──────┐
               │ Worker A           │   │ Worker B           │
               │ pool: gpu-prod     │   │ pool: default      │
               │ models: llama3     │   │ models: gpt-4o     │
               │ labels: gpu=true   │   │ labels: env=prod   │
               │ (your data center) │   │ (your K8s cluster) │
               └────────────────────┘   └────────────────────┘
Component Reference
| Component | Location | Responsibility |
|---|---|---|
| FleetRegistry | sagewai/fleet/registry.py | Worker registration, approval state machine, enrollment key CRUD (SHA-256 hashed keys) |
| WRTTokenManager | sagewai/fleet/tokens.py | Issuing and verifying Worker Registration Tokens (JWT, wrt-1. prefix, JTI revocation list) |
| FleetDispatcher | sagewai/fleet/dispatcher.py | Long-poll task claiming, Fernet payload encryption, dispatch audit events |
| FleetAuditEvent | sagewai/fleet/audit.py | Append-only audit trail with 13 event types (registered, approved, revoked, task_claimed, etc.) |
| FleetAnomalyDetector | sagewai/fleet/anomaly.py | Detects rate anomalies, excessive failures, heartbeat timeouts, model mismatches; auto-revokes |
| LLMHealthProbe | sagewai/fleet/probe.py | Probes Ollama and OpenAI-compatible endpoints to verify model availability |
| MTLSVerifier | sagewai/fleet/mtls.py | Mutual TLS verification for Enterprise tier (planned) |
| ModelNormalizer | sagewai/fleet/models.py | Normalizes model name variants (e.g. gpt4o, gpt-4o, openai/gpt-4o → canonical form) |
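To make the ModelNormalizer row concrete, here is a minimal sketch of name canonicalization. The alias table, the `canonical_model` function, and the choice of `gpt-4o` as the canonical form are assumptions for illustration; the actual rules in sagewai/fleet/models.py may differ.

```python
# Hypothetical alias table; the real ModelNormalizer rules may differ.
_ALIASES = {
    "gpt4o": "gpt-4o",
}


def canonical_model(name: str) -> str:
    """Map a model name variant to an assumed canonical form."""
    key = name.strip().lower()
    # Drop a provider prefix such as "openai/".
    if "/" in key:
        key = key.rsplit("/", 1)[1]
    # Fall back to the cleaned name when no alias is registered.
    return _ALIASES.get(key, key)


variants = ["gpt4o", "gpt-4o", "openai/gpt-4o"]
canonical = {canonical_model(v) for v in variants}
print(canonical)
```

All three variants collapse to a single value, which is what lets a routing query compare a requested model against one canonical column rather than every spelling.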
Enrollment Flow
Worker starts
│
▼
Read ENROLLMENT_KEY env var (wrt-1.eyJ...)
│
▼
POST /api/v1/fleet/register
body: { pool, labels, models, capabilities }
│
▼
FleetRegistry.register_worker()
→ store worker record with status=PENDING
→ emit WORKER_REGISTERED audit event
│
▼
Admin sees worker in panel (status: PENDING)
│
▼
Admin approves → PATCH /api/v1/fleet/workers/{id}/approve
→ FleetRegistry transitions: PENDING → APPROVED
→ WRTTokenManager issues short-lived task token
→ emit WORKER_APPROVED audit event
│
▼
Worker polls for tasks (approved status required)
Approval states: PENDING → APPROVED → REVOKED (one-way; revoked workers cannot re-register with the same key).
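The one-way approval states can be sketched as a small state machine. This is an illustrative model only: the `WorkerStatus` enum and `transition` function are hypothetical names, not the FleetRegistry API.

```python
from enum import Enum


class WorkerStatus(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REVOKED = "revoked"


# One-way transitions as described in the text; REVOKED is terminal.
_ALLOWED = {
    WorkerStatus.PENDING: {WorkerStatus.APPROVED},
    WorkerStatus.APPROVED: {WorkerStatus.REVOKED},
    WorkerStatus.REVOKED: set(),
}


def transition(current: WorkerStatus, target: WorkerStatus) -> WorkerStatus:
    """Return the new status, or raise if the move is not allowed."""
    if target not in _ALLOWED[current]:
        raise ValueError(f"illegal transition: {current.name} -> {target.name}")
    return target


status = transition(WorkerStatus.PENDING, WorkerStatus.APPROVED)
status = transition(status, WorkerStatus.REVOKED)

try:
    transition(status, WorkerStatus.APPROVED)
    reapproved = True
except ValueError:
    reapproved = False
```

Keeping the allowed moves in a lookup table makes the terminal state explicit: a revoked worker has no outgoing transitions.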
Task Dispatch Flow
Workflow step ready for execution
│
▼
FleetDispatcher.claim_task()
SELECT ... FOR UPDATE SKIP LOCKED
WHERE pool = $target_pool
AND labels @> $target_labels (JSONB containment)
AND models_canonical @> $model (GIN index on TEXT[])
│
▼
Task payload encrypted with org's Fernet key
→ only workers in that org can decrypt
│
▼
Worker receives encrypted task over HTTPS long-poll
→ decrypts payload
→ injects worker credentials into ContextVar
→ UniversalAgent reads credentials, overrides model/api_key
│
▼
Worker executes task, reports result
→ POST /api/v1/fleet/tasks/{id}/complete
→ emit TASK_COMPLETED audit event
│
▼
WorkflowMonitor marks step complete, continues workflow
Heartbeat Flow
Workers send a heartbeat every 30 seconds to signal liveness:
Worker heartbeat loop
│
▼
POST /api/v1/fleet/workers/{id}/heartbeat
body: { timestamp, tasks_completed, tasks_failed }
│
▼
FleetRegistry updates last_heartbeat_at
│
▼
FleetAnomalyDetector checks (runs every 60s):
- missed_heartbeats > 3 → HEARTBEAT_TIMEOUT anomaly
- failure_rate > threshold → EXCESSIVE_FAILURES anomaly
- model mismatch → MODEL_MISMATCH anomaly
- request_rate spike → RATE_ANOMALY anomaly
│
▼
If 2+ anomaly types → auto-revoke worker
→ emit WORKER_REVOKED audit event
→ admin alerted via notification channel
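The detector checks and the auto-revoke rule above might look roughly like the following sketch. The 30-second interval, the "more than 3 missed" rule, and the "2+ anomaly types" rule come from the text; the failure-rate threshold of 0.5, the way missed heartbeats are counted, and all function names are assumptions. Only two of the four checks are modeled.

```python
from __future__ import annotations

from dataclasses import dataclass

HEARTBEAT_INTERVAL_S = 30     # from the text: one heartbeat every 30 seconds
MAX_MISSED_HEARTBEATS = 3     # from the text: more than 3 missed is a timeout
FAILURE_RATE_THRESHOLD = 0.5  # assumed value; the text only says "threshold"


@dataclass
class WorkerStats:
    seconds_since_heartbeat: float
    tasks_completed: int
    tasks_failed: int


def detect_anomalies(stats: WorkerStats) -> set[str]:
    """Return the set of anomaly types this worker currently triggers."""
    anomalies: set[str] = set()
    # Assumption: missed heartbeats are whole intervals elapsed since the last one.
    missed = int(stats.seconds_since_heartbeat // HEARTBEAT_INTERVAL_S)
    if missed > MAX_MISSED_HEARTBEATS:
        anomalies.add("HEARTBEAT_TIMEOUT")
    total = stats.tasks_completed + stats.tasks_failed
    if total > 0 and stats.tasks_failed / total > FAILURE_RATE_THRESHOLD:
        anomalies.add("EXCESSIVE_FAILURES")
    return anomalies


def should_auto_revoke(anomalies: set[str]) -> bool:
    """Auto-revoke only when two or more distinct anomaly types are present."""
    return len(anomalies) >= 2


healthy = detect_anomalies(WorkerStats(10, 100, 2))
silent = detect_anomalies(WorkerStats(150, 10, 0))
failing = detect_anomalies(WorkerStats(150, 1, 9))
print(should_auto_revoke(healthy), should_auto_revoke(silent), should_auto_revoke(failing))
```

Requiring two distinct anomaly types means a single signal, such as a brief network outage, does not revoke a worker by itself.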
Security Layers
Layer 1: Enrollment Token (WRT)
Each enrollment key is single-use or time-limited. Only a SHA-256 hash of the key is stored; the plaintext is shown once, at creation time.
# Key structure (JWT claims)
{
"sub": "worker-<uuid>",
"iss": "sagewai-fleet",
"jti": "<unique token id>", # added to revocation list on revoke
"scopes": ["worker:register", "worker:heartbeat", "worker:claim"],
"pool": "gpu-prod",
"org_id": "acme",
"exp": 1750000000
}
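Storing only a hash of the enrollment key can be sketched with the standard library. A random string stands in for the real wrt-1. token here, and the function names are hypothetical; this is not the FleetRegistry implementation.

```python
from __future__ import annotations

import hashlib
import hmac
import secrets


def create_enrollment_key() -> tuple[str, str]:
    """Return (plaintext, key_hash). Only key_hash would be persisted."""
    # Stand-in for the real token; the actual key is a JWT with a wrt-1. prefix.
    plaintext = secrets.token_urlsafe(32)
    key_hash = hashlib.sha256(plaintext.encode("utf-8")).hexdigest()
    return plaintext, key_hash


def verify_enrollment_key(presented: str, stored_hash: str) -> bool:
    """Hash the presented key and compare it with the stored hash."""
    presented_hash = hashlib.sha256(presented.encode("utf-8")).hexdigest()
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(presented_hash, stored_hash)


plaintext, stored = create_enrollment_key()
accepted = verify_enrollment_key(plaintext, stored)
rejected = verify_enrollment_key(plaintext + "x", stored)
```

Because the database holds only the digest, a leaked enrollment_keys table does not reveal usable keys.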
Layer 2: Payload Encryption
Task payloads (prompts, tool inputs, agent configs) are encrypted with a per-org Fernet key before leaving the gateway. Workers in other organizations cannot decrypt tasks even if they intercept the traffic.
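A minimal sketch of per-org payload encryption, using the `Fernet` class from the third-party `cryptography` package. The in-memory `org_keys` dictionary and the helper names are assumptions; how the gateway actually stores and distributes org keys is not described here.

```python
from cryptography.fernet import Fernet, InvalidToken

# Hypothetical per-org keys; in practice these would come from a key store.
org_keys = {"acme": Fernet.generate_key(), "globex": Fernet.generate_key()}


def encrypt_payload(org_id: str, payload: bytes) -> bytes:
    """Encrypt a task payload with the org's Fernet key."""
    return Fernet(org_keys[org_id]).encrypt(payload)


def decrypt_payload(org_id: str, token: bytes) -> bytes:
    """Decrypt a task payload; raises InvalidToken for the wrong key."""
    return Fernet(org_keys[org_id]).decrypt(token)


payload = b'{"prompt": "summarize"}'
token = encrypt_payload("acme", payload)
roundtrip = decrypt_payload("acme", token)

try:
    decrypt_payload("globex", token)
    cross_org_readable = True
except InvalidToken:
    cross_org_readable = False
```

Fernet tokens are authenticated, so a worker holding another org's key gets an `InvalidToken` error rather than garbage plaintext.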
Layer 3: Approval Gate
No worker receives tasks until an admin explicitly approves it. The approval is logged in the audit trail with the approver's identity and timestamp.
Layer 4: Anomaly Detection
The FleetAnomalyDetector runs on a 60-second cycle. A worker that exhibits two or more types of suspicious pattern (sudden rate spike, repeated failures, heartbeat timeouts, claiming tasks for models it doesn't have) is automatically revoked pending investigation.
Layer 5: mTLS (Enterprise, planned)
Workers will present client certificates issued by the org's CA. The gateway will verify both the WRT token and the client certificate, and will check certificate revocation status via OCSP.
Database Schema
Three tables support the fleet system (migration 005_fleet.py):
-- Worker registry
workers (
id, org_id, pool, labels JSONB, models TEXT[], models_canonical TEXT[],
status, enrollment_key_id, last_heartbeat_at, capabilities JSONB,
target_pool, target_labels JSONB, target_worker_id -- routing columns (migration 004)
)
-- Enrollment keys
enrollment_keys (
id, org_id, key_hash TEXT, pool, labels JSONB, expires_at,
created_by, used_at, revoked_at
)
-- Audit trail
fleet_audit_events (
id, org_id, worker_id, event_type, actor_id, metadata JSONB, created_at
)
The models_canonical column uses a GIN index for efficient @> containment queries during task routing.
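For readers less familiar with the `@>` operator, the routing predicate can be mirrored in plain Python. This sketch models containment for flat label objects and string arrays only; the function names and worker dictionaries are illustrative, and the real matching happens in Postgres.

```python
from __future__ import annotations


def labels_contain(worker_labels: dict, target_labels: dict) -> bool:
    """Flat-object analogue of JSONB `worker_labels @> target_labels`."""
    return all(
        key in worker_labels and worker_labels[key] == value
        for key, value in target_labels.items()
    )


def models_contain(worker_models: list[str], required: list[str]) -> bool:
    """Analogue of the array test `models_canonical @> required`."""
    return set(required) <= set(worker_models)


def is_eligible(worker: dict, pool: str, target_labels: dict, model: str) -> bool:
    """Combine the three routing conditions from the claim query."""
    return (
        worker["pool"] == pool
        and labels_contain(worker["labels"], target_labels)
        and models_contain(worker["models_canonical"], [model])
    )


worker_a = {"pool": "gpu-prod", "labels": {"gpu": "true"}, "models_canonical": ["llama3"]}
worker_b = {"pool": "default", "labels": {"env": "prod"}, "models_canonical": ["gpt-4o"]}

a_matches = is_eligible(worker_a, "gpu-prod", {"gpu": "true"}, "llama3")
b_matches = is_eligible(worker_b, "gpu-prod", {"gpu": "true"}, "llama3")
print(a_matches, b_matches)
```

An empty `target_labels` object is contained in every label set, so a task that specifies no labels is routed on pool and model alone.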