Gateway and Streaming

The Sagewai gateway exposes your agents over OpenAI-compatible HTTP, handles incoming webhooks from external services, and streams responses via Server-Sent Events (SSE) using the AG-UI protocol.


OpenAI-Compatible Endpoint

Send chat completion requests to:

POST /v1/chat/completions

Any client that works with the OpenAI API — LangChain, LlamaIndex, custom scripts — can route to Sagewai by changing the base URL.

cURL

curl -X POST https://your-gateway.sagewai.ai/v1/chat/completions \
  -H "Authorization: Bearer $SAGEWAI_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "my-research-agent",
    "messages": [
      {"role": "user", "content": "Summarize recent AI safety news"}
    ],
    "stream": false
  }'

Python (OpenAI SDK)

from openai import AsyncOpenAI

client = AsyncOpenAI(
    api_key="your-sagewai-api-key",
    base_url="https://your-gateway.sagewai.ai/v1",
)

response = await client.chat.completions.create(
    model="my-research-agent",
    messages=[{"role": "user", "content": "What are the key AI papers from this week?"}],
)
print(response.choices[0].message.content)

Streaming with AG-UI (SSE)

For real-time streaming, Sagewai uses the AG-UI protocol — an SSE-based format that emits structured events as the agent works.

Via the OpenAI-compatible endpoint

async for chunk in await client.chat.completions.create(
    model="my-agent",
    messages=[{"role": "user", "content": "Write a 500-word summary"}],
    stream=True,
):
    delta = chunk.choices[0].delta.content or ""
    print(delta, end="", flush=True)

AG-UI native endpoints

GET /agui/ws
POST /agui/runs

AG-UI events:

Event TypePayloadMeaning
run_started{ run_id, agent_name }Agent started
text_delta{ delta: "..." }Partial text output
tool_call{ name, arguments }Agent calling a tool
tool_result{ name, result }Tool returned a value
run_completed{ run_id, output }Agent finished
run_error{ error }Agent encountered an error

Consuming AG-UI events in JavaScript

The browser EventSource API does not support custom headers. Use @microsoft/fetch-event-source or pass the token as a query parameter:

import { fetchEventSource } from "@microsoft/fetch-event-source";

await fetchEventSource("/agui/ws?run_id=run-001", {
  headers: { Authorization: `Bearer ${apiKey}` },
  onmessage(ev) {
    const data = JSON.parse(ev.data);
    if (ev.event === "text_delta") {
      document.getElementById("output").textContent += data.delta;
    }
    if (ev.event === "run_completed") {
      // Stream complete
    }
  },
});

Webhook Triggers

Register handlers so external services (Stripe, GitHub, Slack) can trigger agent runs by sending HTTP POST events:

from sagewai.gateway import WebhookRouter

router = WebhookRouter()

@router.on("stripe.payment_intent.succeeded")
async def handle_payment(payload: dict) -> None:
    agent = UniversalAgent(name="billing-agent", model="gpt-4o")
    await agent.chat(f"Process payment: {payload['id']}")

app.include_router(router.as_fastapi_router(), prefix="/webhooks")

Webhooks are verified via HMAC (using Stripe, Slack, or GitHub signing secrets). Requests that fail verification are rejected before reaching your handler.

router = WebhookRouter(
    secret="whsec_...",
    signature_header="stripe-signature",
    verification_mode="stripe",  # or "slack", "github", "hmac-sha256"
)

Pollers

Schedule recurring checks on external services:

from sagewai.gateway import PollerManager

poller = PollerManager()

@poller.every(minutes=15)
async def check_mentions() -> None:
    """Check for new brand mentions and draft responses."""
    mentions = await fetch_mentions()
    if mentions:
        agent = UniversalAgent(name="social-agent", model="gpt-4o")
        for mention in mentions:
            await agent.chat(f"Draft a reply to: {mention['text']}")

await poller.start()

Event Listeners

Subscribe to real-time streams from Kafka, Redis Pub/Sub, or external SSE sources:

from sagewai.gateway import ListenerManager

listener = ListenerManager()

@listener.on_sse("https://api.example.com/events")
async def handle_event(event: dict) -> None:
    if event["type"] == "order.created":
        await fulfillment_agent.chat(f"Process order {event['order_id']}")

await listener.start()

Agent-to-Agent Protocol (A2A)

Agents expose agent cards — structured capability declarations that other agents can discover and call. This lets agents from different organizations collaborate without sharing credentials.

from sagewai.protocols.a2a import AgentCard, A2AClient

# Declare an agent card
card = AgentCard(
    name="research-agent",
    description="Searches and summarizes information",
    capabilities=["web_search", "summarization"],
    endpoint="https://your-gateway.sagewai.ai/v1/chat/completions",
    model="gpt-4o",
)

# Another agent calls it
client = A2AClient(gateway_url="https://your-gateway.sagewai.ai")
result = await client.delegate(
    target_agent="research-agent",
    task="Summarize the latest LLM benchmarks",
)

Gateway Configuration

Configure which features are enabled in your admin backend:

# apps/admin/admin/config.py
GATEWAY_ENABLED = True
GATEWAY_OPENAI_COMPAT = True   # /v1/chat/completions
GATEWAY_AGUI = True            # /agui/ endpoints
GATEWAY_WEBHOOKS = True        # webhook router
GATEWAY_A2A = True             # A2A agent cards

Or use environment variables:

SAGEWAI_GATEWAY_ENABLED=true
SAGEWAI_GATEWAY_OPENAI_COMPAT=true