Gateway & Streaming
The Sagewai Gateway exposes your agents through an OpenAI-compatible HTTP interface, handles incoming webhooks, and streams responses via Server-Sent Events (SSE) using the AG-UI protocol.
OpenAI-Compatible Endpoint
Your agents are accessible through a drop-in replacement for the OpenAI Chat Completions API:
POST /v1/chat/completions
This means any client that works with OpenAI (LangChain, LlamaIndex, custom scripts) can route to Sagewai with a one-line URL change.
cURL example
curl -X POST https://your-gateway.sagewai.ai/v1/chat/completions \
  -H "Authorization: Bearer $SAGEWAI_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "my-research-agent",
    "messages": [
      {"role": "user", "content": "Summarize recent AI safety news"}
    ],
    "stream": false
  }'
Python client (OpenAI SDK)
import asyncio

from openai import AsyncOpenAI

client = AsyncOpenAI(
    api_key="your-sagewai-api-key",
    base_url="https://your-gateway.sagewai.ai/v1",
)

async def main() -> None:
    response = await client.chat.completions.create(
        model="my-research-agent",  # maps to a registered Sagewai agent
        messages=[{"role": "user", "content": "What are the key AI papers from this week?"}],
    )
    print(response.choices[0].message.content)

asyncio.run(main())  # `await` is only valid inside a coroutine
Streaming with AG-UI (SSE)
For real-time streaming, Sagewai uses the AG-UI protocol — an SSE-based streaming format that emits structured events as the agent works.
Enable streaming via the OpenAI-compat endpoint
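# Run inside an async function, reusing `client` from the previous example.
stream = await client.chat.completions.create(
    model="my-agent",
    messages=[{"role": "user", "content": "Write a 500-word summary"}],
    stream=True,
)
async for chunk in stream:
    if not chunk.choices:  # some chunks may carry an empty choices list
        continue
    delta = chunk.choices[0].delta.content or ""
    print(delta, end="", flush=True)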
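For clients that cannot use the OpenAI SDK, it can help to see what the SDK is doing with the stream. The sketch below assumes the gateway mirrors OpenAI's wire format for `stream=True` (one `data:` line per JSON chunk, terminated by `data: [DONE]`); that is an assumption based on the endpoint being described as OpenAI-compatible, not something confirmed here. `iter_deltas` and `SAMPLE_LINES` are hypothetical names used only for illustration.

```python
import json
from typing import Iterable, Iterator


def iter_deltas(lines: Iterable[str]) -> Iterator[str]:
    """Yield text fragments from OpenAI-style streaming lines."""
    for line in lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue  # ignore blank separators and comment lines
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":  # end-of-stream sentinel in OpenAI's format
            break
        chunk = json.loads(payload)
        for choice in chunk.get("choices", []):
            text = choice.get("delta", {}).get("content")
            if text:
                yield text


# Hand-written sample lines (not captured from a real gateway).
SAMPLE_LINES = [
    'data: {"choices": [{"delta": {"role": "assistant"}}]}',
    'data: {"choices": [{"delta": {"content": "Hello"}}]}',
    'data: {"choices": [{"delta": {"content": " world"}}]}',
    'data: {"choices": [{"delta": {}, "finish_reason": "stop"}]}',
    "data: [DONE]",
]

print("".join(iter_deltas(SAMPLE_LINES)))  # prints "Hello world"
```

In practice the lines would come from a streaming HTTP response body rather than a list.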
AG-UI native SSE endpoint
GET /agui/ws
POST /agui/runs
The AG-UI event stream emits typed events:
| Event Type | Payload | Meaning |
|---|---|---|
| run_started | { run_id, agent_name } | Agent started |
| text_delta | { delta: "..." } | Partial text output |
| tool_call | { name, arguments } | Agent calling a tool |
| tool_result | { name, result } | Tool returned a value |
| run_completed | { run_id, output } | Agent finished |
| run_error | { error } | Agent encountered an error |
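The event types in the table can be consumed from any language that can read an SSE stream. Below is a minimal Python sketch that parses raw SSE text and reacts to each event type. It assumes the event name travels in the SSE `event:` field and the payload is JSON in the `data:` field; `parse_sse`, `collect_text`, and the `SAMPLE` stream are hypothetical and hand-written for illustration.

```python
import json
from typing import Iterator, List, Tuple


def parse_sse(raw: str) -> Iterator[Tuple[str, dict]]:
    """Yield (event_type, payload) pairs from raw SSE text."""
    event_type = "message"  # SSE default when no `event:` field is present
    data_lines: List[str] = []
    for line in raw.splitlines():
        if line == "":  # a blank line terminates one event
            if data_lines:
                yield event_type, json.loads("\n".join(data_lines))
            event_type, data_lines = "message", []
        elif line.startswith(":"):
            continue  # SSE comment, often used as a keep-alive
        elif line.startswith("event:"):
            event_type = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())


def collect_text(raw: str) -> str:
    """Concatenate text_delta payloads until the run completes."""
    parts: List[str] = []
    for event_type, payload in parse_sse(raw):
        if event_type == "text_delta":
            parts.append(payload["delta"])
        elif event_type == "run_error":
            raise RuntimeError(payload["error"])
        elif event_type == "run_completed":
            break
    return "".join(parts)


SAMPLE = (
    'event: run_started\ndata: {"run_id": "run-001", "agent_name": "my-agent"}\n\n'
    'event: text_delta\ndata: {"delta": "Hello"}\n\n'
    'event: text_delta\ndata: {"delta": ", world"}\n\n'
    'event: run_completed\ndata: {"run_id": "run-001", "output": "Hello, world"}\n\n'
)

print(collect_text(SAMPLE))  # prints "Hello, world"
```

Unknown event types such as `tool_call` and `tool_result` are simply skipped here; a fuller client would surface them to the UI.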
Consuming AG-UI events in JavaScript
// Browser EventSource does not support custom headers.
// Use @microsoft/fetch-event-source or pass the token as a query parameter:
import { fetchEventSource } from "@microsoft/fetch-event-source";

await fetchEventSource("/agui/ws?run_id=run-001", {
  headers: { Authorization: `Bearer ${apiKey}` },
  onmessage(ev) {
    const data = JSON.parse(ev.data);
    if (ev.event === "text_delta") {
      document.getElementById("output").textContent += data.delta;
    }
    if (ev.event === "run_completed") {
      // Stream complete
    }
  },
});
Webhook Triggers
Register webhooks so external services (Stripe, GitHub, Slack) can trigger agent runs:
from sagewai.gateway import WebhookRouter

# UniversalAgent and the FastAPI `app` are assumed to be imported/created elsewhere.
router = WebhookRouter()

@router.on("stripe.payment_intent.succeeded")
async def handle_payment(payload: dict) -> None:
    # payload is the verified Stripe webhook body
    agent = UniversalAgent(name="billing-agent", model="gpt-4o")
    await agent.chat(f"Process payment: {payload['id']}")

# In your FastAPI app
app.include_router(router.as_fastapi_router(), prefix="/webhooks")
Webhooks are verified via HMAC (Stripe/Slack/GitHub signing secrets). Unverified requests are rejected before reaching the handler.
from sagewai.gateway import WebhookRouter
# Register with HMAC verification
router = WebhookRouter(
    secret="whsec_...",  # your webhook signing secret
    signature_header="stripe-signature",
    verification_mode="stripe",  # or "slack", "github", "hmac-sha256"
)
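To make the verification step concrete, here is a minimal sketch of a generic HMAC-SHA256 check using only the Python standard library. It illustrates the general technique, not Sagewai's internal implementation; `sign` and `verify` are hypothetical helpers. Provider-specific schemes add details on top of this (for example, Stripe signs a timestamp together with the body, and GitHub prefixes the digest with `sha256=`).

```python
import hashlib
import hmac


def sign(secret: str, body: bytes) -> str:
    """Return the hex HMAC-SHA256 digest of the raw request body."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def verify(secret: str, body: bytes, signature: str) -> bool:
    """Check a received signature against the expected digest."""
    expected = sign(secret, body)
    # compare_digest runs in constant time, which avoids timing side channels
    return hmac.compare_digest(expected, signature)


secret = "example-signing-secret"  # placeholder, not a real secret
body = b'{"id": "pi_123", "type": "payment_intent.succeeded"}'
good_signature = sign(secret, body)

print(verify(secret, body, good_signature))         # prints True
print(verify(secret, body + b" ", good_signature))  # prints False (body was altered)
```

The check must run against the raw request bytes; re-serializing parsed JSON can change whitespace or key order and invalidate the signature.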
Pollers
Schedule recurring checks on external services:
from sagewai.gateway import PollerManager

# fetch_mentions() is your own helper; UniversalAgent is assumed to be imported elsewhere.
poller = PollerManager()

@poller.every(minutes=15)
async def check_mentions() -> None:
    """Check for new brand mentions and draft responses."""
    mentions = await fetch_mentions()
    if mentions:
        agent = UniversalAgent(name="social-agent", model="gpt-4o")
        for mention in mentions:
            await agent.chat(f"Draft a reply to: {mention['text']}")

# Call from within your async entry point
await poller.start()
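Conceptually, a poller is a loop that runs a coroutine, sleeps, and repeats. The following is a rough sketch of that idea with plain `asyncio`, assuming nothing about how `PollerManager` is actually implemented. `run_every` and its `max_runs` parameter are hypothetical; `max_runs` exists only so the example terminates.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional


async def run_every(
    interval_seconds: float,
    job: Callable[[], Awaitable[None]],
    max_runs: Optional[int] = None,
) -> int:
    """Run `job` repeatedly, sleeping `interval_seconds` between runs."""
    runs = 0
    while max_runs is None or runs < max_runs:
        try:
            await job()
        except Exception as exc:  # one failed poll should not stop the loop
            print(f"poll failed: {exc!r}")
        runs += 1
        if max_runs is None or runs < max_runs:
            await asyncio.sleep(interval_seconds)
    return runs


seen: List[int] = []


async def check_mentions() -> None:
    # Stand-in for a real check against an external service.
    seen.append(len(seen))


completed = asyncio.run(run_every(0.01, check_mentions, max_runs=3))
print(completed, seen)  # prints "3 [0, 1, 2]"
```

Catching exceptions inside the loop is a deliberate choice: a transient failure in one poll is logged and the schedule continues.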
Event Listeners
Subscribe to real-time streams (Kafka, Redis Pub/Sub, SSE from external services):
from sagewai.gateway import ListenerManager

# fulfillment_agent is assumed to be an agent instance created elsewhere.
listener = ListenerManager()

@listener.on_sse("https://api.example.com/events")
async def handle_event(event: dict) -> None:
    if event["type"] == "order.created":
        await fulfillment_agent.chat(f"Process order {event['order_id']}")

# Call from within your async entry point
await listener.start()
Agent-to-Agent Protocol (A2A)
Agents expose agent cards — structured capability declarations that other agents can discover and call:
from sagewai.protocols.a2a import AgentCard, A2AClient

# Declare an agent card
card = AgentCard(
    name="research-agent",
    description="Searches and summarizes information",
    capabilities=["web_search", "summarization"],
    endpoint="https://your-gateway.sagewai.ai/v1/chat/completions",
    model="gpt-4o",
)

# Another agent discovers and calls it
client = A2AClient(gateway_url="https://your-gateway.sagewai.ai")
result = await client.delegate(
    target_agent="research-agent",
    task="Summarize the latest LLM benchmarks",
)
The A2A protocol allows agents from different organizations to collaborate without sharing credentials — only the agent card endpoint is exposed.
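As an illustration of why a card is safe to share, the sketch below models a card as plain data and shows capability-based discovery over a list of cards. It is a simplified stand-in and not the `sagewai.protocols.a2a` implementation: the `Card` dataclass, `find_agents`, and the second example agent are hypothetical.

```python
import json
from dataclasses import asdict, dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Card:
    """Public capability declaration; note there is no credential field."""
    name: str
    description: str
    capabilities: Tuple[str, ...]
    endpoint: str


def find_agents(cards: List[Card], capability: str) -> List[str]:
    """Return the names of agents that declare the given capability."""
    return [card.name for card in cards if capability in card.capabilities]


cards = [
    Card(
        name="research-agent",
        description="Searches and summarizes information",
        capabilities=("web_search", "summarization"),
        endpoint="https://your-gateway.sagewai.ai/v1/chat/completions",
    ),
    Card(
        name="billing-agent",
        description="Processes payments",
        capabilities=("payments",),
        endpoint="https://your-gateway.sagewai.ai/v1/chat/completions",
    ),
]

print(find_agents(cards, "summarization"))  # prints ['research-agent']
print(json.dumps(asdict(cards[0]), indent=2))  # what a published card might look like
```

Because the card contains only a name, a description, capabilities, and an endpoint, publishing it reveals nothing that would let a caller bypass the gateway's own authentication.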
Configuring the Gateway
In your admin backend's config:
# apps/admin/admin/config.py
GATEWAY_ENABLED = True
GATEWAY_OPENAI_COMPAT = True # Enable /v1/chat/completions
GATEWAY_AGUI = True # Enable /agui/ endpoints
GATEWAY_WEBHOOKS = True # Enable webhook router
GATEWAY_A2A = True # Enable A2A agent cards
Or via environment variables:
SAGEWAI_GATEWAY_ENABLED=true
SAGEWAI_GATEWAY_OPENAI_COMPAT=true
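If you load these settings yourself, the mapping from environment variables to boolean flags might look like the sketch below. Several things here are assumptions: that the remaining flags follow the same `SAGEWAI_` prefix as the two variables shown, that unset variables default to false, and the set of accepted truthy strings. `load_gateway_flags` is a hypothetical helper, not part of Sagewai.

```python
import os
from typing import Dict, Mapping, Optional

# Flag names taken from the config file above; the env-var form of the
# last three is assumed to follow the same SAGEWAI_ prefix pattern.
FLAGS = ("ENABLED", "OPENAI_COMPAT", "AGUI", "WEBHOOKS", "A2A")
TRUTHY = {"1", "true", "yes", "on"}  # assumed set of accepted values


def load_gateway_flags(environ: Optional[Mapping[str, str]] = None) -> Dict[str, bool]:
    """Read SAGEWAI_GATEWAY_* variables into GATEWAY_* booleans."""
    env = os.environ if environ is None else environ
    flags: Dict[str, bool] = {}
    for name in FLAGS:
        raw = env.get(f"SAGEWAI_GATEWAY_{name}", "false")  # assumed default
        flags[f"GATEWAY_{name}"] = raw.strip().lower() in TRUTHY
    return flags


example_env = {
    "SAGEWAI_GATEWAY_ENABLED": "true",
    "SAGEWAI_GATEWAY_OPENAI_COMPAT": "TRUE",
}
flags = load_gateway_flags(example_env)
print(flags["GATEWAY_ENABLED"], flags["GATEWAY_AGUI"])  # prints "True False"
```

Passing the environment as a parameter keeps the function easy to test without mutating `os.environ`.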