Gateway & Streaming

The Sagewai Gateway exposes your agents through an OpenAI-compatible HTTP interface, handles incoming webhooks, and streams responses via Server-Sent Events (SSE) using the AG-UI protocol.


OpenAI-Compatible Endpoint

Your agents are accessible at a drop-in replacement for the OpenAI Chat Completions API:

POST /v1/chat/completions

This means any client that works with OpenAI (LangChain, LlamaIndex, custom scripts) can route to Sagewai with a one-line URL change.

cURL example

curl -X POST https://your-gateway.sagewai.ai/v1/chat/completions \
  -H "Authorization: Bearer $SAGEWAI_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "my-research-agent",
    "messages": [
      {"role": "user", "content": "Summarize recent AI safety news"}
    ],
    "stream": false
  }'

Python client (OpenAI SDK)

from openai import AsyncOpenAI

client = AsyncOpenAI(
    api_key="your-sagewai-api-key",
    base_url="https://your-gateway.sagewai.ai/v1",
)

response = await client.chat.completions.create(
    model="my-research-agent",  # maps to a registered Sagewai agent
    messages=[{"role": "user", "content": "What are the key AI papers from this week?"}],
)
print(response.choices[0].message.content)

Streaming with AG-UI (SSE)

For real-time streaming, Sagewai uses the AG-UI protocol — an SSE-based streaming format that emits structured events as the agent works.

Enable streaming via the OpenAI-compat endpoint

async for chunk in await client.chat.completions.create(
    model="my-agent",
    messages=[{"role": "user", "content": "Write a 500-word summary"}],
    stream=True,
):
    delta = chunk.choices[0].delta.content or ""
    print(delta, end="", flush=True)

AG-UI native SSE endpoint

GET /agui/ws
POST /agui/runs

The AG-UI event stream emits typed events:

Event TypePayloadMeaning
run_started{ run_id, agent_name }Agent started
text_delta{ delta: "..." }Partial text output
tool_call{ name, arguments }Agent calling a tool
tool_result{ name, result }Tool returned a value
run_completed{ run_id, output }Agent finished
run_error{ error }Agent encountered an error

Consuming AG-UI events in JavaScript

// Browser EventSource does not support custom headers.
// Use @microsoft/fetch-event-source or pass token as query param:
import { fetchEventSource } from "@microsoft/fetch-event-source";

await fetchEventSource("/agui/ws?run_id=run-001", {
  headers: { Authorization: `Bearer ${apiKey}` },
  onmessage(ev) {
    const data = JSON.parse(ev.data);
    if (ev.event === "text_delta") {
      document.getElementById("output").textContent += data.delta;
    }
    if (ev.event === "run_completed") {
      // Stream complete
    }
  },
});

Webhook Triggers

Register webhooks so external services (Stripe, GitHub, Slack) can trigger agent runs:

from sagewai.gateway import WebhookRouter

router = WebhookRouter()

@router.on("stripe.payment_intent.succeeded")
async def handle_payment(payload: dict) -> None:
    # payload is the verified Stripe webhook body
    agent = UniversalAgent(name="billing-agent", model="gpt-4o")
    await agent.chat(f"Process payment: {payload['id']}")

# In your FastAPI app
app.include_router(router.as_fastapi_router(), prefix="/webhooks")

Webhooks are verified via HMAC (Stripe/Slack/GitHub signing secrets). Unverified requests are rejected before reaching the handler.

from sagewai.gateway import WebhookRouter

# Register with HMAC verification
router = WebhookRouter(
    secret="whsec_...",          # your webhook signing secret
    signature_header="stripe-signature",
    verification_mode="stripe",  # or "slack", "github", "hmac-sha256"
)

Pollers

Schedule recurring checks on external services:

from sagewai.gateway import PollerManager

poller = PollerManager()

@poller.every(minutes=15)
async def check_mentions() -> None:
    """Check for new brand mentions and draft responses."""
    mentions = await fetch_mentions()
    if mentions:
        agent = UniversalAgent(name="social-agent", model="gpt-4o")
        for mention in mentions:
            await agent.chat(f"Draft a reply to: {mention['text']}")

await poller.start()

Event Listeners

Subscribe to real-time streams (Kafka, Redis Pub/Sub, SSE from external services):

from sagewai.gateway import ListenerManager

listener = ListenerManager()

@listener.on_sse("https://api.example.com/events")
async def handle_event(event: dict) -> None:
    if event["type"] == "order.created":
        await fulfillment_agent.chat(f"Process order {event['order_id']}")

await listener.start()

Agent-to-Agent Protocol (A2A)

Agents expose agent cards — structured capability declarations that other agents can discover and call:

from sagewai.protocols.a2a import AgentCard, A2AClient

# Declare an agent card
card = AgentCard(
    name="research-agent",
    description="Searches and summarizes information",
    capabilities=["web_search", "summarization"],
    endpoint="https://your-gateway.sagewai.ai/v1/chat/completions",
    model="gpt-4o",
)

# Another agent discovers and calls it
client = A2AClient(gateway_url="https://your-gateway.sagewai.ai")
result = await client.delegate(
    target_agent="research-agent",
    task="Summarize the latest LLM benchmarks",
)

The A2A protocol allows agents from different organizations to collaborate without sharing credentials — only the agent card endpoint is exposed.


Configuring the Gateway

In your admin backend's config:

# apps/admin/admin/config.py
GATEWAY_ENABLED = True
GATEWAY_OPENAI_COMPAT = True   # Enable /v1/chat/completions
GATEWAY_AGUI = True            # Enable /agui/ endpoints
GATEWAY_WEBHOOKS = True        # Enable webhook router
GATEWAY_A2A = True             # Enable A2A agent cards

Or via environment variables:

SAGEWAI_GATEWAY_ENABLED=true
SAGEWAI_GATEWAY_OPENAI_COMPAT=true