Streaming
Amy streams every step of a turn (routing, agent thoughts, validator verdicts, the synthesised answer) over Server-Sent Events. One HTTP connection, one durable replay buffer, no websockets required.
The CLI's "watch Amy think" UX exists because every client subscribes to the same SSE channel. This page is the protocol-level reference: the wire format, every event type, reconnects, and how to consume from curl, the browser, React Native, and Swift.
Quick navigation
- When to use streaming
- Connection setup
- The wire format
- Event catalog
- Reconnects and replay
- Backpressure
- Sample raw transcript
- Consuming the stream
- Common mistakes
When to use streaming
Subscribe to /events whenever a human is waiting on the answer. Amy
turns run for 30s-7min depending on routing; a black-box wait erodes
trust. Streaming lets you render:
- Routing, so the user knows which specialists are working.
- Agent thoughts, so the answer feels alive as it's being written.
- Validator verdicts, so the user sees that numbers were checked (this is load-bearing for trust; see the README's Validator section).
- The final synthesis, token by token.
When not to stream: batch jobs, retries, cron-driven workflows.
Pass "stream": false to POST /v1/turns and the request blocks
until the turn completes. See Turns: Blocking vs streaming.
Connection setup
Streaming is one endpoint:
GET /v1/turns/:id/events
Required headers
GET /v1/turns/turn_01HX2K3M4N5P6Q7R8S9T0V1W2X/events HTTP/1.1
Host: api.amy.health
Accept: text/event-stream
Authorization: Bearer amy_live_…
| Header | Required? | Notes |
|---|---|---|
| Accept | yes | Must include text/event-stream. Sending application/json returns the snapshot Turn object instead. |
| Authorization | yes | Same bearer token as the rest of the API. |
| Last-Event-Id | optional | On reconnect, set this to the last id: you observed. See Reconnects and replay. |
Response headers
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
X-Request-Id: req_01HX2K3M4N5P6Q7R8S9T0V1W2X
X-Request-Id is per-connection (not per-event). Include it when
reporting stuck streams.
What you can subscribe to
You can open the stream before, during, or after the turn. Even
the stream for an already-completed turn replays from the buffer
(within the 1-hour window; see Reconnects and replay).
Opening twice in parallel is supported but pointless: each connection gets the same events.
The wire format
Standard SSE per the WHATWG spec. Each event is three lines:
id: <monotonic-integer>
event: <event-type>
data: <single-line JSON>
(blank line ends the event)
Constraints Amy enforces:
- id is a strictly increasing integer per turn, starting at 1. No gaps. Use it for Last-Event-Id on reconnect.
- event is one of the types in the Event catalog.
- data is always one line of JSON. We never split a JSON object across multiple data: lines, so consumers can JSON.parse(line) directly.
- No comments (: lines). Workers don't need a heartbeat because the underlying TCP connection stays warm and Cloudflare doesn't time the SSE out for 100 seconds; we use that quietly.
There is no retry directive (retry:) either. Clients use their own
backoff; the SDK uses exponential backoff with jitter, max 5 attempts.
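As a rough illustration of the constraints above, a consumer that reads the raw text itself can parse Amy's three-line events in a few lines. This is a minimal sketch, not the SDK's parser; the names SseEvent and parseSse are hypothetical, and it assumes the "field: value" layout and blank-line terminator described in this section.

```typescript
// One parsed event in the three-line id / event / data layout.
interface SseEvent {
  id: number;
  event: string;
  data: unknown;
}

// Parse raw SSE text into events. Hypothetical helper: it relies on the
// guarantees stated above (single-line JSON data, blank line ends the event).
function parseSse(raw: string): SseEvent[] {
  const events: SseEvent[] = [];
  let id: number | undefined;
  let event: string | undefined;
  let data: string | undefined;

  for (const line of raw.split(/\r?\n/)) {
    if (line === "") {
      // Blank line: dispatch only if all three fields were seen.
      if (id !== undefined && event !== undefined && data !== undefined) {
        events.push({ id, event, data: JSON.parse(data) });
      }
      id = undefined;
      event = undefined;
      data = undefined;
      continue;
    }
    const sep = line.indexOf(":");
    if (sep <= 0) continue; // comment or malformed line; skipped defensively
    const field = line.slice(0, sep);
    // The SSE format allows one optional space after the colon.
    const value = line.slice(sep + 1).replace(/^ /, "");
    if (field === "id") id = Number(value);
    else if (field === "event") event = value;
    else if (field === "data") data = value;
  }
  return events;
}
```

An event that is cut off before its blank line is not dispatched, which is the behaviour you want when a connection drops mid-event and you resume from the last complete id.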
Event catalog
Every event is one of these types. Listed in roughly the order they appear in a typical turn.
turn.started
Fires once, when the workflow's first step begins.
{
"turn_id": "turn_01HX2K3M4N5P6Q7R8S9T0V1W2X",
"at": "2026-05-25T10:00:00.123Z"
}
| Field | Type | Notes |
|---|---|---|
| turn_id | string | The turn this event belongs to. Same as the URL path. |
| at | ISO-8601 | When step 1 actually began (not when the POST landed). |
agent.started
Fires when each sub-agent begins. Multiple agents run in parallel,
so expect two or three agent.started events with no agent.completed
between them.
{
"agent": "data_science",
"at": "2026-05-25T10:00:04.500Z",
"question": "Compute average resting heart rate over all available daily data"
}
| Field | Type | Notes |
|---|---|---|
| agent | enum | data_science · domain_expert · health_coach · investigator. |
| at | ISO-8601 | |
| question | string | The rephrased sub-question for this agent (not the user's original). |
agent.thought
Token-level streaming from the current agent. Fires many times, one per chunk emitted by the model. The CLI renders these as live text.
{
"agent": "data_science",
"delta": "Looking at "
}
| Field | Type | Notes |
|---|---|---|
| agent | enum | Same values as agent.started. |
| delta | string | A token chunk. Concatenate to reconstruct the agent's narration. Whitespace is significant. |
delta is append-only: you never get a replace or insert. To
get the full text per agent, accumulate deltas between an
agent.started and its matching agent.completed.
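Because agents run in parallel, their deltas interleave on the wire, so the accumulation has to be keyed by agent. The sketch below shows one way to do that; NarrationBuffer and its method names are hypothetical and not part of the SDK.

```typescript
interface AgentThought {
  agent: string;
  delta: string;
}

// Accumulates agent.thought deltas into one narration string per agent.
class NarrationBuffer {
  private readonly text = new Map<string, string>();

  // Call on agent.started to begin a fresh narration for that agent.
  start(agent: string): void {
    this.text.set(agent, "");
  }

  // Call on every agent.thought; deltas are appended verbatim, never trimmed.
  append(thought: AgentThought): void {
    this.text.set(thought.agent, (this.text.get(thought.agent) ?? "") + thought.delta);
  }

  // Call on agent.completed to read the full narration.
  finish(agent: string): string {
    return this.text.get(agent) ?? "";
  }
}
```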
agent.completed
Fires when each sub-agent finishes. May arrive long after
agent.thoughts stop (the agent finalises its structured output
post-stream).
{
"agent": "data_science",
"at": "2026-05-25T10:01:42.900Z",
"duration_ms": 98400,
"cost_usd": 0.0895,
"output_summary": "Average RHR 60.39 bpm over 160 valid daily readings (2025-04 → 2026-05). Median 59, SD ±4.47."
}
| Field | Type | Notes |
|---|---|---|
| agent | enum | |
| at | ISO-8601 | |
| duration_ms | number | Wall time for this agent. |
| cost_usd | number | LLM spend for this agent only. |
| output_summary | string | First ~160 chars of the agent's output for log display. Not load-bearing. |
validator.gate
Fires once per gate per finding. Up to 7 gates run per finding (sample size, effect-vs-noise, construct validity, bootstrap, subgroup consistency, method triangulation, discriminative power).
{
"finding_id": "ds-001",
"claim": "average resting heart rate over all available daily data",
"gate": "bootstrap",
"verdict": "passed",
"detail": {
"ci_low": -0.42,
"ci_high": -0.31,
"boot_median": -0.374,
"iterations": 1000
}
}
| Field | Type | Notes |
|---|---|---|
| finding_id | string | Stable across the turn. The Fact Sheet keys this. |
| claim | string | One-sentence summary of what's being checked. |
| gate | enum | sample_size · effect_vs_noise · construct_validity · bootstrap · subgroup_consistency · method_triangulation · discriminative_power. |
| verdict | enum | passed · failed · skipped (not applicable to this finding's kind). |
| detail | object | Gate-specific numerics. Shape varies by gate. Useful for trace UIs and debugging. |
construct_validity and sample_size are hard gates. Failing
either short-circuits the rest, so you'll see fewer than 7 validator.gate
events for that finding.
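A trace UI may want to explain why a finding shows fewer than 7 gates. One possible approach, sketched here with hypothetical names (GateEvent, failedHardGate), is to look for a failed hard gate among the validator.gate events collected for that finding.

```typescript
type Gate =
  | "sample_size"
  | "effect_vs_noise"
  | "construct_validity"
  | "bootstrap"
  | "subgroup_consistency"
  | "method_triangulation"
  | "discriminative_power";

type GateVerdict = "passed" | "failed" | "skipped";

// Trimmed to the fields this check needs.
interface GateEvent {
  finding_id: string;
  gate: Gate;
  verdict: GateVerdict;
}

// The two hard gates named in this section.
const HARD_GATES: ReadonlySet<Gate> = new Set<Gate>(["sample_size", "construct_validity"]);

// Returns the hard gate that failed for a finding, or undefined if none did.
function failedHardGate(events: GateEvent[], findingId: string): Gate | undefined {
  const hit = events.find(
    (e) => e.finding_id === findingId && e.verdict === "failed" && HARD_GATES.has(e.gate),
  );
  return hit?.gate;
}
```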
validator.critic
Fires once per finding, after gates. The Critic is an LLM running an adversarial review with literature priors loaded into context.
{
"finding_id": "ds-001",
"verdict": "accept",
"reasoning": "Descriptive summary across 160 daily readings, not a causal claim. No confounders to flag.",
"concerns": []
}
| Field | Type | Notes |
|---|---|---|
| finding_id | string | |
| verdict | enum | accept · downgrade · reject. downgrade flips a validated gate verdict to conditional; reject overrides everything. |
| reasoning | string | One paragraph. Safe to surface to power users. |
| concerns | array | Structured concerns (confounder, reverse_causation, selection_bias, literature_contradiction, tautology, small_n, noise) with severity. |
turn.completed
Fires once, last, on success. After this, no more events arrive on the stream and the server closes the connection.
{
"turn_id": "turn_01HX2K3M4N5P6Q7R8S9T0V1W2X",
"result": {
"answer": "Short answer: no — by the most defensible read of your own data...",
"fact_sheet": [
{ "claim": "ds-001.effect", "value": -0.374, "unit": null,
"source": "data_science", "n": 87, "window": "last 90 days" }
],
"agents_used": ["data_science", "domain_expert"],
"validator": { "findings_total": 3, "findings_validated": 2, "findings_conditional": 0, "findings_rejected": 1 },
"cost_usd": 0.1288,
"duration_ms": 222000
}
}
The result payload is identical to Turn.result from
GET /v1/turns/:id. You don't need a second fetch after the stream
closes.
turn.failed
Fires once, last, on failure. Mutually exclusive with
turn.completed.
{
"turn_id": "turn_01HX2K3M4N5P6Q7R8S9T0V1W2X",
"error": {
"code": "upstream_unavailable",
"message": "Anthropic API returned 529 after 3 retries.",
"request_id": "req_01HX2K3M4N5P6Q7R8S9T0V1W2X",
"docs_url": "https://docs.amy.health/concepts/errors#upstream_unavailable"
}
}
Treat turn.failed the same way you'd treat a non-2xx response. See
Errors for the recovery matrix.
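If you consume the stream without the SDK, the catalog above maps naturally onto a discriminated union. The sketch below is one way to model it; the wrapper shape ({ type, data }) and the names AmyEvent, isTerminal, and outcome are assumptions for illustration, and the payloads are trimmed to fields shown in the catalog.

```typescript
type AmyEvent =
  | { type: "turn.started"; data: { turn_id: string; at: string } }
  | { type: "agent.started"; data: { agent: string; at: string; question: string } }
  | { type: "agent.thought"; data: { agent: string; delta: string } }
  | { type: "agent.completed"; data: { agent: string; at: string; duration_ms: number; cost_usd: number } }
  | { type: "validator.gate"; data: { finding_id: string; gate: string; verdict: string } }
  | { type: "validator.critic"; data: { finding_id: string; verdict: string; reasoning: string } }
  | { type: "turn.completed"; data: { turn_id: string; result: { answer: string } } }
  | { type: "turn.failed"; data: { turn_id: string; error: { code: string; message: string } } };

type TerminalEvent = Extract<AmyEvent, { type: "turn.completed" | "turn.failed" }>;

// turn.completed and turn.failed are mutually exclusive and always last.
function isTerminal(e: AmyEvent): e is TerminalEvent {
  return e.type === "turn.completed" || e.type === "turn.failed";
}

// Collapse a terminal event into a printable outcome; other events yield undefined.
function outcome(e: AmyEvent): string | undefined {
  switch (e.type) {
    case "turn.completed":
      return `ok: ${e.data.result.answer}`;
    case "turn.failed":
      return `error ${e.data.error.code}: ${e.data.error.message}`;
    default:
      return undefined;
  }
}
```

Narrowing on type means the compiler checks that a turn.failed handler only reads error fields and a turn.completed handler only reads result fields.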
Reconnects and replay
SSE connections drop: phones lose signal, browser tabs sleep, intermediaries time out. Amy's stream is designed to survive this without losing events.
How it works
Every event is buffered to KV under stream:<turn_id>:<seq> before
it's sent. The buffer is retained for 1 hour after completed_at.
On reconnect, set Last-Event-Id: <last_seq_you_saw>:
GET /v1/turns/turn_01HX.../events HTTP/1.1
Accept: text/event-stream
Authorization: Bearer …
Last-Event-Id: 43
The server replays from seq=44 forward. If the turn is still running,
new events follow. If the turn is already complete, you'll get the
remaining buffered events and then a graceful close.
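For clients that manage the connection themselves, the bookkeeping amounts to remembering the highest id seen and sending it back on the next connect. A minimal sketch, assuming a hypothetical ResumeCursor helper (not an SDK class):

```typescript
// Tracks the last event id seen so a reconnect resumes without gaps or duplicates.
class ResumeCursor {
  private lastId = 0; // ids start at 1, so 0 means "nothing seen yet"

  // Returns true if the event is new and should be processed.
  accept(id: number): boolean {
    if (id <= this.lastId) return false; // replayed duplicate
    this.lastId = id;
    return true;
  }

  // Headers for the next (re)connect; Last-Event-Id is omitted on the first attempt.
  headers(apiKey: string): Record<string, string> {
    const h: Record<string, string> = {
      Accept: "text/event-stream",
      Authorization: `Bearer ${apiKey}`,
    };
    if (this.lastId > 0) h["Last-Event-Id"] = String(this.lastId);
    return h;
  }
}
```

The accept check is a safety net: if a reconnect is ever made without the header, replayed events are dropped instead of being rendered twice.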
What the SDK does
The TypeScript SDK handles this for you:
const iterator = amy.turns.stream(turn.id, {
onReconnect: (lastId) => console.log("Reconnecting from", lastId),
});
for await (const event of iterator) {
// The iterator transparently reconnects on transient errors,
// tracks Last-Event-Id, and resumes the for-await loop.
}
Backoff is exponential with jitter; max 5 attempts before throwing.
You can plug your own with maxRetries and retryDelayMs.
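To make "exponential with jitter" concrete, here is one common variant (full jitter) as a sketch. The base delay, the cap, and the function names are assumptions chosen for illustration; the SDK's actual constants are not documented here.

```typescript
// Matches the documented limit of 5 attempts.
const MAX_ATTEMPTS = 5;

// Full-jitter exponential backoff: a random delay in [0, min(cap, base * 2^attempt)).
// baseMs and capMs are illustrative defaults, not the SDK's values.
function backoffDelayMs(
  attempt: number,
  baseMs: number = 500,
  capMs: number = 30000,
  rand: () => number = Math.random,
): number {
  const ceiling = Math.min(capMs, baseMs * Math.pow(2, attempt));
  return Math.floor(rand() * ceiling);
}

// attempt is zero-based, so attempts 0..4 are allowed.
function shouldRetry(attempt: number): boolean {
  return attempt < MAX_ATTEMPTS;
}
```

Injecting rand keeps the function deterministic under test, and the random spread keeps many clients that dropped at the same moment from reconnecting in lockstep.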
When replay is no longer available
After 1 hour past completed_at, the KV buffer is garbage-collected.
Reconnecting then returns 404 turn_events_expired. The Turn row
itself is permanent; GET /v1/turns/:id still works forever.
If you need long-term event history, persist events client-side as
they arrive. The CLI does this for the /trace command.
Backpressure
SSE has no flow control. If your client can't keep up with the event rate, events queue in the HTTP buffer and your reader falls behind; eventually the connection's TCP send buffer fills and the server blocks on write().
This rarely matters in practice: Amy emits ~50-300 events per turn, spaced over minutes. But two cases can bite:
- You're rendering each agent.thought into a DOM mutation synchronously. Browsers can choke at 100+ DOM ops/sec. Buffer tokens for a 16ms frame and flush in requestAnimationFrame.
- You're piping events into a slow downstream sink (a database write per event). Batch the writes; the SDK iterator has a built-in queue, but a downstream blocking call will still back-pressure through it.
For server-side consumers, run the event-loop on a separate async task from the sink-write loop. The SDK iterator is non-blocking between events.
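Both cases above come down to coalescing many small deltas into fewer writes. The sketch below shows the idea with a hypothetical DeltaBatcher; the sink callback stands in for whatever you flush to (a DOM node, a database batch), and the caller decides when to flush.

```typescript
type Sink = (agent: string, text: string) => void;

// Coalesces agent.thought deltas so the sink sees one write per flush, not one per token.
class DeltaBatcher {
  private readonly pending = new Map<string, string>();
  private readonly sink: Sink;

  constructor(sink: Sink) {
    this.sink = sink;
  }

  // Cheap and synchronous: just appends to the per-agent pending text.
  push(agent: string, delta: string): void {
    this.pending.set(agent, (this.pending.get(agent) ?? "") + delta);
  }

  // Call once per frame (for example from requestAnimationFrame) or per batch interval.
  flush(): void {
    this.pending.forEach((text, agent) => this.sink(agent, text));
    this.pending.clear();
  }
}
```

In a browser you would call push from the event handler and flush from a requestAnimationFrame callback; on a server, flush on a timer or once the pending text reaches a size threshold.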
Sample raw transcript
A real turn, abridged. Newlines preserved exactly as they appear on the wire.
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
X-Request-Id: req_01HX2K3M4N5P6Q7R8S9T0V1W2X
id: 1
event: turn.started
data: {"turn_id":"turn_01HX2K3M4N5P6Q7R8S9T0V1W2X","at":"2026-05-25T10:00:00.123Z"}
id: 2
event: agent.started
data: {"agent":"data_science","at":"2026-05-25T10:00:04.500Z","question":"Compute average resting heart rate over all available daily data"}
id: 3
event: agent.thought
data: {"agent":"data_science","delta":"Looking at "}
id: 4
event: agent.thought
data: {"agent":"data_science","delta":"your daily summary table, "}
id: 5
event: agent.thought
data: {"agent":"data_science","delta":"I have 160 valid readings..."}
id: 47
event: agent.completed
data: {"agent":"data_science","at":"2026-05-25T10:01:42.900Z","duration_ms":98400,"cost_usd":0.0895,"output_summary":"Average RHR 60.39 bpm..."}
id: 48
event: validator.gate
data: {"finding_id":"ds-001","claim":"average resting heart rate","gate":"sample_size","verdict":"passed","detail":{"n":160,"min_required":10}}
id: 49
event: validator.gate
data: {"finding_id":"ds-001","claim":"average resting heart rate","gate":"bootstrap","verdict":"passed","detail":{"mean_ci_low":59.7,"mean_ci_high":61.1,"boot_median":60.4}}
id: 55
event: validator.critic
data: {"finding_id":"ds-001","verdict":"accept","reasoning":"Descriptive summary across 160 daily readings, not a causal claim.","concerns":[]}
id: 56
event: agent.thought
data: {"agent":"data_science","delta":"Your average "}
id: 87
event: turn.completed
data: {"turn_id":"turn_01HX2K3M4N5P6Q7R8S9T0V1W2X","result":{"answer":"Your average resting heart rate is 60.39 bpm...","fact_sheet":[{"claim":"ds-001.avg","value":60.39,"unit":"bpm","source":"data_science","n":160,"window":"all"}],"agents_used":["data_science"],"validator":{"findings_total":1,"findings_validated":1,"findings_conditional":0,"findings_rejected":0},"cost_usd":0.1288,"duration_ms":222000}}
Note: between agent.completed (id 47) and the validator gates (48,
49, 55), there are no thoughts. Synthesis tokens (id 56+) come from
the final synthesis step, not the data scientist, but the wire
format groups them under the same logical agent channel for
rendering simplicity. The agent.started/agent.completed pair
that brackets synthesis uses agent: "synthesis".
Consuming the stream
curl
curl -N \
-H "Authorization: Bearer $AMY_API_KEY" \
-H "Accept: text/event-stream" \
https://api.amy.health/v1/turns/turn_01HX.../events
-N disables buffering. Without it curl will hold output until the
connection closes.
To resume from a known event ID:
curl -N \
-H "Authorization: Bearer $AMY_API_KEY" \
-H "Accept: text/event-stream" \
-H "Last-Event-Id: 43" \
https://api.amy.health/v1/turns/turn_01HX.../events
Browser (EventSource)
Native EventSource is the simplest path. It handles Last-Event-Id
automatically.
const url = `https://api.amy.health/v1/turns/${turnId}/events`;
// EventSource cannot set custom headers — pass the API key as a query
// string fallback, or proxy through your own backend that adds the
// Authorization header server-side.
const es = new EventSource(`${url}?api_key=${apiKey}`);
es.addEventListener("turn.started", (e) => {
console.log("Started:", JSON.parse(e.data));
});
es.addEventListener("agent.thought", (e) => {
const { agent, delta } = JSON.parse(e.data);
document.getElementById(agent).textContent += delta;
});
es.addEventListener("turn.completed", (e) => {
const { result } = JSON.parse(e.data);
render(result.answer);
es.close();
});
es.addEventListener("turn.failed", (e) => {
const { error } = JSON.parse(e.data);
console.error(error.code, error.message);
es.close();
});
Important: browser EventSource cannot set the Authorization
header. Two options:
- Proxy through your backend; have it add the bearer token.
- Cookie auth: out of scope for v1; we don't issue cookies.
For production browser use, proxy. The SDK does this transparently in
React/Vue if you set baseUrl to your own proxy.
React Native
EventSource is not in React Native's standard polyfill. Use
react-native-event-source and pass it to the SDK:
import EventSource from "react-native-event-source";
import { Amy } from "@amy/sdk";
const amy = new Amy({
apiKey,
EventSourceCtor: EventSource,
});
for await (const event of amy.turns.stream(turnId)) {
// works identically to Node / browser
}
react-native-event-source does support custom headers, so you don't
need the proxy trick. Pass the Authorization header directly through
the SDK.
Swift (URLSession)
There's no built-in EventSource in Foundation, but the bytes-stream
API is enough:
let url = URL(string: "https://api.amy.health/v1/turns/\(turnId)/events")!
var req = URLRequest(url: url)
req.setValue("text/event-stream", forHTTPHeaderField: "Accept")
req.setValue("Bearer \(apiKey)", forHTTPHeaderField: "Authorization")
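// lastEventId is assumed to be a `var lastEventId: Int?` that you keep
// across reconnects (nil on the first connect, so the header is omitted).
if let last = lastEventId {
    req.setValue(String(last), forHTTPHeaderField: "Last-Event-Id")
}
let (bytes, _) = try await URLSession.shared.bytes(for: req)
var currentEvent: (id: String?, event: String?, data: String?) = (nil, nil, nil)
// Caveat: bytes.lines has been reported to skip empty lines on some OS
// versions. If blank lines never arrive, dispatch on the data: line instead.
for try await line in bytes.lines {
    if line.isEmpty {
        // Blank line ends the event: dispatch it and remember its id for resume.
        if let type = currentEvent.event, let data = currentEvent.data {
            handle(type: type, data: Data(data.utf8))
            if let id = currentEvent.id.flatMap({ Int($0) }) { lastEventId = id }
        }
        currentEvent = (nil, nil, nil)
        continue
    }
    if line.hasPrefix("id: ") { currentEvent.id = String(line.dropFirst(4)) }
    if line.hasPrefix("event: ") { currentEvent.event = String(line.dropFirst(7)) }
    if line.hasPrefix("data: ") { currentEvent.data = String(line.dropFirst(6)) }
}
The forthcoming Swift SDK wraps this with typed handlers; until then, the snippet above is what it does under the hood.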
Common mistakes
Connecting without Accept: text/event-stream
You'll get the JSON snapshot of the Turn instead of the stream. The
endpoint content-negotiates: SSE if Accept matches, JSON otherwise.
Treating agent.thought delta as a full message
It's a chunk. Concatenate deltas for the same agent between
agent.started and agent.completed to get the full narration.
Whitespace is significant; don't trim.
Reconnecting without Last-Event-Id
You'll receive duplicate events from the start of the buffer. Always
track the last id: you observed and pass it on reconnect.
Reconnecting more than 1 hour after completed_at
The replay buffer is GC'd. You'll get 404 turn_events_expired. Read
the terminal state with GET /v1/turns/:id instead; it's permanent.
Splitting JSON across multiple data: lines
We don't, and your parser shouldn't expect it. Each data: line is a
complete JSON document. If you see a parse error, it's a bug; report
it with the request ID.
Letting EventSource reconnect on its own
The browser EventSource reconnects automatically, but it uses a
fixed delay of a few seconds with no jitter and no cap on attempts. For a
long-running turn over a flaky network, this is a bad strategy. Use the SDK
(amy.turns.stream), which has exponential backoff with jitter.
Forgetting to close the stream after turn.completed
The server will close it for you, but your client should detect the
terminal event (turn.completed or turn.failed) and stop processing.
The SDK iterator does this; raw EventSource consumers must do it
manually.
Opening one stream per UI subscriber
Open one stream per turn, then fan out events client-side. Two concurrent streams to the same turn waste resources and can get your account rate-limited.
Polling GET /v1/turns/:id and streaming
Pick one. They return the same state. Polling while streaming doubles your request count and can race against the stream's terminal event.
Where to next
- Turns, what triggers the events you're streaming.
- Errors, what each turn.failed code means.
- API reference: Streaming, the endpoint signature.
- SDK: TypeScript, streaming, typed iterator, reconnection, cancellation.
- Internals: Runtime, how the KV-backed event buffer works underneath.