Streaming

Amy streams every step of a turn (routing, agent thoughts, validator verdicts, the synthesised answer) over Server-Sent Events. One HTTP connection, one durable replay buffer, no websockets required.

The CLI's "watch Amy think" UX exists because every client subscribes to the same SSE channel. This page is the protocol-level reference: the wire format, every event type, reconnects, and how to consume from curl, the browser, React Native, and Swift.


When to use streaming

Subscribe to /events whenever a human is waiting on the answer. Amy turns run for 30s-7min depending on routing; a black-box wait erodes trust. Streaming lets you render:

  • Routing, so the user knows which specialists are working.
  • Agent thoughts, so the answer feels alive as it's being written.
  • Validator verdicts, so the user sees that numbers were checked (this is load-bearing for trust; see the README's Validator section).
  • The final synthesis, token by token.

When not to stream: batch jobs, retries, cron-driven workflows. Pass "stream": false to POST /v1/turns and the request blocks until the turn completes. See Turns: Blocking vs streaming.


Connection setup

Streaming is one endpoint:

GET /v1/turns/:id/events

Required headers

GET /v1/turns/turn_01HX2K3M4N5P6Q7R8S9T0V1W2X/events HTTP/1.1
Host: api.amy.health
Accept: text/event-stream
Authorization: Bearer amy_live_…

| Header | Required? | Notes |
| --- | --- | --- |
| Accept | yes | Must include text/event-stream. Sending application/json returns the snapshot Turn object instead. |
| Authorization | yes | Same bearer token as the rest of the API. |
| Last-Event-Id | optional | On reconnect, set this to the last id: you observed. See Reconnects and replay. |
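
As a rough sketch, a client could assemble the headers listed above like this. The `buildStreamHeaders` helper is hypothetical and not part of the SDK; it only illustrates that Last-Event-Id is sent on reconnects and omitted on the first connection.

```typescript
// Hypothetical helper (not an SDK function): builds the request headers
// for GET /v1/turns/:id/events.
function buildStreamHeaders(
  apiKey: string,
  lastEventId?: number,
): Record<string, string> {
  const headers: Record<string, string> = {
    Accept: "text/event-stream",
    Authorization: `Bearer ${apiKey}`,
  };
  // Only send Last-Event-Id when resuming after at least one observed event.
  if (lastEventId !== undefined) {
    headers["Last-Event-Id"] = String(lastEventId);
  }
  return headers;
}
```

Passing the result to whatever HTTP client you use keeps the first-connect and reconnect paths identical apart from the one optional header.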

Response headers

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
X-Request-Id: req_01HX2K3M4N5P6Q7R8S9T0V1W2X

X-Request-Id is per-connection (not per-event). Include it when reporting stuck streams.

What you can subscribe to

You can open the stream before, during, or after the turn. Even for an already-completed turn, the stream replays from the buffer (within the 1-hour window; see Reconnects and replay).

Opening twice in parallel is supported but pointless: each connection gets the same events.


The wire format

Standard SSE per the WHATWG spec. Each event is three lines:

id: <monotonic-integer>
event: <event-type>
data: <single-line JSON>

(blank line ends the event)

Constraints Amy enforces:

  • id is a strictly-increasing integer per turn, starting at 1. No gaps. Use it for Last-Event-Id on reconnect.
  • event is one of the types in the Event catalog.
  • data is always one line of JSON. We never split a JSON object across multiple data: lines, so consumers can JSON.parse(line) directly.
  • No comments (: lines). Workers don't send a heartbeat: the underlying TCP connection stays warm, and Cloudflare doesn't time out an idle SSE stream until 100 seconds have passed, which Amy relies on.

Amy sends no retry: directive. Clients use their own backoff; the SDK uses exponential backoff with jitter, max 5 attempts.
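
A minimal sketch of a parser for the three-line shape described above. It assumes the text it receives contains whole frames separated by blank lines and a single-line JSON data: field, as Amy promises; it is not a general-purpose SSE parser, and `parseFrames` and `SseEvent` are illustrative names rather than SDK exports.

```typescript
interface SseEvent {
  id: number;
  event: string;
  data: unknown;
}

// Parses a string of complete SSE frames (id / event / data, then a blank line).
function parseFrames(text: string): SseEvent[] {
  const events: SseEvent[] = [];
  for (const frame of text.split(/\r?\n\r?\n/)) {
    let id: number | undefined;
    let event: string | undefined;
    let data: string | undefined;
    for (const line of frame.split(/\r?\n/)) {
      if (line.startsWith("id: ")) id = Number(line.slice(4));
      else if (line.startsWith("event: ")) event = line.slice(7);
      else if (line.startsWith("data: ")) data = line.slice(6);
    }
    // Skip incomplete frames, such as the empty string after the final blank line.
    if (id === undefined || event === undefined || data === undefined) continue;
    events.push({ id, event, data: JSON.parse(data) });
  }
  return events;
}
```

A real consumer reading from a socket would also need to hold back a partial trailing frame until the rest arrives; that buffering is left out here.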


Event catalog

Every event is one of these types. Listed in roughly the order they appear in a typical turn.

turn.started

Fires once, when the workflow's first step begins.

{
  "turn_id": "turn_01HX2K3M4N5P6Q7R8S9T0V1W2X",
  "at": "2026-05-25T10:00:00.123Z"
}

| Field | Type | Notes |
| --- | --- | --- |
| turn_id | string | The turn this event belongs to. Same as the URL path. |
| at | ISO-8601 | When step 1 actually began (not when the POST landed). |

agent.started

Fires when each sub-agent begins. Multiple agents run in parallel, so expect two or three agent.started events with no agent.completed between them.

{
  "agent": "data_science",
  "at": "2026-05-25T10:00:04.500Z",
  "question": "Compute average resting heart rate over all available daily data"
}

| Field | Type | Notes |
| --- | --- | --- |
| agent | enum | data_science · domain_expert · health_coach · investigator. |
| at | ISO-8601 | |
| question | string | The rephrased sub-question for this agent (not the user's original). |

agent.thought

Token-level streaming from the current agent. Fires many times, one per chunk emitted by the model. The CLI renders these as live text.

{
  "agent": "data_science",
  "delta": "Looking at "
}

| Field | Type | Notes |
| --- | --- | --- |
| agent | enum | Same values as agent.started. |
| delta | string | A token chunk. Concatenate to reconstruct the agent's narration. Whitespace is significant. |

delta is append-only: you never get a replace or insert. To get the full text per agent, accumulate deltas between an agent.started and its matching agent.completed.
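
The accumulation can be sketched as a small per-agent buffer. `NarrationBuffer` is a hypothetical name used only for illustration; the one rule it encodes from the text above is that deltas are appended verbatim and never trimmed.

```typescript
// Accumulates agent.thought deltas into one narration string per agent.
class NarrationBuffer {
  private readonly text = new Map<string, string>();

  // Append a delta exactly as received; whitespace is significant.
  append(agent: string, delta: string): void {
    this.text.set(agent, (this.text.get(agent) ?? "") + delta);
  }

  // Full narration so far for one agent ("" if nothing has arrived).
  get(agent: string): string {
    return this.text.get(agent) ?? "";
  }
}
```

Because parallel agents interleave on the wire, keying by agent keeps each narration intact.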

agent.completed

Fires when each sub-agent finishes. May arrive long after agent.thoughts stop (the agent finalises its structured output post-stream).

{
  "agent": "data_science",
  "at": "2026-05-25T10:01:42.900Z",
  "duration_ms": 98400,
  "cost_usd": 0.0895,
  "output_summary": "Average RHR 60.39 bpm over 160 valid daily readings (2025-04 → 2026-05). Median 59, SD ±4.47."
}

| Field | Type | Notes |
| --- | --- | --- |
| agent | enum | |
| at | ISO-8601 | |
| duration_ms | number | Wall time for this agent. |
| cost_usd | number | LLM spend for this agent only. |
| output_summary | string | First ~160 chars of the agent's output for log display. Not load-bearing. |

validator.gate

Fires once per gate per finding. Up to 7 gates run per finding (sample size, effect-vs-noise, construct validity, bootstrap, subgroup consistency, method triangulation, discriminative power).

{
  "finding_id": "ds-001",
  "claim": "average resting heart rate over all available daily data",
  "gate": "bootstrap",
  "verdict": "passed",
  "detail": {
    "ci_low": -0.42,
    "ci_high": -0.31,
    "boot_median": -0.374,
    "iterations": 1000
  }
}

| Field | Type | Notes |
| --- | --- | --- |
| finding_id | string | Stable across the turn. The Fact Sheet keys this. |
| claim | string | One-sentence summary of what's being checked. |
| gate | enum | sample_size · effect_vs_noise · construct_validity · bootstrap · subgroup_consistency · method_triangulation · discriminative_power. |
| verdict | enum | passed · failed · skipped (not applicable to this finding's kind). |
| detail | object | Gate-specific numerics. Shape varies by gate. Useful for trace UIs and debugging. |

construct_validity and sample_size are hard gates. Failing either short-circuits the rest, so you'll see fewer than 7 validator.gate events for that finding.
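
A trace UI that waits for all 7 gates would hang on a short-circuited finding. One way to handle that, sketched under the assumption that you keep the validator.gate payloads you have received in an array (`failedHardGate` and `GateEvent` are illustrative names, not SDK types):

```typescript
type GateVerdict = "passed" | "failed" | "skipped";

interface GateEvent {
  finding_id: string;
  gate: string;
  verdict: GateVerdict;
}

// The two hard gates named in the text above.
const HARD_GATES = ["construct_validity", "sample_size"];

// Returns the hard gate that failed for a finding, or null if none has.
function failedHardGate(events: GateEvent[], findingId: string): string | null {
  for (const e of events) {
    const isHard = HARD_GATES.indexOf(e.gate) !== -1;
    if (e.finding_id === findingId && isHard && e.verdict === "failed") {
      return e.gate;
    }
  }
  return null;
}
```

If this returns a gate name, stop expecting further validator.gate events for that finding.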

validator.critic

Fires once per finding, after gates. The Critic is an LLM running an adversarial review with literature priors loaded into context.

{
  "finding_id": "ds-001",
  "verdict": "accept",
  "reasoning": "Descriptive summary across 160 daily readings, not a causal claim. No confounders to flag.",
  "concerns": []
}

| Field | Type | Notes |
| --- | --- | --- |
| finding_id | string | |
| verdict | enum | accept · downgrade · reject. downgrade flips a validated gate verdict to conditional; reject overrides everything. |
| reasoning | string | One paragraph. Safe to surface to power users. |
| concerns | array | Structured concerns (confounder, reverse_causation, selection_bias, literature_contradiction, tautology, small_n, noise) with severity. |

turn.completed

Fires once, last, on success. After this, no more events arrive on the stream and the server closes the connection.

{
  "turn_id": "turn_01HX2K3M4N5P6Q7R8S9T0V1W2X",
  "result": {
    "answer": "Short answer: no — by the most defensible read of your own data...",
    "fact_sheet": [
      { "claim": "ds-001.effect", "value": -0.374, "unit": null,
        "source": "data_science", "n": 87, "window": "last 90 days" }
    ],
    "agents_used": ["data_science", "domain_expert"],
    "validator": { "findings_total": 3, "findings_validated": 2, "findings_conditional": 0, "findings_rejected": 1 },
    "cost_usd": 0.1288,
    "duration_ms": 222000
  }
}

The result payload is identical to Turn.result from GET /v1/turns/:id. You don't need a second fetch after the stream closes.

turn.failed

Fires once, last, on failure. Mutually exclusive with turn.completed.

{
  "turn_id": "turn_01HX2K3M4N5P6Q7R8S9T0V1W2X",
  "error": {
    "code": "upstream_unavailable",
    "message": "Anthropic API returned 529 after 3 retries.",
    "request_id": "req_01HX2K3M4N5P6Q7R8S9T0V1W2X",
    "docs_url": "https://docs.amy.health/concepts/errors#upstream_unavailable"
  }
}

Treat turn.failed the same way you'd treat a non-2xx response. See Errors for the recovery matrix.


Reconnects and replay

SSE connections drop: phones lose signal, browser tabs sleep, intermediaries time out. Amy's stream is designed to survive this without losing events.

How it works

Every event is buffered to KV under stream:<turn_id>:<seq> before it's sent. The buffer is retained for 1 hour after completed_at.

On reconnect, set Last-Event-Id: <last_seq_you_saw>:

GET /v1/turns/turn_01HX.../events HTTP/1.1
Accept: text/event-stream
Authorization: Bearer …
Last-Event-Id: 43

The server replays from seq=44 forward. If the turn is still running, new events follow. If the turn is already complete, you'll get the remaining buffered events and then a graceful close.
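
For a hand-rolled client, the bookkeeping amounts to remembering the highest id seen and ignoring anything at or below it. A minimal sketch, with `ResumeCursor` as a hypothetical name:

```typescript
// Tracks the highest event id seen for one turn.
class ResumeCursor {
  // Ids start at 1, so 0 means "nothing seen yet".
  private lastId = 0;

  // Returns true if the event is new and should be processed.
  accept(id: number): boolean {
    if (id <= this.lastId) return false; // already seen, e.g. replayed from the start
    this.lastId = id;
    return true;
  }

  // Value for the Last-Event-Id header, or undefined on the first connect.
  header(): string | undefined {
    return this.lastId > 0 ? String(this.lastId) : undefined;
  }
}
```

Dropping ids you have already seen also makes the client tolerant of a reconnect that accidentally omits the header and replays from the start.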

What the SDK does

The TypeScript SDK handles this for you:

const iterator = amy.turns.stream(turn.id, {
  onReconnect: (lastId) => console.log("Reconnecting from", lastId),
});

for await (const event of iterator) {
  // The iterator transparently reconnects on transient errors,
  // tracks Last-Event-Id, and resumes the for-await loop.
}

Backoff is exponential with jitter; the iterator throws after 5 failed attempts. You can supply your own values with maxRetries and retryDelayMs.
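
If you are not using the SDK, exponential backoff with jitter can be sketched as below. This uses the "full jitter" variant; the SDK's exact formula is not documented here, and the 500 ms base and 30 s cap are illustrative assumptions. Only the 5-attempt limit comes from the text above.

```typescript
// Full-jitter backoff: a random delay in [0, min(capMs, baseMs * 2^attempt)).
// baseMs and capMs are assumed values, not the SDK's.
function backoffDelayMs(
  attempt: number,
  baseMs: number = 500,
  capMs: number = 30000,
  random: () => number = Math.random,
): number {
  const ceiling = Math.min(capMs, baseMs * Math.pow(2, attempt));
  return Math.floor(random() * ceiling);
}

// Attempts are numbered from 0; give up after 5.
const MAX_ATTEMPTS = 5;

function shouldRetry(attempt: number): boolean {
  return attempt < MAX_ATTEMPTS;
}
```

The `random` parameter exists so the delay can be tested deterministically; in production, leave it at the default.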

When replay is no longer available

Once 1 hour has passed since completed_at, the KV buffer is garbage-collected. Reconnecting then returns 404 turn_events_expired. The Turn row itself is permanent: GET /v1/turns/:id still works forever.

If you need long-term event history, persist events client-side as they arrive. The CLI does this for the /trace command.


Backpressure

SSE has no flow control. If your client can't keep up with the event rate, events queue in the HTTP buffer and your reader falls behind; eventually the connection's TCP send buffer fills and the server blocks on write().

This rarely matters in practice: Amy emits ~50-300 events per turn, spaced over minutes. But two cases can bite:

  1. You're rendering each agent.thought into a DOM mutation synchronously. Browsers can choke at 100+ DOM ops/sec. Buffer tokens for one 16 ms frame and flush them in a requestAnimationFrame callback.
  2. You're piping events into a slow downstream sink (a database write per event). Batch the writes. The SDK iterator has a built-in queue, but a blocking downstream call will still back-pressure through it.

For server-side consumers, run the event-reading loop and the sink-write loop as separate async tasks. The SDK iterator is non-blocking between events.
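
The per-frame buffering from the first case can be sketched as a small batcher. `DeltaBatcher` is a hypothetical class, and the scheduler is injected so the same code works with requestAnimationFrame in a browser or a timer elsewhere.

```typescript
// Collects deltas and hands them to `render` as one string per flush.
class DeltaBatcher {
  private pending: string[] = [];
  private scheduled = false;

  constructor(
    private readonly render: (text: string) => void,
    // In a browser, pass (cb) => requestAnimationFrame(cb).
    private readonly schedule: (flush: () => void) => void,
  ) {}

  push(delta: string): void {
    this.pending.push(delta);
    if (this.scheduled) return; // a flush is already queued for this frame
    this.scheduled = true;
    this.schedule(() => this.flush());
  }

  private flush(): void {
    const text = this.pending.join("");
    this.pending = [];
    this.scheduled = false;
    this.render(text);
  }
}
```

However many deltas arrive within a frame, the DOM is touched once per flush.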


Sample raw transcript

A real turn, abridged. Newlines preserved exactly as they appear on the wire.

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
X-Request-Id: req_01HX2K3M4N5P6Q7R8S9T0V1W2X

id: 1
event: turn.started
data: {"turn_id":"turn_01HX2K3M4N5P6Q7R8S9T0V1W2X","at":"2026-05-25T10:00:00.123Z"}

id: 2
event: agent.started
data: {"agent":"data_science","at":"2026-05-25T10:00:04.500Z","question":"Compute average resting heart rate over all available daily data"}

id: 3
event: agent.thought
data: {"agent":"data_science","delta":"Looking at "}

id: 4
event: agent.thought
data: {"agent":"data_science","delta":"your daily summary table, "}

id: 5
event: agent.thought
data: {"agent":"data_science","delta":"I have 160 valid readings..."}

id: 47
event: agent.completed
data: {"agent":"data_science","at":"2026-05-25T10:01:42.900Z","duration_ms":98400,"cost_usd":0.0895,"output_summary":"Average RHR 60.39 bpm..."}

id: 48
event: validator.gate
data: {"finding_id":"ds-001","claim":"average resting heart rate","gate":"sample_size","verdict":"passed","detail":{"n":160,"min_required":10}}

id: 49
event: validator.gate
data: {"finding_id":"ds-001","claim":"average resting heart rate","gate":"bootstrap","verdict":"passed","detail":{"mean_ci_low":59.7,"mean_ci_high":61.1,"boot_median":60.4}}

id: 55
event: validator.critic
data: {"finding_id":"ds-001","verdict":"accept","reasoning":"Descriptive summary across 160 daily readings, not a causal claim.","concerns":[]}

id: 56
event: agent.thought
data: {"agent":"data_science","delta":"Your average "}

id: 87
event: turn.completed
data: {"turn_id":"turn_01HX2K3M4N5P6Q7R8S9T0V1W2X","result":{"answer":"Your average resting heart rate is 60.39 bpm...","fact_sheet":[{"claim":"ds-001.avg","value":60.39,"unit":"bpm","source":"data_science","n":160,"window":"all"}],"agents_used":["data_science"],"validator":{"findings_total":1,"findings_validated":1,"findings_conditional":0,"findings_rejected":0},"cost_usd":0.1288,"duration_ms":222000}}

Note: between agent.completed (id 47) and the validator events (48, 49, 55), there are no thoughts. The tokens from id 56 onward come from the final synthesis step, not the data scientist, but the wire format groups them under the same logical agent channel for rendering simplicity. The agent.started/agent.completed pair that brackets synthesis uses agent: "synthesis".


Consuming the stream

curl

curl -N \
  -H "Authorization: Bearer $AMY_API_KEY" \
  -H "Accept: text/event-stream" \
  https://api.amy.health/v1/turns/turn_01HX.../events

-N (--no-buffer) disables curl's output buffering. Without it, events may show up late and in bursts instead of as they arrive.

To resume from a known event ID:

curl -N \
  -H "Authorization: Bearer $AMY_API_KEY" \
  -H "Accept: text/event-stream" \
  -H "Last-Event-Id: 43" \
  https://api.amy.health/v1/turns/turn_01HX.../events

Browser (EventSource)

Native EventSource is the simplest path. When it reconnects, it sends Last-Event-Id automatically.

const url = `https://api.amy.health/v1/turns/${turnId}/events`;

// EventSource cannot set custom headers — pass the API key as a query
// string fallback, or proxy through your own backend that adds the
// Authorization header server-side.
const es = new EventSource(`${url}?api_key=${apiKey}`);

es.addEventListener("turn.started", (e) => {
  console.log("Started:", JSON.parse(e.data));
});

es.addEventListener("agent.thought", (e) => {
  const { agent, delta } = JSON.parse(e.data);
  document.getElementById(agent).textContent += delta;
});

es.addEventListener("turn.completed", (e) => {
  const { result } = JSON.parse(e.data);
  render(result.answer);
  es.close();
});

es.addEventListener("turn.failed", (e) => {
  const { error } = JSON.parse(e.data);
  console.error(error.code, error.message);
  es.close();
});

Important: browser EventSource cannot set the Authorization header. Two options:

  1. Proxy through your backend; have it add the bearer token.
  2. Cookie auth. This is out of scope for v1; we don't issue cookies.

For production browser use, proxy. The SDK does this transparently in React/Vue if you set baseUrl to your own proxy.

React Native

EventSource is not in React Native's standard polyfill. Use react-native-event-source and pass it to the SDK:

import EventSource from "react-native-event-source";
import { Amy } from "@amy/sdk";

const amy = new Amy({
  apiKey,
  EventSourceCtor: EventSource,
});

for await (const event of amy.turns.stream(turnId)) {
  // works identically to Node / browser
}

react-native-event-source does support custom headers, so you don't need the proxy trick. Pass the Authorization header directly through the SDK.

Swift (URLSession)

There's no built-in EventSource in Foundation, but the bytes-stream API is enough:

let url = URL(string: "https://api.amy.health/v1/turns/\(turnId)/events")!
var req = URLRequest(url: url)
req.setValue("text/event-stream", forHTTPHeaderField: "Accept")
req.setValue("Bearer \(apiKey)",  forHTTPHeaderField: "Authorization")
// lastEventId is an Int? that you track yourself (nil on the first connect).
// Only send the header when resuming.
if let resumeFrom = lastEventId {
  req.setValue(String(resumeFrom), forHTTPHeaderField: "Last-Event-Id")
}

let (bytes, _) = try await URLSession.shared.bytes(for: req)

// bytes.lines may not yield empty lines, so don't rely on seeing the blank
// line that ends an event. Amy always sends id, event, data in that order
// with a single data: line, so dispatch as soon as data: arrives.
var currentId: String?
var currentEvent: String?
for try await line in bytes.lines {
  if line.hasPrefix("id: ") {
    currentId = String(line.dropFirst(4))
  } else if line.hasPrefix("event: ") {
    currentEvent = String(line.dropFirst(7))
  } else if line.hasPrefix("data: "), let type = currentEvent {
    handle(type: type, data: Data(line.dropFirst(6).utf8))
    // Remember the id so a later reconnect can resume from it.
    if let id = currentId { lastEventId = Int(id) ?? lastEventId }
    currentId = nil
    currentEvent = nil
  }
}

The forthcoming Swift SDK wraps this with typed handlers; until then, the snippet above is what it does under the hood.


Common mistakes

Connecting without Accept: text/event-stream

You'll get the JSON snapshot of the Turn instead of the stream. The endpoint content-negotiates: SSE if Accept matches, JSON otherwise.

Treating agent.thought delta as a full message

It's a chunk. Concatenate deltas for the same agent between agent.started and agent.completed to get the full narration. Whitespace is significant; don't trim.

Reconnecting without Last-Event-Id

You'll receive duplicate events from the start of the buffer. Always track the last id: you observed and pass it on reconnect.

Reconnecting more than 1 hour after completed_at

The replay buffer is GC'd. You'll get 404 turn_events_expired. Read the terminal state with GET /v1/turns/:id instead; it's permanent.

Splitting JSON across multiple data: lines

We don't, and your parser shouldn't expect it. Each data: line is a complete JSON document. If you see a parse error, it's a bug; report it with the request ID.

Letting EventSource reconnect on its own

The browser EventSource reconnects automatically, but it retries at a fixed delay (around 3 seconds in most browsers) with no jitter and no cap on attempts. For a long-running turn over a flaky network, this is a bad strategy. Use the SDK (amy.turns.stream), which has exponential backoff with jitter.

Forgetting to close the stream after turn.completed

The server will close it for you, but your client should detect the terminal event (turn.completed or turn.failed) and stop processing. The SDK iterator does this; raw EventSource consumers must do it manually.
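
For raw consumers, the manual check is small. A sketch, with `isTerminal` and `takeUntilTerminal` as illustrative helper names:

```typescript
// The two event types that end a stream.
const TERMINAL_EVENTS = ["turn.completed", "turn.failed"];

function isTerminal(eventType: string): boolean {
  return TERMINAL_EVENTS.indexOf(eventType) !== -1;
}

// Returns the events up to and including the first terminal one;
// anything after it is ignored.
function takeUntilTerminal<T extends { event: string }>(events: T[]): T[] {
  const out: T[] = [];
  for (const e of events) {
    out.push(e);
    if (isTerminal(e.event)) break;
  }
  return out;
}
```

In an EventSource handler, the equivalent is calling es.close() as soon as `isTerminal` returns true for the event type.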

Opening one stream per UI subscriber

Open one stream per turn, then fan out events client-side. Two concurrent streams to the same turn waste resources and can get your account rate-limited.
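
Client-side fan-out can be as simple as a listener set fed by the single stream. A minimal sketch; `EventFanout` is a hypothetical class, not something the SDK exports.

```typescript
type Listener<T> = (event: T) => void;

// Shares one upstream stream among many UI subscribers.
class EventFanout<T> {
  private readonly listeners = new Set<Listener<T>>();

  // Registers a listener and returns a function that removes it.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  // Call once per event read from the single upstream stream.
  publish(event: T): void {
    this.listeners.forEach((listener) => listener(event));
  }
}
```

Each component subscribes on mount and calls the returned function on unmount, while only one connection to /events stays open.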

Polling GET /v1/turns/:id and streaming

Pick one. They return the same state. Polling while streaming doubles your request count and can race against the stream's terminal event.

