How wearable + lab data gets from Terra into D1, what runs along the way,
and how to recover when something falls over. The pipeline is small,
webhook → raw_events insert → Queue → normalize → typed tables, but the
details of dedup, idempotency, and the cron drain are load-bearing.
The Cloudflare layer never asks Terra for data on demand during an agent
turn. All data lives in D1, pre-normalized and ready for GET /v1/sync
(cloud/src/routes/sync.ts), which the CLI uses to mirror it into a local
SQLite database for the Data Science Agent to load into pandas. The ingest
pipeline below is what keeps D1 fresh.
The Terra webhook handler is mounted at four paths in cloud/src/index.ts:46–49,
because Terra's dashboard Host field is hostname-only and the team has set
it to different paths over time:
app.route("/webhook/terra", webhookTerra);
app.route("/webhook", webhookTerra);
app.route("/terra", webhookTerra);
app.post("/", handleTerraWebhook); // root POST also accepted
Terra's lab-report webhook is structurally distinct from its wearable
webhooks: no type, no user, just { upload_id, data: [...] }. The
detection is shape-based because Terra's actual payloads diverged from the
documented shape and the implementation now tracks reality (see the
verbose comment header in
normalize/lab.ts).
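A minimal sketch of what shape-based detection could look like. isLabReportPayload and classifyEventType are hypothetical names, and the exact fields checked are an assumption based on the shape described above (no type, no user, just { upload_id, data: [...] }).

```typescript
// Hypothetical helper for illustration: the real shape check lives in the
// webhook handler / normalize/lab.ts and may test different fields.
type WebhookBody = Record<string, unknown>;

interface LabReportBody {
  upload_id: string;
  data: unknown[];
}

// Lab-report webhooks look like { upload_id, data: [...] } with no `type`
// and no `user`, so they are recognised by shape, not by a discriminator.
export function isLabReportPayload(body: WebhookBody): body is WebhookBody & LabReportBody {
  return (
    body.type === undefined &&
    body.user === undefined &&
    typeof body.upload_id === "string" &&
    Array.isArray(body.data)
  );
}

// Wearable events keep Terra's own `type`; lab reports get "lab_report",
// the event_type the lab normalizer branches on. Anything else -> null.
export function classifyEventType(body: WebhookBody): string | null {
  if (isLabReportPayload(body)) return "lab_report";
  return typeof body.type === "string" ? body.type : null;
}
```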
insert into raw_events
  (event_type, terra_user_id, reference_id, provider, payload, signature_verified, dedup_key)
values (?, ?, ?, ?, ?, 1, ?)
returning id
Here dedup_key = sha256Hex(rawBody)
(cloud/src/lib/sha256.ts). The
UNIQUE (event_type, terra_user_id, dedup_key) index on raw_events
makes duplicate deliveries a no-op: the insert fails, and the catch branch
returns 200 { ok: true, duplicate: true } and skips the enqueue.
For lab-report webhooks the reference_id column is repurposed to hold
Terra's upload_id (Terra echoes no user info for labs; the upload_id is
the only way to correlate back to lab_uploads.terra_upload_id).
The HTTP response (200) returns immediately. The queue publish runs
under waitUntil, which keeps the Worker isolate alive until the send
resolves without blocking the response. If send throws (a transient
Cloudflare hiccup), the cron drain picks the row up within 5 minutes; see
Cron jobs below.
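A sketch of the respond-then-publish pattern. QueueLike, ContextLike, the IngestMessage shape, and respondThenEnqueue are hypothetical stand-ins (the real types come from @cloudflare/workers-types). The point is that the send promise is handed to waitUntil instead of being awaited, and a failed send is logged rather than surfaced, since the raw_events row already exists for the cron drain to find.

```typescript
// Minimal structural types so the sketch stands alone.
interface QueueLike<T> { send(body: T): Promise<void>; }
interface ContextLike { waitUntil(p: Promise<unknown>): void; }

// Hypothetical message shape; request_id threads through logs and traces.
interface IngestMessage { raw_event_id: number; request_id: string; }

export function respondThenEnqueue(
  queue: QueueLike<IngestMessage>,
  ctx: ContextLike,
  msg: IngestMessage,
): Response {
  // Start the publish without awaiting it; waitUntil keeps the isolate alive until it settles.
  const publish = queue.send(msg).catch((err) => {
    // The row is already in raw_events with processed_at NULL,
    // so the 5-minute cron drain will re-enqueue it.
    console.error("queue send failed; cron drain will retry", msg.raw_event_id, err);
  });
  ctx.waitUntil(publish);
  return new Response(JSON.stringify({ ok: true, request_id: msg.request_id }), {
    status: 200,
    headers: { "content-type": "application/json" },
  });
}
```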
The request_id echoed back is the same one threading through every log
line and the Queue message, so a single Terra delivery's full journey is
queryable via /admin/traces?request_id=<uuid>.
| Outcome | processed_at | process_error | What happens |
|---|---|---|---|
| Normalized but skipped on purpose (large_request_processing, nutrition, etc.) | datetime('now') | 'skipped:<reason>' | Acknowledged, not stored. |
| Returned ok: false (handler decision) | NULL | <error> | Parked; the queue won't redeliver it. Manual triage via /admin/dlq. |
| Threw an exception | NULL (unless updated by retry) | <exception message> | Redelivered by the queue up to 5 times, then sent to the DLQ. |
Important: result.ok=false returns msg.ack() (the message is consumed),
but the row is intentionally left unmarked (processed_at IS NULL) so the
5-minute cron drain re-enqueues it. This gives a "park, but check again
soon" behaviour for transient issues like the no_connection_for_terra_user
case (an event landed before its auth_success arrived).
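The outcomes listed above, plus the ack-but-don't-mark rule, can be expressed as a small pure function. This is a hypothetical sketch: NormalizeResult, Decision, and decide are illustrative names, the success case (processed_at set, process_error NULL) is assumed because it isn't listed, and "retry" stands for either msg.retry() or letting the exception propagate to the queue.

```typescript
type NormalizeResult =
  | { ok: true; skipped?: string }
  | { ok: false; skipped?: string; error: string };

interface Decision {
  action: "ack" | "retry";
  markProcessed: boolean;      // set processed_at = datetime('now')?
  processError: string | null; // value written to raw_events.process_error
}

export function decide(outcome: { result: NormalizeResult } | { thrown: unknown }): Decision {
  if ("thrown" in outcome) {
    const t = outcome.thrown;
    const message = t instanceof Error ? t.message : String(t);
    // Queue redelivers (up to 5 attempts), then the message goes to the DLQ.
    return { action: "retry", markProcessed: false, processError: message };
  }
  const r = outcome.result;
  if (r.ok && r.skipped) {
    return { action: "ack", markProcessed: true, processError: `skipped:${r.skipped}` };
  }
  if (r.ok) return { action: "ack", markProcessed: true, processError: null };
  // ok:false: consume the message but leave processed_at NULL so the cron drain can see it.
  return { action: "ack", markProcessed: false, processError: r.error };
}
```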
The normalize SQL throughout uses INSERT ... ON CONFLICT(...) DO UPDATE SET ... (see buildCoalesceUpsert).
That makes the entire pipeline replay-safe: re-running normalize on the
same raw_events row produces the same final D1 state.
The COALESCE pattern (only overwrites destination columns when the new
value is non-null) lets a sleep event fill in HRV after a daily event
already wrote steps for the same date, without clobbering the daily
fields with nulls.
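A sketch of how a COALESCE upsert builder might generate its SQL. The signature of buildCoalesceUpsert here is hypothetical, and the (user_id, date) conflict key for daily_summary is an assumption. In SQLite's upsert syntax, excluded.<col> refers to the value from the row that failed to insert.

```typescript
// Hypothetical signature; the real buildCoalesceUpsert may differ.
// Table and column names are interpolated, so they must be trusted constants.
export function buildCoalesceUpsert(table: string, keyCols: string[], valueCols: string[]): string {
  const cols = [...keyCols, ...valueCols];
  const placeholders = cols.map(() => "?").join(", ");
  // Only overwrite a column when the incoming value is non-null; otherwise keep the stored one.
  const sets = valueCols
    .map((c) => `${c} = coalesce(excluded.${c}, ${table}.${c})`)
    .join(", ");
  return (
    `insert into ${table} (${cols.join(", ")}) values (${placeholders}) ` +
    `on conflict(${keyCols.join(", ")}) do update set ${sets}`
  );
}
```

Binding null for steps in a sleep-triggered upsert leaves the stored steps untouched, while a non-null heart_rate_variability overwrites. Running the same statement twice with the same bindings yields the same row, which is what makes replay safe.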
Every wearable event arrives with terra_user_id (Terra's per-provider
opaque id). To find our user_id (Clerk sub):
// cloud/src/normalize/utils.ts
export async function resolveUserId(db, referenceId, terraUserId) {
  if (referenceId && referenceId.startsWith("user_")) return referenceId;
  if (!terraUserId) return null;
  const row = await db.prepare(
    "select user_id from terra_connections where terra_user_id = ? limit 1"
  ).bind(terraUserId).first();
  return row?.user_id ?? null;
}
Two paths:
Fast path: reference_id is set and looks like a Clerk userId (user_
prefix), so there's no DB hit. This is the happy path for events that
arrived AFTER auth_success (we set reference_id at widget creation; see
routes/connect.ts:32–42).
Slow path: fall back to looking up terra_user_id in terra_connections.
This is reliable once the auth_success has been processed.
If neither resolves, the normalizer returns
{ ok: false, skipped: "no_user", error: "no_connection_for_terra_user" }.
The cron drain will re-enqueue it for up to 24h, hoping the
auth_success lands in the meantime. After 24h the cron explicitly skips
these rows (its WHERE clause filters out
process_error = 'no_connection_for_terra_user' after the lookback
window).
sleep_sessions: the full raw payload, one row per session
(PK user_id, source, start_time).
daily_summary: sleep-derived columns merged into the day
keyed to the session's wake date (utcDate(metadata.end_time)).
The merge uses buildCoalesceUpsert, so daily_summary gets sleep_minutes,
bed_time, wake_up_time, resting_heart_rate (preferred from sleep over
daily), heart_rate_variability, the deep/rem/light/awake minute and
percent breakdowns, spo2, respiratory_rate, skin_temperature, plus
sleep_score and recovery_score (the latter falls back to
readiness_data.readiness for Whoop).
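If utcDate works the way its name suggests, the wake date is the UTC calendar date of metadata.end_time. The following is a hypothetical sketch of that helper, not the code in the repo.

```typescript
// Hypothetical utcDate: the UTC calendar date (YYYY-MM-DD) of an ISO-8601 timestamp.
export function utcDate(iso: string): string {
  const d = new Date(iso);
  if (Number.isNaN(d.getTime())) throw new Error(`invalid timestamp: ${iso}`);
  // toISOString() is always UTC, so the first 10 chars are the UTC date.
  return d.toISOString().slice(0, 10);
}
```

A consequence of keying on UTC: a user in UTC+10 who wakes at 07:00 local time has the session merged into the previous UTC date's daily_summary row.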
Then backfill90Days(env, terraUserId) fires off four Terra HTTP calls
(activity / sleep / daily / body) in parallel. Each returns synchronously
with a request_processing ack; chunks land back via webhook over the
next minutes.
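A sketch of the fan-out, with requestBackfill injected so it can be tested. backfillRecent and the RequestBackfill signature are hypothetical. Using Promise.allSettled (so one failed type doesn't mask the others) is a choice made here, not something the text above specifies.

```typescript
type BackfillType = "activity" | "sleep" | "daily" | "body";
const BACKFILL_TYPES: BackfillType[] = ["activity", "sleep", "daily", "body"];

// Hypothetical signature: each call returns once Terra acks with request_processing.
type RequestBackfill = (
  type: BackfillType,
  terraUserId: string,
  startDate: string,
  endDate: string,
) => Promise<unknown>;

export async function backfillRecent(
  requestBackfill: RequestBackfill,
  terraUserId: string,
  days = 90,
  now: Date = new Date(),
): Promise<PromiseSettledResult<unknown>[]> {
  const end = now.toISOString().slice(0, 10);
  const start = new Date(now.getTime() - days * 86_400_000).toISOString().slice(0, 10);
  // All four requests run in parallel; data arrives later via webhook.
  return Promise.allSettled(BACKFILL_TYPES.map((t) => requestBackfill(t, terraUserId, start, end)));
}
```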
(No other fields are used today; a draw_date field is a placeholder in
comments but isn't parsed.)
Worker steps:
Validate the mime type, extension, and size. Reject with:
400 expected_multipart if formData parse fails.
400 missing_file_field if no file.
415 unsupported_file_type if neither mime nor extension matches.
413 file_too_large if size > 10 * 1024 * 1024.
R2 put, lab-uploads/<userId>/<uploadId>.<ext> where uploadId = crypto.randomUUID(). httpMetadata.contentType and
customMetadata.{user_id, original_name} persisted.
D1 insert into lab_uploads (id, user_id, storage_key, terra_status='pending').
Forward to Terra:
POST https://api.tryterra.co/v2/lab-reports
dev-id: <TERRA_DEV_ID>
x-api-key: <TERRA_API_KEY>
form fields: reference_id=<userId>, files=<blob>   ← plural "files"
The plural-vs-singular field name matters: Terra rejects a singular file
field with 400 No Files Provided.
Update D1 with the result:
update lab_uploads
   set terra_status    = ?,  -- 'submitted' | 'failed:<status>' | 'failed:network'
       terra_response  = ?,  -- raw JSON
       terra_upload_id = ?   -- the id Terra returned (the ONLY
 where id = ?                -- correlator on the async webhook)
Respond, 200 { ok, upload_id, storage_key, terra_status, note: "Run amy sync in ~30s to pull biomarkers." }. On failure
502 terra_upload_failed.
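The validation and forward-to-Terra steps above can be sketched as two small functions. The allowlist (application/pdf / .pdf) is an assumption, since the text doesn't say which types match, and validateLabFile, buildTerraLabForm, and FileLike are hypothetical names. The 10 MiB limit, the error codes, and the plural files field come from the steps above.

```typescript
const MAX_BYTES = 10 * 1024 * 1024;
// Assumed allowlist for illustration only.
const ALLOWED_MIME = new Set(["application/pdf"]);
const ALLOWED_EXT = new Set(["pdf"]);

interface FileLike { name: string; type: string; size: number; }
interface Rejection { status: 400 | 413 | 415; error: string; }

export function validateLabFile(file: FileLike | null): Rejection | null {
  if (!file) return { status: 400, error: "missing_file_field" };
  const dot = file.name.lastIndexOf(".");
  const ext = dot >= 0 ? file.name.slice(dot + 1).toLowerCase() : "";
  // Accept if EITHER the mime type or the extension matches.
  if (!ALLOWED_MIME.has(file.type) && !ALLOWED_EXT.has(ext)) {
    return { status: 415, error: "unsupported_file_type" };
  }
  if (file.size > MAX_BYTES) return { status: 413, error: "file_too_large" };
  return null; // valid
}

export function buildTerraLabForm(userId: string, file: Blob, filename: string): FormData {
  const form = new FormData();
  form.append("reference_id", userId);
  // Must be "files" (plural); a singular "file" gets 400 No Files Provided.
  form.append("files", file, filename);
  return form;
}
```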
if (event.event_type === "lab_report") {
  const terraUploadId = event.reference_id ?? payload.upload_id;
  // ↑ webhook handler stuffed Terra's upload_id into reference_id since
  //   lab payloads have no user object
  const row = await env.DB.prepare(
    "select user_id from lab_uploads where terra_upload_id = ? limit 1"
  ).bind(terraUploadId).first();
  if (!row?.user_id)
    return { ok: false, skipped: "no_user", error: "no_lab_upload_for_upload_id" };
  await normalizeLab(env, row.user_id, payload);
  await env.DB.prepare(
    "update lab_uploads set terra_status = 'parsed', parsed_at = datetime('now') where terra_upload_id = ?"
  ).bind(terraUploadId).run();
}
normalizeLab iterates data[].results[] and writes to biomarkers_raw
with source='terra_lab'. Each marker code is mapped through CODE_ALIASES
(e.g. Terra's glucose_fasting → canonical glucose). Codes with no alias
still land at full fidelity in the long table but aren't surfaced by the
biomarkers_wide view.
A classification like normal/borderline/high/etc. gets mapped to
the tri-state status enum (optimal | borderline | out_of_range).
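A sketch of the alias and status mapping. Only glucose_fasting → glucose comes from the text; the rest of CODE_ALIASES is omitted, and the classification strings treated as out_of_range (high, low, abnormal, critical) are assumptions. A Map is used instead of an object literal so that a code like "constructor" can't resolve to an Object.prototype property.

```typescript
// Only this entry is from the docs; the real alias table is larger.
const CODE_ALIASES = new Map<string, string>([["glucose_fasting", "glucose"]]);

export function canonicalCode(code: string): { code: string; aliased: boolean } {
  const alias = CODE_ALIASES.get(code);
  // Unaliased codes are still stored as-is in biomarkers_raw.
  return alias !== undefined ? { code: alias, aliased: true } : { code, aliased: false };
}

type Status = "optimal" | "borderline" | "out_of_range";
// Assumed set of Terra classifications that count as out of range.
const OUT_OF_RANGE = new Set(["high", "low", "abnormal", "critical"]);

export function toStatus(classification: string | null | undefined): Status | null {
  if (!classification) return null;
  const c = classification.trim().toLowerCase();
  if (c === "normal") return "optimal";
  if (c === "borderline") return "borderline";
  if (OUT_OF_RANGE.has(c)) return "out_of_range";
  return null; // unknown classification: leave status empty
}
```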
Terra streams historical ranges (>28 days) over multiple webhook
deliveries. The flow:
CLI: POST /v1/import { days: 365 }.
Worker
(cloud/src/routes/import.ts):
reconciles connections via Terra listSubscriptions, then for each
active connection × each type (activity, sleep, daily, body) calls
requestBackfill, which hits GET /v2/<type>?user_id=...&start_date=...&end_date=...&to_webhook=true&with_samples=false.
Terra responds synchronously with request_processing ack and
begins streaming chunks via webhook.
Some chunks come as large_request_processing envelope events that
the normalizer explicitly skips ({ ok: true, skipped: 'large_request_processing' }).
The real data arrives as daily | sleep | activity | body events
directly.
Days are capped at 1460 (4 years) by routes/import.ts:35.
Whoop, Oura, and Garmin typically retain only 2–3 years server-side, but
Terra may surface more for some providers.
The auth_success normalizer also fires a 90-day backfill automatically
on first connect.
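A sketch of the URL requestBackfill builds, including the 1460-day cap. buildBackfillUrl is a hypothetical name, the YYYY-MM-DD date format is an assumption, and the real request also needs the dev-id and x-api-key headers shown in the lab-reports call.

```typescript
const TERRA_API = "https://api.tryterra.co";
const MAX_DAYS = 1460; // cap from routes/import.ts

type BackfillType = "activity" | "sleep" | "daily" | "body";

export function buildBackfillUrl(
  type: BackfillType,
  terraUserId: string,
  requestedDays: number,
  now: Date = new Date(),
): URL {
  const days = Math.min(requestedDays, MAX_DAYS);
  const start = new Date(now.getTime() - days * 86_400_000);
  const url = new URL(`/v2/${type}`, TERRA_API);
  url.searchParams.set("user_id", terraUserId);
  url.searchParams.set("start_date", start.toISOString().slice(0, 10));
  url.searchParams.set("end_date", now.toISOString().slice(0, 10));
  url.searchParams.set("to_webhook", "true");    // stream chunks back via webhook
  url.searchParams.set("with_samples", "false"); // skip high-frequency samples
  return url;
}
```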
select id from raw_events
 where processed_at is null
   and received_at > datetime('now', '-24 hours')
   and (process_error is null or process_error not like 'skipped:%')
   and (process_error is null or process_error != 'no_connection_for_terra_user')
 order by id asc
 limit 50
Re-enqueues up to 50 rows. Filters:
24h lookback, older stuck rows stay parked for manual triage.
Excludes skipped:%, those were intentionally not processed (e.g.
nutrition); re-enqueuing would just skip them again.
Excludes no_connection_for_terra_user, these need a different
fix (the missing auth_success to land first); spamming the queue
doesn't help.
If ids.length === 0: logs cron(5m): nothing to drain and returns.
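The 5-minute drain can be sketched as a scheduled-handler body. D1Like and QueueLike are minimal stand-ins for the D1 and Queue bindings, drainStuckRows is a hypothetical name, the { raw_event_id } message body is hypothetical, and batching through sendBatch (rather than one send per id) is an assumption.

```typescript
interface D1Like {
  prepare(sql: string): { all(): Promise<{ results: { id: number }[] }> };
}
interface QueueLike<T> { sendBatch(msgs: { body: T }[]): Promise<void>; }

const DRAIN_SQL = `
  select id from raw_events
   where processed_at is null
     and received_at > datetime('now', '-24 hours')
     and (process_error is null or process_error not like 'skipped:%')
     and (process_error is null or process_error != 'no_connection_for_terra_user')
   order by id asc
   limit 50`;

export async function drainStuckRows(
  db: D1Like,
  queue: QueueLike<{ raw_event_id: number }>,
): Promise<number> {
  const { results } = await db.prepare(DRAIN_SQL).all();
  const ids = results.map((r) => r.id);
  if (ids.length === 0) {
    console.log("cron(5m): nothing to drain");
    return 0;
  }
  // Re-enqueue every stuck row; the normalizer's upserts make reprocessing safe.
  await queue.sendBatch(ids.map((id) => ({ body: { raw_event_id: id } })));
  return ids.length;
}
```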
The Cloudflare-level DLQ is terra-events-dlq, set in wrangler.toml,
no consumer attached. A message lands there only if the consumer threw
5 times in a row on the same message body.
There's also an in-D1 DLQ of sorts: rows in raw_events where the
normalizer returned { ok: false } (parked, not retried). The cron will
only pick these up if they're within the 24h lookback window and not
flagged as skipped:*.
GET /admin/dlq returns the last 50 rows where (processed_at IS NULL AND process_error IS NOT NULL) OR (process_error IS NOT NULL AND process_error NOT LIKE 'skipped:%'),
with the error message truncated to 200 chars. Pair with
/admin/raw-events/:id to dump the full Terra payload for repro.
Cloudflare dashboard → Queues → terra-events-dlq → Messages. There's no
admin CLI today; if anything lands here, it's a deeper failure (consumer
code threw 5 times in a row).
# 1. Find the IDs
curl -sH "x-admin-key: $AMY_ADMIN_KEY" \
  https://api.amy.health/admin/dlq | jq '.rows[].id'

# 2. Drain everything not-yet-processed (re-enqueues up to 100 at once)
curl -X POST -H "x-admin-key: $AMY_ADMIN_KEY" \
  https://api.amy.health/admin/drain
The /admin/drain route (cloud/src/routes/admin.ts:81–92)
selects up to 100 rows with processed_at IS NULL and applies no other
filter, so be aware that it will also re-pick skipped:* and
no_connection_for_terra_user rows.
There's no first-class admin endpoint for "re-enqueue row X" today. The
workaround is wrangler queues consumer ... or sending directly via the
producer binding from a one-off script.
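One possible shape for such a one-off script, using the producer binding. reenqueueRow, the message body, and the choice to refuse already-processed rows unless force is set are all hypothetical. A fresh request_id is generated because it isn't stated whether raw_events stores the original one.

```typescript
interface RawEventRow { id: number; processed_at: string | null; }
interface D1Like {
  prepare(sql: string): { bind(...values: unknown[]): { first(): Promise<RawEventRow | null> } };
}
interface QueueLike { send(body: { raw_event_id: number; request_id: string }): Promise<void>; }
type Outcome = "sent" | "not_found" | "already_processed";

export async function reenqueueRow(
  db: D1Like,
  queue: QueueLike,
  id: number,
  force = false,
): Promise<Outcome> {
  const row = await db.prepare("select id, processed_at from raw_events where id = ?").bind(id).first();
  if (!row) return "not_found";
  // Guard against replaying rows that already normalized, unless explicitly forced.
  if (row.processed_at !== null && !force) return "already_processed";
  await queue.send({ raw_event_id: row.id, request_id: crypto.randomUUID() });
  return "sent";
}
```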
# Force a full backfill of the last 365 days for the authed user
curl -X POST \
  -H "Authorization: Bearer $AMY_JWT" \
  -H "Content-Type: application/json" \
  -d '{"days": 365}' \
  https://api.amy.health/v1/import
This hits POST /v1/import → requestBackfill for each connection × each
type. Chunks stream back into the normal webhook ingest path.
curl -X POST -H "x-admin-key: $AMY_ADMIN_KEY" \ https://api.amy.health/admin/user/<userId>/wipe
Deletes from raw_events, trace_events, daily_summary, activities,
sleep_sessions, biomarkers_raw, lab_uploads, terra_connections,
and users (in that order, respecting FK cascade). Returns a per-table
deletion count.
R2 objects (lab PDFs) are NOT deleted by /admin/user/:userId/wipe; the
wipe deletes the metadata row but leaves the blob. If you need full
parity, list the prefix lab-uploads/<userId>/ and call bucket.delete
in a separate sweep.
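A sketch of that sweep. R2BucketLike is a structural stand-in for an R2 bucket binding, using its list({ prefix, cursor }) and delete(keys) methods; deleteUserLabBlobs is a hypothetical name. Each listed page is deleted before fetching the next, following the cursor until truncated is false.

```typescript
interface R2ListResult { objects: { key: string }[]; truncated: boolean; cursor?: string; }
interface R2BucketLike {
  list(opts: { prefix: string; cursor?: string }): Promise<R2ListResult>;
  delete(keys: string[]): Promise<void>;
}

export async function deleteUserLabBlobs(bucket: R2BucketLike, userId: string): Promise<number> {
  const prefix = `lab-uploads/${userId}/`;
  let cursor: string | undefined;
  let deleted = 0;
  do {
    // A list page holds at most 1000 keys by default, which one delete call accepts.
    const page = await bucket.list({ prefix, cursor });
    const keys = page.objects.map((o) => o.key);
    if (keys.length > 0) {
      await bucket.delete(keys);
      deleted += keys.length;
    }
    cursor = page.truncated ? page.cursor : undefined;
  } while (cursor);
  return deleted;
}
```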