Recipe, Stream events from a turn
Goal: render a live UI from a turn's SSE event stream: spinners, agent names, validator verdicts, and the answer streaming in token by token. The "watch Amy think" feel from the CLI, in any client.
This recipe goes deeper than Ask a question. It covers the full event catalog, the right code for browsers / Node / RN, the reconnect protocol, graceful cancellation, and a worked CLI-style trace renderer.
STEP 1, Know which event drives which UI element
The SSE stream is the source of truth for everything that happens during a turn. Eight event types, each with a different job in the UI.
| Event | Field you read | Drives in the UI |
|---|---|---|
| turn.started | turn_id, at | Open the chat bubble. Start the elapsed-time counter. |
| agent.started | agent | Header label: "Data Science is running…". Spinner on. |
| agent.thought | agent, delta | The token stream. Append delta to the visible answer. |
| agent.completed | agent, output_summary | Header label: "Data Science done". Optionally collapse a sub-trace. |
| validator.gate | gate, verdict, evidence | A small inline pill: VALIDATED / REJECTED / CONDITIONAL. |
| validator.critic | verdict, reasoning | The "Critic" line in the trace. |
| turn.completed | turn_id, result | Close the bubble. Spinner off. Render result.answer as the final text. |
| turn.failed | turn_id, error | Red banner with error.message. Stop the spinner. |
A simple rule of thumb: agent.* drives the "who's working" and the
streaming text; validator.* drives the trust indicators; turn.*
drives the lifecycle.
For full payload shapes, see SDK: TypeScript / Event types.
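The event-to-UI mapping in the table can be sketched as a pure reducer. This is a minimal sketch, not the SDK's own types: the `TurnEvent` union is inferred from the "Field you read" column, and `BubbleState`, `initialState`, and `applyEvent` are hypothetical names for illustration. It appends tokens only from the synthesis agent, in line with the advice under Common mistakes.

```typescript
// Assumed event shapes, inferred from the table; the real SDK types may differ.
type TurnEvent =
  | { type: "turn.started"; turn_id: string; at: string }
  | { type: "agent.started"; agent: string }
  | { type: "agent.thought"; agent: string; delta: string }
  | { type: "agent.completed"; agent: string; output_summary: string }
  | { type: "validator.gate"; gate: string; verdict: string; evidence: unknown }
  | { type: "validator.critic"; verdict: string; reasoning: string }
  | { type: "turn.completed"; turn_id: string; result: { answer: string } }
  | { type: "turn.failed"; turn_id: string; error: { message: string } };

// Hypothetical view model for one chat bubble.
interface BubbleState {
  open: boolean;              // true while the turn is in flight
  spinner: boolean;
  header: string;
  answer: string;
  pills: string[];            // one entry per validator.gate event
  criticLine: string | null;
  errorBanner: string | null;
}

const initialState: BubbleState = {
  open: false, spinner: false, header: "", answer: "",
  pills: [], criticLine: null, errorBanner: null,
};

// Pure function: returns the next UI state for one event.
function applyEvent(state: BubbleState, event: TurnEvent): BubbleState {
  switch (event.type) {
    case "turn.started":
      return { ...state, open: true };
    case "agent.started":
      return { ...state, header: `${event.agent} is running…`, spinner: true };
    case "agent.thought":
      // Only the synthesis agent streams the user-facing answer.
      return event.agent === "synthesis"
        ? { ...state, answer: state.answer + event.delta }
        : state;
    case "agent.completed":
      return { ...state, header: `${event.agent} done` };
    case "validator.gate":
      return { ...state, pills: [...state.pills, `${event.gate}: ${event.verdict}`] };
    case "validator.critic":
      return { ...state, criticLine: `Critic: ${event.verdict}` };
    case "turn.completed":
      // result.answer replaces whatever was streamed.
      return { ...state, open: false, spinner: false, answer: event.result.answer };
    case "turn.failed":
      return { ...state, spinner: false, errorBanner: event.error.message };
    default:
      return state; // ignore event types this sketch does not know about
  }
}
```

Keeping the mapping in one pure function means the same logic can back a React `useReducer`, a Svelte store, or a plain DOM renderer.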
STEP 2, Subscribe in your runtime
The SDK works in Node, Bun, browsers, and React Native. Pick the column that matches yours.
| Runtime | What to use | Headers? |
|---|---|---|
| Node / Bun | The SDK's async iterator (amy.turns.stream(id)) | Handled by the SDK |
| Browser | The SDK's async iterator, or raw EventSource (no auth header support, see workaround) | EventSource can't set headers; the SDK uses fetch under the hood for this |
| React Native | react-native-event-source + amy.turns.eventSourceInit(id) | Passed via the constructor options |
Node / Bun
import { Amy } from "@amy/sdk";
const amy = new Amy({
apiKey: process.env.AMY_API_KEY!,
baseUrl: process.env.AMY_BASE_URL!,
});
const turn = await amy.turns.create({
messages: [{ role: "user", content: "How's my recovery?" }],
});
for await (const event of amy.turns.stream(turn.id)) {
switch (event.type) {
case "turn.started": console.log("→ turn started"); break;
case "agent.started": console.log(`→ ${event.agent} running…`); break;
case "agent.thought": process.stdout.write(event.delta); break;
case "agent.completed": console.log(`\n ${event.agent} done`); break;
case "validator.gate": console.log(` gate ${event.gate}: ${event.verdict}`); break;
case "validator.critic": console.log(` critic: ${event.verdict}`); break;
case "turn.completed": console.log("\n✓", event.result.answer); break;
case "turn.failed": console.error("✗", event.error.message); break;
}
}
Browser (EventSource)
EventSource doesn't accept custom headers, so you can't pass the
Authorization: Bearer … header directly. The SDK works around this
internally by using fetch with a ReadableStream. If you want raw
EventSource, you can include the API key in a one-shot session
cookie (see Browser notes), but
the SDK is the easier path:
import { Amy } from "@amy/sdk";
const amy = new Amy({
apiKey: localStorage.getItem("amy_api_key")!,
baseUrl: "https://api.amy.health",
});
const turn = await amy.turns.create({
messages: [{ role: "user", content: "How's my recovery?" }],
});
const $bubble = document.querySelector("#answer")!;
for await (const event of amy.turns.stream(turn.id)) {
if (event.type === "agent.thought") {
$bubble.textContent += event.delta;
}
if (event.type === "turn.completed") {
$bubble.textContent = event.result.answer; // canonical final text
}
}
React Native
import RNEventSource from "react-native-event-source";
import { Amy } from "@amy/sdk";
const amy = new Amy({
apiKey,
baseUrl,
EventSourceCtor: RNEventSource, // tell the SDK to use the RN polyfill
});
// …or, if you want to manage EventSource yourself:
const { url, headers } = amy.turns.eventSourceInit(turn.id);
const es = new RNEventSource(url, { headers });
es.addEventListener("agent.thought", (evt) => {
const { delta } = JSON.parse(evt.data);
setAnswer((prev) => prev + delta);
});
es.addEventListener("turn.completed", (evt) => {
const { result } = JSON.parse(evt.data);
setAnswer(result.answer);
es.close();
});
STEP 3, Handle reconnects with Last-Event-Id
Streams break. Networks idle out, phones go to sleep, server processes
restart. The SSE protocol has a built-in resume mechanism: the server
sends an id: line with every event, and the client passes the last
one it saw back via the Last-Event-Id header on reconnect.
The server replays from that ID forward. Replays are available for 1 hour after turn completion.
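To make the `id:` mechanism concrete, here is a minimal sketch of parsing SSE frames and tracking the id to resume from. The `id:`, `event:`, and `data:` field names come from the SSE wire format; `SseFrame`, `parseFrames`, and `trackLastEventId` are hypothetical helper names, and the sketch assumes it is handed text containing only complete frames.

```typescript
// One parsed SSE frame.
interface SseFrame {
  id: string | null;
  event: string;
  data: string;
}

// Parses complete SSE frames (separated by a blank line) out of a text block.
function parseFrames(text: string): SseFrame[] {
  const frames: SseFrame[] = [];
  for (const block of text.split(/\r?\n\r?\n/)) {
    let id: string | null = null;
    let event = "message"; // SSE default when no event: line is present
    const data: string[] = [];
    for (const line of block.split(/\r?\n/)) {
      if (line === "" || line.startsWith(":")) continue; // blank line or comment
      const colon = line.indexOf(":");
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? "" : line.slice(colon + 1);
      if (value.startsWith(" ")) value = value.slice(1); // drop one leading space
      if (field === "id") id = value;
      else if (field === "event") event = value;
      else if (field === "data") data.push(value);
    }
    // Frames without data (for example keep-alive comments) are skipped.
    if (data.length > 0) frames.push({ id, event, data: data.join("\n") });
  }
  return frames;
}

// Returns the id to send back as Last-Event-Id on the next reconnect.
function trackLastEventId(frames: SseFrame[], previous: string | null = null): string | null {
  let last = previous;
  for (const frame of frames) {
    if (frame.id !== null) last = frame.id;
  }
  return last;
}
```

Update the tracked id after every batch of frames you render, so a reconnect at any point resumes from the right place.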
With the SDK (transparent)
The SDK reconnects automatically with exponential backoff. To observe it for debugging:
for await (const event of amy.turns.stream(turn.id, {
onReconnect: (lastId) => console.log(`reconnecting from event ${lastId}`),
})) {
// your handler
}
Manually
# Initial subscribe
curl -N -H "Authorization: Bearer $AMY_API_KEY" \
"$AMY_BASE_URL/v1/turns/$TURN_ID/events"
# …network blip, last id seen was 42 …
# Resume from event 42
curl -N -H "Authorization: Bearer $AMY_API_KEY" \
-H "Last-Event-Id: 42" \
"$AMY_BASE_URL/v1/turns/$TURN_ID/events"Server replays from event 43 forward.
STEP 4, Cancel a stream gracefully
You'll need this when the user closes the chat, navigates away, or
hits a stop button. The SDK's iterator accepts an AbortSignal:
const controller = new AbortController();
// Wire to your UI's "stop" button
stopButton.onclick = () => controller.abort();
try {
for await (const event of amy.turns.stream(turn.id, { signal: controller.signal })) {
handleEvent(event);
}
} catch (err) {
if (err instanceof Error && err.name === "AbortError") {
console.log("user cancelled the stream");
} else {
throw err;
}
}
Aborting only closes the SSE connection on the client. The turn
keeps running on the server; its result is still saved in D1 and
retrievable later via GET /v1/turns/:id. There is no
POST /v1/turns/:id/cancel endpoint in v1. If you want to ignore the
result, just don't read it.
Worked example, CLI-style trace renderer
Below is a script of roughly 60 lines that produces output close to the trace you see in the README's real-turn snippets: agent names, validator gates, costs, and the final answer streamed in.
// trace.ts
import { Amy } from "@amy/sdk";
const amy = new Amy({
apiKey: process.env.AMY_API_KEY!,
baseUrl: process.env.AMY_BASE_URL!,
});
const turn = await amy.turns.create({
messages: [{ role: "user", content: process.argv[2] ?? "How's my recovery?" }],
});
const start = Date.now();
const ts = () => {
const ms = Date.now() - start;
const s = String(Math.floor(ms / 1000)).padStart(3, " ");
return `[+${s}s]`;
};
const ANSI = { dim: "\x1b[2m", reset: "\x1b[0m", green: "\x1b[32m", red: "\x1b[31m", yellow: "\x1b[33m" };
const verdictColor = (v: string) =>
v === "VALIDATED" ? ANSI.green : v === "REJECTED" ? ANSI.red : ANSI.yellow;
let currentAgent: string | null = null;
let answer = "";
for await (const event of amy.turns.stream(turn.id)) {
switch (event.type) {
case "turn.started":
console.log(`${ANSI.dim}${ts()} orchestrator → turn started${ANSI.reset}`);
break;
case "agent.started":
currentAgent = event.agent;
console.log(`${ANSI.dim}${ts()} ${event.agent.padEnd(18)} → running${ANSI.reset}`);
break;
case "agent.thought":
// For the final synthesis agent, mirror tokens to stdout.
if (event.agent === "synthesis") {
process.stdout.write(event.delta);
answer += event.delta;
}
break;
case "agent.completed":
console.log(`${ANSI.dim}${ts()} ${event.agent.padEnd(18)} ✓ done${ANSI.reset}`);
break;
case "validator.gate":
console.log(
`${ANSI.dim}${ts()} validator ${verdictColor(event.verdict)}■ ${event.verdict.padEnd(11)}${ANSI.reset}${ANSI.dim} ${event.gate}${ANSI.reset}`,
);
break;
case "validator.critic":
console.log(`${ANSI.dim}${ts()} validator → critic: ${event.verdict}${ANSI.reset}`);
break;
case "turn.completed":
console.log(`\n${ANSI.dim}${ts()} orchestrator ✓ turn complete ($${event.result.cost_usd.toFixed(4)})${ANSI.reset}`);
// The streamed synthesis tokens are a best-effort feed; result.answer
// is authoritative. Print it here only if no tokens were streamed.
if (!answer) console.log("\namy ", event.result.answer);
break;
case "turn.failed":
console.error(`\n${ANSI.red}✗ turn failed: ${event.error.message}${ANSI.reset}`);
process.exit(1);
}
}
Run it:
bun trace.ts "is my -7.3% sleep score drop clinically meaningful?"
You'll get output like:
[+ 0s] orchestrator → turn started
[+ 1s] data_science → running
[+ 64s] data_science ✓ done
[+ 64s] validator ■ VALIDATED ds-avg-rhr-all-data
[+ 91s] validator → critic: accept
[+138s] domain_expert → running
[+201s] domain_expert ✓ done
[+201s] synthesis → running
Short answer: no — by the most defensible read of your own data…
[+222s] synthesis ✓ done
[+222s] orchestrator ✓ turn complete ($0.1288)
Common mistakes
Rendering agent.thought for every agent
Most agents do their reasoning silently; only the synthesis agent
streams the user-facing answer. If you render every agent.thought,
the user sees a wall of internal monologue from the investigator and
data science agents. Filter on event.agent === "synthesis" for the visible
answer, and use the other agents' thoughts only for an opt-in
"verbose" mode.
Treating the stream's last agent.thought chunks as the final answer
The agent.thought stream is a best-effort token feed. The
authoritative answer is turn.completed.result.answer: that is
what was committed to D1 and what GET /v1/turns/:id returns. If you
display the streamed tokens, swap them out for result.answer on
turn.completed to recover from any dropped chunks.
Forgetting that EventSource can't set Authorization
The browser's built-in EventSource API has no way to set request
headers. If you try to use it directly with Authorization: Bearer,
it silently sends no header and the server returns 401 (which
EventSource reports as a generic onerror). Either use the SDK
(which uses fetch + ReadableStream internally) or pass the API
key via a one-shot signed URL.
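For context on what a fetch-based reader has to do that EventSource does for free: network chunks can end in the middle of an SSE frame, so the reader must buffer until a blank line arrives. The sketch below is an illustration, not the SDK's internals; `FrameBuffer` is a hypothetical class, and it assumes LF-only line endings (a server using CRLF would need a different separator).

```typescript
// Accumulates decoded text chunks and returns only complete SSE frames.
class FrameBuffer {
  private pending = "";

  // Feed one decoded chunk; get back zero or more complete raw frames.
  push(chunk: string): string[] {
    this.pending += chunk;
    const parts = this.pending.split("\n\n");
    // The last part is either "" (stream ended on a frame boundary)
    // or an incomplete frame that must wait for the next chunk.
    this.pending = parts.pop() ?? "";
    return parts.filter((part) => part.length > 0);
  }
}
```

With fetch, string chunks can be obtained by piping `response.body` through a `TextDecoderStream`; each chunk then goes through `push`, and the returned frames are parsed as usual.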
Reconnecting from event id 0 after a disconnect
If you reconnect without Last-Event-Id, the server replays the
entire stream from the start, and you'll re-render every event,
re-stream every token, and confuse your UI. Track the last id you
saw on every event and pass it on reconnect. (The SDK does this for
you.)
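As a second line of defence, a client can drop replayed events it has already rendered. The sketch below assumes event ids are monotonically increasing integers, which matches the "resume from 42, replay from 43" example but is not confirmed here for every stream; `ReplayGuard` is a hypothetical helper.

```typescript
// Drops events whose id has already been seen.
// Assumption: ids are increasing integers encoded as strings.
class ReplayGuard {
  private lastSeen = -1;

  // Returns true if the event is new and should be rendered.
  accept(id: string): boolean {
    const n = Number.parseInt(id, 10);
    if (Number.isNaN(n)) return true; // cannot compare; let it through
    if (n <= this.lastSeen) return false; // duplicate from a replay
    this.lastSeen = n;
    return true;
  }

  // The value to send as Last-Event-Id, or null before any event was seen.
  resumeFrom(): string | null {
    return this.lastSeen < 0 ? null : String(this.lastSeen);
  }
}
```

Call `accept` before handing an event to your renderer; even if a reconnect accidentally replays from the start, tokens are not appended twice.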
Holding the connection open after turn.completed
The server closes the stream after turn.completed or turn.failed.
If you keep your client's iterator alive, you'll eventually hit
"stream closed" / EOF on the next read. Break out of the loop on
turn.completed/turn.failed:
for await (const event of amy.turns.stream(turn.id)) {
handleEvent(event);
if (event.type === "turn.completed" || event.type === "turn.failed") break;
}
(The SDK's iterator does this automatically; break is only needed
if you're parsing the raw SSE yourself.)
Where to next
- Just want the answer with no UI? Recipe: Ask a question.
- Building a chat UI from scratch? Recipe: Build a mobile app uses this stream for the chat screen.
- Full event schema: SDK: TypeScript / Event types.
- How the events get produced in the first place: Architecture: Streaming.