Streaming Contract
This document is the protocol narrative for the Integration API’s streaming surface. The
normative machine-readable definition is the ConversationEvent schema (and its per-event
variants) in openapi/openapi.yaml; this document explains how to operate and consume the
protocol correctly — what flows over the wire, in what order, what can go wrong, and exactly what
a client must do about it.
Two operations produce event streams:
| Operation | Method & path | When it streams |
|---|---|---|
| createMessage | POST /conversations/{conversation_id}/messages | Always, unless ?stream=false (§8) |
| createConversation | POST /conversations | Only when the body includes initial_message; the just-created conversation rides inside the first message_start event (data.conversation) |
Everything below applies identically to both. Where behavior differs, it is called out.
1. Transport: NDJSON over a plain HTTP response
The streaming response is newline-delimited JSON (NDJSON):
- Status: 200 with Content-Type: application/x-ndjson.
- Framing: one complete JSON object per line, each line terminated by \n. There is no enclosing array, no comma separator and no SSE data: prefix. Each line parses standalone with any JSON parser.
- No Content-Length. The response length is unknown when the headers are sent. Delivery uses Transfer-Encoding: chunked on HTTP/1.1 or ordinary DATA frames on HTTP/2. The client must not expect a length header and must not wait for the connection to close before processing.
- Incremental flush. The server flushes each event line as soon as it is produced. A content_delta reaches the client within milliseconds of the agent producing it, provided nothing in between buffers it (see the operator warnings below).
- Chunk boundaries are not line boundaries. TCP/HTTP chunking knows nothing about the protocol: a single read may deliver half a line, three lines, or one line plus the beginning of the next. Clients MUST buffer bytes and split on \n themselves (§6). Never assume that one read equals one event.
- Empty lines. Clients SHOULD skip empty lines silently rather than treating them as a parse error. The protocol does not currently emit them, but tolerating them costs nothing and keeps the parser robust against future keep-alive padding.
- Encoding: UTF-8, always.
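The buffering rules above fit in a small, transport-agnostic splitter. The sketch below assumes a WHATWG TextDecoder is available (browsers, Deno, Node 18+). The class name NdjsonSplitter is illustrative and is not part of any published SDK. It handles the two details that most often go wrong: keeping the trailing partial line, and decoding in streaming mode so that a multi-byte character split across chunks survives.

```typescript
// Hypothetical helper: splits an NDJSON byte stream into complete lines.
// Feed it chunks exactly as the transport delivers them.
class NdjsonSplitter {
  private readonly decoder = new TextDecoder('utf-8');
  private buffer = '';

  // Decodes one chunk and returns every complete, non-blank line it finishes.
  push(chunk: Uint8Array): string[] {
    // stream: true keeps a code point that straddles two chunks pending in
    // the decoder instead of emitting a replacement character.
    this.buffer += this.decoder.decode(chunk, { stream: true });
    const lines = this.buffer.split('\n');
    // The last element is either '' (the chunk ended on \n) or a partial line.
    this.buffer = lines.pop() ?? '';
    return lines.filter((line) => line.trim() !== ''); // skip blank lines
  }

  // Call once at EOF. A non-empty result is a line cut off mid-JSON,
  // which marks the stream as truncated.
  end(): string {
    this.buffer += this.decoder.decode(); // flush bytes held by the decoder
    const rest = this.buffer.trim();
    this.buffer = '';
    return rest;
  }
}
```

Each returned line still goes through JSON.parse in the caller. The splitter never parses JSON itself, so a malformed line shows up as a parse error in the caller instead of being silently dropped.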
Pre-stream failures are not events. Authentication failures, validation errors, an archived
conversation, a suspended tenant, or on_capacity: reject hitting a full pool are all rejected
before any NDJSON byte is written — as ordinary RFC 9457 application/problem+json responses
with their proper HTTP status (401/403/404/409/422/429). A client therefore branches
on the response’s Content-Type: application/x-ndjson → consume the stream;
application/problem+json → handle the problem. Once the 200 and the NDJSON content type are
committed, any subsequent failure can only arrive in-band as a terminal error event (§3) —
the HTTP status is already sent and will not change.
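The Content-Type branch should not be affected by media-type parameters or letter case. The sketch below shows one way to do that. The 'unexpected' bucket is a defensive addition and not part of the contract. It catches cases such as an HTML error page injected by a misbehaving intermediary.

```typescript
type ResponseKind = 'stream' | 'problem' | 'unexpected';

// Classifies a streaming-operation response by media type only.
// Parameters such as "; charset=utf-8" and letter case do not change the result.
function classifyResponse(contentType: string | null): ResponseKind {
  const mediaType = (contentType ?? '').split(';')[0].trim().toLowerCase();
  if (mediaType === 'application/x-ndjson') return 'stream';
  if (mediaType === 'application/problem+json') return 'problem';
  // Defensive: neither contract type, e.g. a proxy's own error page.
  return 'unexpected';
}
```

A caller would pass res.headers.get('content-type'). On 'problem' it reads the body with res.json(), and on 'stream' it opens the streaming reader. An exact media-type comparison is stricter than a substring test, which would also match unrelated types that happen to contain the same text.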
Warnings for adapter operators: keep the pipe unbuffered
The single most common integration failure is not a protocol bug. It is an intermediary that buffers the response, turning a live stream into a dead wait followed by one giant flush. Every hop between the shiftagent install and the end consumer (adapter, host gateway, ingress, service mesh, CDN) must be audited:
| Intermediary | Failure mode | Remedy |
|---|---|---|
| nginx (reverse proxy) | proxy_buffering on (the default) holds the body until complete or buffer-full | proxy_buffering off; for the streaming routes, or have the origin send X-Accel-Buffering: no; also disable proxy_cache on these routes |
| Envoy / service mesh sidecars | Response buffering filters, aggressive stream_idle_timeout | Exempt the streaming routes from buffering filters; raise the idle timeout (see below) |
| Cloud / hardware load balancers | Idle-connection timeouts (often 60 s) silently kill quiet streams | Raise the idle timeout on the streaming path to at least the deployment’s approval-expiry window (§5.4), or accept truncation and rely on the reconcile path (§7) |
| Response compression (gzip/brotli) | Compressors buffer input to build blocks — deltas arrive late or all at once | Do not compress application/x-ndjson responses, or use a streaming-flush compressor configuration |
| The adapter itself | Reading the whole upstream body before forwarding (await res.text(), framework “body parsing” middleware) | Pipe bytes through as they arrive; exempt streaming routes from any body-collection middleware |
| Client-side HTTP libraries | “Convenience” APIs that resolve only on full body | Use the streaming read API (ReadableStream, chunked iterators), never the buffered one |
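For an adapter built on the fetch API (Node 18+, Deno, edge workers), "pipe bytes through" means handing the upstream ReadableStream to the downstream Response without reading it first. The sketch below assumes such a runtime; relay and relayHeaders are hypothetical names.

It drops hop-by-hop headers and length/encoding headers. fetch has already decoded any content encoding, and a live stream has no known length. It also adds X-Accel-Buffering: no for any nginx hop in front of the adapter.

```typescript
// Headers that must not be copied verbatim onto the relayed response.
const DROPPED_HEADERS = new Set([
  'connection',        // hop-by-hop
  'transfer-encoding', // hop-by-hop; the runtime chooses its own framing
  'content-length',    // a live stream has no known length
  'content-encoding',  // fetch has already decoded the body it hands us
]);

function relayHeaders(upstream: Record<string, string>): Record<string, string> {
  const out: Record<string, string> = {};
  for (const [name, value] of Object.entries(upstream)) {
    const key = name.toLowerCase();
    if (!DROPPED_HEADERS.has(key)) out[key] = value;
  }
  out['x-accel-buffering'] = 'no'; // ask an nginx hop not to buffer
  return out;
}

// Relays the upstream stream without collecting it (no await res.text()).
function relay(upstream: Response): Response {
  const headers: Record<string, string> = {};
  upstream.headers.forEach((value, name) => {
    headers[name] = value;
  });
  return new Response(upstream.body, {
    status: upstream.status,
    headers: relayHeaders(headers),
  });
}
```

The adapter must also keep any compression middleware away from this route. Otherwise the compressor reintroduces the buffering described in the table.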
Two timing facts drive timeout budgets:
- Streams can be legitimately quiet for a long time. A hold capacity wait emits queued events only when the queue state changes. A parked approval (§5.4) may produce no events at all between approval_required and resumed, and that window is bounded only by the approval’s expires_at. Idle timeouts must accommodate this, or the operator must consciously accept truncation on quiet streams and lean on the reconcile path (§7). Both are valid designs; pick one deliberately.
- Truncation is survivable by design. If any hop kills the connection, the run continues server-side and the outcome lands in history regardless (§7). Buffering, by contrast, is silent degradation: everything still “works,” just uselessly late. Fix buffering before you fix timeouts.
2. The event envelope
Every line is one ConversationEvent, a discriminated union on type over a common envelope
(ConversationEventBase in the spec):
| Field | Type | Meaning |
|---|---|---|
| object | "conversation.event" | Constant type marker (house convention: every object says what it is) |
| type | string | Event discriminator that drives the client’s switch (§3) |
| conversation_id | con_… | The conversation this stream belongs to |
| message_id | msg_… or null | The assistant message being produced. null only on queued events, which are emitted before the run (and thus the message) exists |
| seq | integer ≥ 0 | Per-response monotonic counter, starts at 0, no gaps (§4) |
| created_at | ISO 8601 UTC | Server-side emission time |
| data | object | Event-type-specific payload (§3) |
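The result of JSON.parse is untyped, so a consumer may want a runtime check that a line really carries this envelope before switching on type. The sketch below shows one such check. EventEnvelope and isEventEnvelope are illustrative names; the normative shape is still ConversationEventBase in openapi/openapi.yaml. The check accepts any type string, so forward-compatible skipping of unknown event types keeps working.

```typescript
// Mirrors the envelope table; data stays loosely typed until the caller
// narrows on type.
interface EventEnvelope {
  object: 'conversation.event';
  type: string;
  conversation_id: string;   // con_…
  message_id: string | null; // null only on queued events
  seq: number;               // integer >= 0, per response
  created_at: string;        // ISO 8601 UTC
  data: Record<string, unknown>;
}

function isEventEnvelope(value: unknown): value is EventEnvelope {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    v.object === 'conversation.event' &&
    typeof v.type === 'string' &&
    typeof v.conversation_id === 'string' &&
    (typeof v.message_id === 'string' || v.message_id === null) &&
    Number.isInteger(v.seq) &&
    (v.seq as number) >= 0 &&
    typeof v.created_at === 'string' &&
    typeof v.data === 'object' &&
    v.data !== null
  );
}
```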
Events are not stored — they are the live wire format only. The durable record is the
persisted Message (delivered inside the terminal message_end and always retrievable via
listMessages, GET /conversations/{conversation_id}/messages).
3. Event vocabulary
Seven event types. Two are terminal; the rest are progress.
| type | Terminal | data payload | Emitted when |
|---|---|---|---|
| queued | no | {position, retry_hint_seconds?}: position in the hold queue (1 = next); retry_hint_seconds is an estimated wait | Only under on_capacity: "hold" while no sandbox is available, before message_start. May repeat as the queue drains. Bounded by the deployment’s max hold time (see getCapacity, GET /capacity → max_hold_seconds); exceeding it terminates the stream with a capacity-exhausted error event. message_id is null here |
| message_start | no | {role: "assistant", conversation?}: conversation is present only on streams initiated by createConversation with initial_message (the just-created Conversation object) | The assistant message has opened; a message_id is now assigned and deltas follow |
| content_delta | no | {text, filler?}: a chunk of assistant output; filler: true marks low-latency filler output | Repeatedly, as the agent produces output. Filler-flagged deltas (when the filler cascade enables them) bridge dead air while the main agent works; hosts may render or suppress them. Filler text is not part of the persisted message: concatenating only the non-filler deltas reproduces the final content |
| approval_required | no | The full Approval object (apr_…), including reason, requested_items[], and expires_at | The agent raised a human-in-the-loop gate mid-run. The message parks in status: "awaiting_approval"; the stream stays open awaiting resolution via approveApproval (POST /approvals/{approval_id}/approve) or denyApproval (POST /approvals/{approval_id}/deny) |
| resumed | no | {approval_id, decision: "approved"} | A pending approval was granted; the parked run continues toward message_end. Denials never produce resumed; they terminate with error |
| message_end | yes | {message}: the full persisted assistant Message (final content, parts, status: "completed", usage, …) | The run finished successfully. This is the authoritative final state, identical to what listMessages returns |
| error | yes | An RFC 9457 problem object from the same registry as HTTP errors (type, title, status, detail, request_id, …) | The run failed in-band: agent error, approval denied or expired, hold-queue timeout (capacity-exhausted), etc. If the assistant message exists, it ends with status: "failed" |
Forward compatibility rule: clients MUST ignore event types they do not recognize (skip the
line, keep consuming seq) while still honoring the terminal-event contract. New event types may
be added without a version bump; message_end and error remain the only terminal events.
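The forward-compatibility rule separates two questions: does this client have a handler for the type, and is the type terminal? The sketch below answers both, assuming handlers are supplied as a per-type map. The names dispatch and Handlers are illustrative.

```typescript
type KnownType =
  | 'queued'
  | 'message_start'
  | 'content_delta'
  | 'approval_required'
  | 'resumed'
  | 'message_end'
  | 'error';

const KNOWN_TYPES = new Set<string>([
  'queued', 'message_start', 'content_delta',
  'approval_required', 'resumed', 'message_end', 'error',
]);

// message_end and error stay the only terminal types as new types appear.
const TERMINAL_TYPES = new Set<string>(['message_end', 'error']);

type Handlers = Partial<Record<KnownType, (data: unknown) => void>>;

// Routes one event to its handler and reports whether it was terminal.
// Unknown types are skipped silently; the caller still counts their seq.
function dispatch(event: { type: string; data: unknown }, handlers: Handlers): boolean {
  if (KNOWN_TYPES.has(event.type)) {
    handlers[event.type as KnownType]?.(event.data);
  }
  return TERMINAL_TYPES.has(event.type);
}
```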
4. Ordering, seq, and the terminal-event rule
A legal stream matches one of three alternatives:

```
queued* message_start content_delta* ( approval_required resumed content_delta* )* message_end
queued* message_start content_delta* ( approval_required resumed content_delta* )* approval_required? error
queued+ error
```

The second line covers two cases: an agent error mid-run, and a denied or expired approval (approval_required followed directly by error). The third line is a hold that exceeds the maximum hold time (capacity-exhausted). The same grammar as a state diagram:

```mermaid
stateDiagram-v2
    direction LR
    [*] --> Queued : on_capacity=hold,\npool full
    [*] --> Started : sandbox available
    Queued --> Queued : queued (repeats)
    Queued --> Started : sandbox freed
    Queued --> Errored : max hold exceeded\n(capacity-exhausted)
    Started --> Streaming : message_start
    Streaming --> Streaming : content_delta\n(filler or real)
    Streaming --> Parked : approval_required
    Parked --> Streaming : resumed (approved)
    Parked --> Errored : denied / expired
    Streaming --> Done : message_end
    Streaming --> Errored : error
    Done --> [*]
    Errored --> [*]
    note right of Parked
        message.status = awaiting_approval
        stream stays open, possibly silent,
        until resolution or expires_at
    end note
```
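The state diagram above maps directly onto a transition table. With it, a consumer (or a test harness for an adapter) can reject out-of-order streams. In the sketch below, the state names mirror the diagram, and an unknown event type is a no-op, following the forward-compatibility rule in §3. A stream that ends in any state other than done or errored was truncated.

```typescript
type StreamState = 'start' | 'queued' | 'streaming' | 'parked' | 'done' | 'errored';

// Legal next state for each current state, keyed by event type.
const TRANSITIONS: Record<StreamState, Record<string, StreamState>> = {
  start: { queued: 'queued', message_start: 'streaming' },
  queued: { queued: 'queued', message_start: 'streaming', error: 'errored' },
  streaming: {
    content_delta: 'streaming',
    approval_required: 'parked',
    message_end: 'done',
    error: 'errored',
  },
  parked: { resumed: 'streaming', error: 'errored' },
  done: {},
  errored: {},
};

const KNOWN_EVENT_TYPES = new Set<string>([
  'queued', 'message_start', 'content_delta',
  'approval_required', 'resumed', 'message_end', 'error',
]);

function step(state: StreamState, type: string): StreamState {
  if (state === 'done' || state === 'errored') {
    throw new Error(`"${type}" arrived after the terminal event`);
  }
  const row = TRANSITIONS[state];
  // hasOwnProperty guards against inherited keys such as "toString".
  if (Object.prototype.hasOwnProperty.call(row, type)) return row[type];
  if (!KNOWN_EVENT_TYPES.has(type)) return state; // unknown future type: skip
  throw new Error(`"${type}" is not legal in state ${state}`);
}

// Returns the final state; anything other than done/errored means truncated.
function validateOrder(types: readonly string[]): StreamState {
  return types.reduce<StreamState>((state, type) => step(state, type), 'start');
}
```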
Three invariants make the stream verifiable:
- Monotonic seq, no gaps. seq starts at 0 for the first event of each response and increments by exactly 1 per event. A gap (seq jumps) means an intermediary dropped or reordered data. Treat the stream as corrupt/truncated and reconcile (§7). A gap should be impossible over TCP in practice; checking for one is cheap insurance against broken middleboxes.
- Exactly one terminal event. Every complete stream ends with exactly one message_end or exactly one error. It never has both, never has neither, and nothing ever follows the terminal event.
- Closed-without-terminal ⇒ truncated. If the connection closes (EOF, reset, timeout, abort) before the client has seen a terminal event, the client MUST treat the stream as truncated. Truncation is not failure: the run continues server-side and its outcome lands in history regardless. The mandatory follow-up is the reconcile procedure in §7. Never re-send the same message because of truncation alone, because you would create a duplicate user message. If you must retry, use the Idempotency-Key header on createMessage; that is what it is for.
seq is per-response, not per-conversation: a later createMessage on the same conversation
starts again at 0. An approval park does not split the response — resumed and everything
after it continue the same connection and the same seq sequence (see §5.4).
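The three invariants reduce to a small tracker. The parsing loop consults it once per event and once when the connection closes. StreamTracker and the outcome names in the sketch below are illustrative. Treating an event that follows the terminal one as corruption is a defensive choice, because the contract says this never happens.

```typescript
type Outcome = 'completed' | 'failed' | 'truncated';

class StreamTracker {
  private expectedSeq = 0;
  private terminal: 'message_end' | 'error' | null = null;
  private corrupt = false;

  // Records one parsed event. Returns false once the stream is unusable
  // (a seq gap or reorder, or any event after the terminal one).
  observe(event: { type: string; seq: number }): boolean {
    if (this.corrupt) return false;
    if (event.seq !== this.expectedSeq || this.terminal !== null) {
      this.corrupt = true;
      return false;
    }
    this.expectedSeq += 1; // unknown types advance seq too
    if (event.type === 'message_end') this.terminal = 'message_end';
    else if (event.type === 'error') this.terminal = 'error';
    return true;
  }

  // Classifies the stream when the connection closes for any reason.
  // partialLine: bytes after the last \n were left unparsed at EOF.
  close(partialLine = false): Outcome {
    if (this.corrupt || partialLine || this.terminal === null) return 'truncated';
    return this.terminal === 'message_end' ? 'completed' : 'failed';
  }
}
```

A 'truncated' outcome leads to the reconcile procedure of §7, never to a re-send.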
5. Annotated example streams
All transcripts below are real wire format: each line is exactly what arrives on the socket. IDs and timestamps are illustrative.
5.1 Plain reply
The simplest case: pooled conversation, filler disabled, no capacity pressure, no approvals. Request:

```http
POST /conversations/con_01hzx8conv001/messages
Authorization: Bearer sk_int_...
Content-Type: application/json

{"content": "Summarize today's open jobs."}
```

Response: 200, Content-Type: application/x-ndjson:

```json
{"object":"conversation.event","type":"message_start","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst001","seq":0,"data":{"role":"assistant"},"created_at":"2026-07-02T10:00:01Z"}
{"object":"conversation.event","type":"content_delta","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst001","seq":1,"data":{"text":"You have three open jobs today: "},"created_at":"2026-07-02T10:00:03Z"}
{"object":"conversation.event","type":"content_delta","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst001","seq":2,"data":{"text":"two installations and one repair visit."},"created_at":"2026-07-02T10:00:03Z"}
{"object":"conversation.event","type":"message_end","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst001","seq":3,"data":{"message":{"object":"message","id":"msg_01hzx8asst001","conversation_id":"con_01hzx8conv001","role":"assistant","content":"You have three open jobs today: two installations and one repair visit.","parts":[{"type":"text","text":"You have three open jobs today: two installations and one repair visit."}],"repository_id":null,"skill_ids":null,"env":null,"status":"completed","usage":{"input_tokens":1830,"output_tokens":24},"created_at":"2026-07-02T10:00:04Z"}},"created_at":"2026-07-02T10:00:04Z"}
```

Notes:
- seq runs 0→3 with no gaps, and there is exactly one terminal event (message_end).
- Concatenating the content_delta texts reproduces message_end.data.message.content exactly. This holds whenever no filler deltas are present (see 5.2).
- The persisted message in message_end is byte-for-byte what listMessages will return for msg_01hzx8asst001. A client that renders deltas live needs nothing further. A client that only wants the final state can ignore the deltas and read the terminal event alone.
5.2 Filler-enabled reply
Same request shape, but the filler cascade (tenant settings → conversation → message; most
specific wins) resolves to enabled, for example because the conversation was created with
filler: {enabled: true}. The filler agent bridges the dead air before the main agent’s first
token:
```json
{"object":"conversation.event","type":"message_start","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst002","seq":0,"data":{"role":"assistant"},"created_at":"2026-07-02T10:05:00Z"}
{"object":"conversation.event","type":"content_delta","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst002","seq":1,"data":{"text":"One moment while I pull that up — ","filler":true},"created_at":"2026-07-02T10:05:00Z"}
{"object":"conversation.event","type":"content_delta","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst002","seq":2,"data":{"text":"checking the schedule now. ","filler":true},"created_at":"2026-07-02T10:05:01Z"}
{"object":"conversation.event","type":"content_delta","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst002","seq":3,"data":{"text":"Your next appointment is at 14:00."},"created_at":"2026-07-02T10:05:03Z"}
{"object":"conversation.event","type":"message_end","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst002","seq":4,"data":{"message":{"object":"message","id":"msg_01hzx8asst002","conversation_id":"con_01hzx8conv001","role":"assistant","content":"Your next appointment is at 14:00.","parts":[{"type":"text","text":"Your next appointment is at 14:00."}],"repository_id":null,"skill_ids":null,"env":null,"status":"completed","usage":{"input_tokens":1912,"output_tokens":11},"created_at":"2026-07-02T10:05:04Z"}},"created_at":"2026-07-02T10:05:04Z"}
```

Notes:
- Filler deltas (seq 1–2) carry data.filler: true. The real reply (seq 3) does not; the field defaults to false and may be omitted.
- The persisted content contains only the non-filler text. Filler is ephemeral UX lubrication, so a client that replays history from listMessages will never see it. Clients therefore have two correct rendering strategies: (a) render filler deltas in a transient style and replace them when real content arrives, or (b) suppress filler: true deltas entirely. Appending filler and real text into one permanent transcript is incorrect, because that transcript will disagree with history.
- Voice-style hosts typically want filler on, since dead air is costly; text chat hosts often suppress it. That is why the knob cascades per tenant, conversation and message.
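Strategy (a) can be modelled as two buffers per assistant bubble. Filler goes into a transient buffer that real content clears; only real content enters the permanent buffer. AssistantBubble in the sketch below is a hypothetical UI model, not part of the API. Only its permanent text should ever be compared with message_end or listMessages.

```typescript
interface DeltaData {
  text: string;
  filler?: boolean; // defaults to false when omitted
}

class AssistantBubble {
  private permanent = '';
  private transient = '';

  applyDelta(data: DeltaData): void {
    if (data.filler === true) {
      this.transient += data.text; // bridge dead air, never persisted
    } else {
      this.transient = ''; // real content replaces the filler
      this.permanent += data.text;
    }
  }

  // What the UI shows right now.
  get displayed(): string {
    return this.permanent + this.transient;
  }

  // The only text that should match message_end.data.message.content.
  get persistedText(): string {
    return this.permanent;
  }
}
```

Strategy (b) is the same model with filler deltas never passed to applyDelta.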
5.3 Capacity hold
The message was sent with on_capacity: "hold" while the sandbox pool was exhausted. (With the
default "reject" there would be no stream at all. The request would fail pre-stream with 429
capacity-exhausted + Retry-After, as an application/problem+json body.)
```json
{"object":"conversation.event","type":"queued","conversation_id":"con_01hzx8conv001","message_id":null,"seq":0,"data":{"position":2,"retry_hint_seconds":15},"created_at":"2026-07-02T10:10:00Z"}
{"object":"conversation.event","type":"queued","conversation_id":"con_01hzx8conv001","message_id":null,"seq":1,"data":{"position":1,"retry_hint_seconds":5},"created_at":"2026-07-02T10:10:10Z"}
{"object":"conversation.event","type":"message_start","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst003","seq":2,"data":{"role":"assistant"},"created_at":"2026-07-02T10:10:14Z"}
{"object":"conversation.event","type":"content_delta","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst003","seq":3,"data":{"text":"Done — the report is ready."},"created_at":"2026-07-02T10:10:16Z"}
{"object":"conversation.event","type":"message_end","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst003","seq":4,"data":{"message":{"object":"message","id":"msg_01hzx8asst003","conversation_id":"con_01hzx8conv001","role":"assistant","content":"Done — the report is ready.","parts":[{"type":"text","text":"Done — the report is ready."}],"repository_id":null,"skill_ids":null,"env":null,"status":"completed","usage":{"input_tokens":1744,"output_tokens":9},"created_at":"2026-07-02T10:10:17Z"}},"created_at":"2026-07-02T10:10:17Z"}
```

Notes:
- queued events precede message_start and are the only events where message_id is null, because the run (and therefore the assistant message) does not exist yet.
- queued may repeat as the position improves. Treat each one as a UI update (“You’re next…”), not as an entry in an accumulating list. retry_hint_seconds is an estimate, not a promise.
- The hold is bounded by the deployment’s maximum hold time (getCapacity → max_hold_seconds). If that time elapses first, the stream ends with a terminal error event whose problem type slug is capacity-exhausted. The error arrives in-band because the 200 header was already committed when the hold began.
- Adapters that want to avoid holds altogether can pre-check getCapacity (GET /capacity → {warm_available, sticky_active, at_capacity, …}) and shed or defer load at their own edge.
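Two small helpers follow from these notes. One renders the latest queued event as replaceable status text. The other estimates when a capacity-exhausted error becomes expected. Both names in the sketch below are hypothetical. Measuring the hold from the first queued event’s created_at is an assumption; that timestamp only approximates when the server started the hold.

```typescript
interface QueuedData {
  position: number;            // 1 = next
  retry_hint_seconds?: number; // estimate, not a promise
}

// Status text for the latest queued event. Each new event replaces the
// previous text; queued events are never accumulated.
function queueStatus(latest: QueuedData): string {
  const where = latest.position === 1 ? "You're next" : `Position ${latest.position} in queue`;
  return latest.retry_hint_seconds === undefined
    ? where
    : `${where} (estimate: ~${latest.retry_hint_seconds}s)`;
}

// Approximate time after which the stream should end with a
// capacity-exhausted error, given getCapacity's max_hold_seconds.
function holdDeadline(firstQueuedAt: string, maxHoldSeconds: number): Date {
  return new Date(Date.parse(firstQueuedAt) + maxHoldSeconds * 1000);
}
```

holdDeadline is only useful for UI hints and alerting. The terminal error event remains the authoritative signal that the hold is over.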
5.4 HITL pause and resume — with the approve call in between
The agent hits a gate mid-run (here: it needs a credential that is not on file). There is one stream, one
connection and one seq sequence, yet a separate, out-of-band HTTP call resolves the approval
while the stream is parked.
The stream (connection 1) — first half:
```json
{"object":"conversation.event","type":"message_start","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst004","seq":0,"data":{"role":"assistant"},"created_at":"2026-07-02T10:00:01Z"}
{"object":"conversation.event","type":"content_delta","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst004","seq":1,"data":{"text":"Starting the reconciliation. "},"created_at":"2026-07-02T10:00:03Z"}
{"object":"conversation.event","type":"approval_required","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst004","seq":2,"data":{"object":"approval","id":"apr_01hzx8appr001","tenant_id":"tnt_01hzx8acme001","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst004","status":"pending","reason":"The CRM lookup requires a credential that is not on file for this conversation.","requested_items":[{"kind":"secret","description":"API key for the CRM system","alias":"CRM_API_KEY"}],"expires_at":"2026-07-02T10:15:00Z","resolved_by":null,"resolved_at":null,"created_at":"2026-07-02T10:00:05Z","updated_at":"2026-07-02T10:00:05Z"},"created_at":"2026-07-02T10:00:05Z"}
```

The message is now status: "awaiting_approval"; the stream stays open and silent. The host
surfaces the approval to a human (the adapter transports the event; the approval authority —
which holds the approver key — decides).
The approve call (connection 2) — a normal request/response, made by whoever holds the
approver key, any time before expires_at:
```http
POST /approvals/apr_01hzx8appr001/approve
Authorization: Bearer sk_int_...
Content-Type: application/json

{
  "signature": {
    "key_id": "apk_01hzx8host001",
    "algorithm": "hmac-sha256",
    "exp": 1782813720,
    "value": "sig-base64url-example-aGVsbG8td29ybGQ"
  },
  "secrets": { "CRM_API_KEY": "example-value-vaulted-never-echoed" },
  "note": "Approved by supervisor on duty."
}
```

```
→ 200 {"object":"approval","id":"apr_01hzx8appr001","status":"approved", …}
```

The signed assertion is minted with the per-tenant approver key, which is cryptographically distinct
from the sk_int_ service key that authenticates the transport (see approveApproval in the
spec for the exact signing contract, and getIntegrationSelf, GET /integration/self, for
discovering registered approver_keys). An invalid signature gets 403
approval-signature-invalid and the approval stays pending; the parked stream does not move.
The supplied secret is vaulted write-only, conversation-scoped.
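The approve call is an ordinary request. What matters is that the adapter only transports a signature minted elsewhere. The sketch below builds (but does not send) the request from an already-minted signature; buildApproveRequest and its types are hypothetical. It does not implement the signing contract, which is defined under approveApproval in the spec. The local expiry check assumes exp is in Unix seconds, as the example value suggests.

```typescript
interface ApproverSignature {
  key_id: string;    // apk_… registered approver key
  algorithm: string; // e.g. "hmac-sha256"
  exp: number;       // assumed: Unix seconds
  value: string;     // minted by the approval authority, never by the adapter
}

interface ApproveBody {
  signature: ApproverSignature;
  secrets?: Record<string, string>; // alias -> value, vaulted write-only
  note?: string;
}

function buildApproveRequest(
  baseUrl: string,
  serviceKey: string, // sk_int_ transport key, distinct from the approver key
  approvalId: string,
  body: ApproveBody,
  nowMs: number = Date.now(),
): { url: string; init: RequestInit } {
  // Fail fast locally; the server remains the authority on validity.
  if (body.signature.exp * 1000 <= nowMs) {
    throw new Error('signature already expired; ask the authority to re-mint');
  }
  return {
    url: `${baseUrl}/approvals/${encodeURIComponent(approvalId)}/approve`,
    init: {
      method: 'POST',
      headers: {
        authorization: `Bearer ${serviceKey}`,
        'content-type': 'application/json',
      },
      body: JSON.stringify(body),
    },
  };
}
```

A 403 approval-signature-invalid leaves the approval pending, so resending the same body is pointless. The useful reaction is to have the authority mint a fresh assertion.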
The stream (connection 1, still the same response) — second half, resuming the same seq
sequence:
```json
{"object":"conversation.event","type":"resumed","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst004","seq":3,"data":{"approval_id":"apr_01hzx8appr001","decision":"approved"},"created_at":"2026-07-02T10:02:10Z"}
{"object":"conversation.event","type":"content_delta","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst004","seq":4,"data":{"text":"Reconciled 14 invoices against the CRM using {{secret:CRM_API_KEY}}."},"created_at":"2026-07-02T10:02:19Z"}
{"object":"conversation.event","type":"message_end","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst004","seq":5,"data":{"message":{"object":"message","id":"msg_01hzx8asst004","conversation_id":"con_01hzx8conv001","role":"assistant","content":"Reconciled 14 invoices against the CRM using {{secret:CRM_API_KEY}}.","parts":[{"type":"text","text":"Reconciled 14 invoices against the CRM using {{secret:CRM_API_KEY}}."}],"repository_id":null,"skill_ids":null,"env":null,"status":"completed","usage":{"input_tokens":2048,"output_tokens":96},"created_at":"2026-07-02T10:02:20Z"}},"created_at":"2026-07-02T10:02:20Z"}
```

Notes:
- The secret value never appears in the stream: not in resumed, not in any delta, and not in the persisted message. The agent references it strictly by alias ({{secret:CRM_API_KEY}}), and the egress proxy resolves the alias at the network boundary. This is the approval⇄vault weave: the agent asks by alias, the approver supplies by alias, and the run resumes seeing only the alias.
- On deny (denyApproval, POST /approvals/{approval_id}/deny, same signature contract with "decision":"deny") or on expiry (expires_at passes unresolved), there is no resumed. The stream terminates with an error event (problem slug approval-expired for expiry), and the message finishes with status: "failed".
- Sandbox behavior while parked differs by placement mode but is invisible to the protocol. Sticky sandboxes stay warm until the approval’s expires_at; pooled runs checkpoint and re-hydrate on approval. The events are the same either way.
- A client that would rather not hold a connection open across a potentially long human decision can deliberately disconnect after approval_required and reconcile later (§7). The run parks server-side identically either way.
```mermaid
sequenceDiagram
    autonumber
    participant Host as Host system UI
    participant Adapter as Adapter (stateless)
    participant API as Integration API
    participant Agent as Sandboxed agent run
    participant Authority as Host approval authority<br/>(holds approver key)
    Adapter->>API: POST /conversations/{id}/messages (createMessage)
    API->>Agent: start run
    API-->>Adapter: 200 application/x-ndjson (stream opens)
    API-->>Adapter: message_start · content_delta …
    Agent->>API: raises approval (needs CRM_API_KEY)
    API-->>Adapter: approval_required (Approval apr_…)
    Note over API,Agent: message parks awaiting_approval<br/>stream stays open (silent)
    Adapter->>Host: surface approval request
    Host->>Authority: human decision
    Authority->>Authority: mint signed assertion<br/>{approval_id, decision, exp}
    Authority->>Adapter: signed assertion (+ secret value)
    Adapter->>API: POST /approvals/{id}/approve (approveApproval)
    API->>API: verify signature · vault secret (write-only)
    API->>Agent: resume with alias only
    API-->>Adapter: resumed · content_delta … · message_end
```
5.5 Stream interruption (truncation)
A load balancer with a 60-second idle timeout kills the connection while the agent is mid-task. This is what the client received:

```
{"object":"conversation.event","type":"message_start","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst005","seq":0,"data":{"role":"assistant"},"created_at":"2026-07-02T10:20:01Z"}
{"object":"conversation.event","type":"content_delta","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst005","seq":1,"data":{"text":"Working through the price book now"},"created_at":"2026-07-02T10:20:04Z"}
<-- TCP connection closed: no message_end, no error -->
```

The client’s obligations, in order:
- Classify: a connection closed without a terminal event ⇒ truncated (invariant 3, §4). The same classification applies to a seq gap, or to a line that is cut off mid-JSON at EOF (discard the partial line).
- Do not re-send the message. The run is still executing server-side, and re-sending creates a duplicate user message and a second run. (If your delivery pipeline requires at-least-once retries, send the original with an Idempotency-Key so that the retry replays instead of duplicating.)
- Reconcile (§7): poll listMessages for msg_01hzx8asst005 until its status is completed or failed. The full final content arrives in history exactly as it would have arrived in message_end. While the status is awaiting_approval, also check listApprovals (GET /approvals?conversation_id=…&status=pending) and surface the pending gate.
There is deliberately no re-attach endpoint in this surface: a truncated stream cannot be resumed mid-flight; the durable record is the reconciliation mechanism. This keeps the protocol stateless for the caller and makes “adapter restarted mid-stream” a non-event.
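For pipelines that must retry, the key property is that the Idempotency-Key is generated once per logical send and reused by every retry of that send. It is never regenerated per attempt. IdempotentSend in the sketch below is hypothetical. The key factory is injected (for example, () => crypto.randomUUID()) so the example does not depend on a particular runtime.

```typescript
interface SendAttempt {
  url: string;
  init: RequestInit;
}

class IdempotentSend {
  private readonly url: string;
  private readonly serviceKey: string;
  private readonly payload: string;
  private readonly key: string;

  constructor(url: string, serviceKey: string, body: unknown, newKey: () => string) {
    this.url = url;
    this.serviceKey = serviceKey;
    this.payload = JSON.stringify(body); // identical bytes on every attempt
    this.key = newKey();                 // one key per logical send
  }

  // Every retry carries the same key, so the retry replays instead of
  // creating a second user message and a second run.
  attempt(): SendAttempt {
    return {
      url: this.url,
      init: {
        method: 'POST',
        headers: {
          authorization: `Bearer ${this.serviceKey}`,
          'content-type': 'application/json',
          'idempotency-key': this.key,
        },
        body: this.payload,
      },
    };
  }
}
```

Truncation alone still calls for reconcile, not for another attempt(). The key only makes safe a retry that your pipeline was going to make anyway.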
6. Client parsing recipe
The reference consumption pattern, as framework-neutral TypeScript pseudocode. The essentials:
byte buffer → line splitter → JSON.parse → switch on type, with seq verification and
terminal tracking.
```typescript
async function streamMessage(
  conversationId: string,
  body: MessageCreate,
): Promise<Message> {
  const res = await fetch(
    `${BASE_URL}/conversations/${conversationId}/messages`,
    {
      method: 'POST',
      headers: {
        authorization: `Bearer ${SK_INT_KEY}`,
        'content-type': 'application/json',
        // 'idempotency-key': crypto.randomUUID(), // if your pipeline retries
      },
      body: JSON.stringify(body),
    },
  );

  // Pre-stream failures are plain problem+json, not events.
  const contentType = res.headers.get('content-type') ?? '';
  if (!contentType.includes('application/x-ndjson')) {
    const problem = await res.json(); // RFC 9457 problem object
    throw new IntegrationApiError(res.status, problem); // honor Retry-After on 429
  }

  const reader = res.body!.getReader();
  const decoder = new TextDecoder(); // UTF-8, streaming mode
  let buffer = '';
  let expectedSeq = 0;
  let messageId: string | null = null; // set by message_start; used to reconcile
  // The cast stops TypeScript from narrowing `terminal` to null for the rest
  // of the function: it is only ever assigned inside the `handle` closure.
  let terminal = null as Extract<ConversationEvent, { type: 'message_end' | 'error' }> | null;

  const handle = (event: ConversationEvent) => {
    if (event.seq !== expectedSeq) throw new TruncatedStreamError('seq gap');
    expectedSeq += 1;

    switch (event.type) {
      case 'queued':
        ui.showQueued(event.data.position, event.data.retry_hint_seconds);
        break;
      case 'message_start':
        // createConversation-with-initial_message: the new conversation
        // rides in event.data.conversation; capture its id here.
        messageId = event.message_id!;
        ui.openAssistantBubble(messageId);
        break;
      case 'content_delta':
        if (event.data.filler) ui.showTransient(event.data.text); // or: suppress
        else ui.append(event.message_id!, event.data.text);
        break;
      case 'approval_required':
        ui.surfaceApproval(event.data); // full Approval object (apr_…)
        break; // stream stays open; resolution is out-of-band
      case 'resumed':
        ui.clearApproval(event.data.approval_id);
        break;
      case 'message_end': // terminal: authoritative persisted Message
      case 'error': // terminal: RFC 9457 problem object
        terminal = event;
        break;
      default:
        // Forward compatibility: unknown types are skipped, seq still counts.
        break;
    }
  };

  try {
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      // Chunk boundaries ≠ line boundaries: keep the trailing partial line.
      const lines = buffer.split('\n');
      buffer = lines.pop()!;
      for (const line of lines) {
        if (line.trim() === '') continue; // tolerate blank lines
        handle(JSON.parse(line) as ConversationEvent);
      }
    }
    // EOF: flush the decoder. A leftover partial line means the last event
    // was cut off mid-JSON.
    buffer += decoder.decode();
    if (buffer.trim() !== '') throw new TruncatedStreamError('partial final line');
  } catch {
    // Network error / abort / seq gap / partial line → reconcile, never re-send.
    return reconcile(conversationId, messageId);
  }

  if (terminal === null) return reconcile(conversationId, messageId); // closed w/o terminal
  if (terminal.type === 'error') throw new AgentRunError(terminal.data);
  return terminal.data.message; // message_end: the persisted assistant Message
}
```

And the reconcile path, the only correct reaction to truncation:

```typescript
async function reconcile(
  conversationId: string,
  messageId: string | null,
): Promise<Message> {
  // The run continues server-side; its outcome lands in history regardless.
  for (;;) {
    // listMessages: GET /conversations/{conversation_id}/messages.
    // Hypothetical helper: match the message_id from message_start when one
    // was seen; otherwise take the newest assistant message created after
    // this request was sent (the stream may have died while still queued).
    const last = await getLatestAssistantMessage(conversationId, messageId);
    switch (last?.status) {
      case 'completed':
        return last;
      case 'failed':
        throw new AgentRunError(/* inspect message + listApprovals for cause */);
      case 'awaiting_approval':
        // listApprovals: GET /approvals?conversation_id=…&status=pending
        await surfacePendingApprovals(conversationId);
        break; // keep polling until resolved or expired
      case 'in_progress':
      default:
        break; // still running
    }
    await sleep(POLL_INTERVAL_MS); // e.g. 2 s with capped backoff
  }
}
```

Implementation notes:
- Never call await res.json() or res.text() on the streaming response. That is the buffered API, and it defeats the protocol. Use the streaming reader.
- decoder.decode(value, { stream: true }) matters: a multi-byte UTF-8 character can straddle a chunk boundary, and streaming mode holds the partial code point instead of corrupting it.
- The same loop consumes createConversation-with-initial_message streams unchanged. The only addition is capturing message_start.data.conversation.
- For UI cancellation, abort the fetch (AbortController). Aborting the stream does not abort the run: the assistant message still lands in history, and you can reconcile if you need it later.
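The comment on POLL_INTERVAL_MS in the reconcile loop suggests about 2 s with capped backoff. The sketch below shows one way to compute that schedule. The doubling, the 30 s cap and the equal-jitter variant are assumptions chosen for illustration, not values from the spec.

```typescript
// Delay before reconcile poll number `attempt` (0, 1, 2, ...): doubles from
// baseMs and never exceeds capMs.
function pollDelayMs(attempt: number, baseMs = 2000, capMs = 30000): number {
  return Math.min(capMs, baseMs * 2 ** Math.max(0, attempt));
}

// Equal jitter: keep half the delay and randomize the other half, so many
// clients reconciling after the same outage do not poll in lockstep.
function withJitter(delayMs: number, random: () => number = Math.random): number {
  return Math.floor(delayMs / 2 + random() * (delayMs / 2));
}
```

In the reconcile loop, a per-call attempt counter and sleep(withJitter(pollDelayMs(attempt))) could replace the fixed interval.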
7. Reconnect & reconcile guidance
Summarizing the recovery contract in one place:
| Situation | Client action |
|---|---|
| Stream closed with message_end | Done. data.message is authoritative; no further calls needed |
| Stream closed with error | Run failed; handle the problem object (same registry as HTTP errors). The message is in history with status: "failed" |
| Stream closed without a terminal event, seq gap, or partial final line | Truncated. Poll listMessages (GET /conversations/{conversation_id}/messages) until the assistant message’s status leaves in_progress/awaiting_approval. Do not re-send |
| Client saw approval_required, then disconnected (deliberately or not) | The run stays parked server-side until resolution or expires_at. Track the gate via listApprovals (GET /approvals?conversation_id=…&status=pending) / getApproval (GET /approvals/{approval_id}); resolve via approveApproval/denyApproval. Then reconcile the message via listMessages |
| Pre-stream 429 capacity-exhausted (on_capacity: reject) | Back off per Retry-After, optionally consult getCapacity (GET /capacity), and re-send. Nothing was created, so a plain retry is safe |
Because the user message and the assistant outcome are always durably recorded, the streaming layer carries zero exclusive state — any consumer can crash at any byte and recover from the REST surface alone.
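The Retry-After header on a pre-stream 429 may carry either a number of seconds or an HTTP date (RFC 9110, section 10.2.3), so a client that honors it should accept both forms. retryAfterMs in the sketch below is hypothetical. It returns null when the header is absent or unparseable and leaves the fallback delay to the caller.

```typescript
// Milliseconds to wait before re-sending, from a Retry-After header value.
function retryAfterMs(header: string | null, nowMs: number = Date.now()): number | null {
  if (header === null) return null;
  const value = header.trim();
  if (/^\d+$/.test(value)) return Number(value) * 1000; // delay-seconds form
  const at = Date.parse(value); // HTTP-date form, e.g. "Thu, 02 Jul 2026 10:00:30 GMT"
  if (Number.isNaN(at)) return null;
  return Math.max(0, at - nowMs); // a date in the past means "retry now"
}
```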
8. The non-streaming alternative: ?stream=false
createMessage accepts ?stream=false for callers that cannot (or need not) consume NDJSON, such as
server-to-server automations, batch backfills and constrained runtimes:
```http
POST /conversations/{conversation_id}/messages?stream=false
```

Semantics:
- The request blocks until the run completes, then returns 201 with the completed assistant Message as plain application/json. This is the same object message_end would have carried.
- Failures return their natural HTTP status with a problem body. There is no error event, because there is no stream. Capacity rejection is the usual pre-stream 429 capacity-exhausted.
- Everything stream-specific disappears. There is no queued progress (a hold wait happens silently inside the blocking call), no deltas and no filler output. Filler exists only as flagged deltas on the wire and is never part of the persisted message.
- Weigh it against HITL and holds. The blocking call spans the entire run, including any capacity hold and any awaiting_approval park. A run gated on a human decision can block for the whole approval window, and most HTTP clients and intermediaries will time out long before that. Rule of thumb: ?stream=false is fine for short, approval-free runs. Anything that may raise an approval or queue for capacity should stream, or accept the timeout-and-reconcile pattern of §7. That pattern works identically here, because the run continues server-side if the blocking call times out.
- The Idempotency-Key header is honored identically, and it matters more here. A timed-out blocking call is indistinguishable from a lost response, and the key turns a blind retry into a safe replay.
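A blocking call differs from the streaming one only in the query parameter and in how the client bounds the wait. blockingMessageRequest in the sketch below is hypothetical and only builds the request. Bounding the wait with a client-side timeout such as AbortSignal.timeout(ms) is left to the caller. The caller's timeout path should reconcile (§7) or retry with the same Idempotency-Key.

```typescript
function blockingMessageRequest(
  baseUrl: string,
  conversationId: string,
  serviceKey: string,
  idempotencyKey: string, // reuse the same key if this call is retried
  content: string,
): { url: string; init: RequestInit } {
  const url = new URL(
    `${baseUrl}/conversations/${encodeURIComponent(conversationId)}/messages`,
  );
  url.searchParams.set('stream', 'false'); // expect 201 + plain JSON Message
  return {
    url: url.toString(),
    init: {
      method: 'POST',
      headers: {
        authorization: `Bearer ${serviceKey}`,
        'content-type': 'application/json',
        'idempotency-key': idempotencyKey,
      },
      body: JSON.stringify({ content }),
    },
  };
}
```

The caller would pass init to fetch together with a timeout signal and treat a 201 body as the completed assistant Message.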
createConversation has no stream parameter: without initial_message it is already a plain
201 JSON response; with initial_message it always streams. To get “create + first message”
without streaming, make two calls — createConversation (no initial_message), then
createMessage?stream=false.