Streaming Contract

This document is the protocol narrative for the Integration API’s streaming surface. The normative machine-readable definition is the ConversationEvent schema (and its per-event variants) in openapi/openapi.yaml; this document explains how to operate and consume the protocol correctly — what flows over the wire, in what order, what can go wrong, and exactly what a client must do about it.

Two operations produce event streams:

| Operation | Method & path | When it streams |
| --- | --- | --- |
| createMessage | POST /conversations/{conversation_id}/messages | Always, unless ?stream=false (§8) |
| createConversation | POST /conversations | Only when the body includes initial_message; the just-created conversation rides inside the first message_start event (data.conversation) |

Everything below applies identically to both. Where behavior differs, it is called out.

1. Transport: NDJSON over a plain HTTP response

The streaming response is newline-delimited JSON (NDJSON):

  • Status: 200 with Content-Type: application/x-ndjson.
  • Framing: one complete JSON object per line, each line terminated by \n. No enclosing array, no comma separators, no SSE data: prefixes — each line parses standalone with any JSON parser.
  • No Content-Length. The response length is unknown when headers are sent. Delivery uses Transfer-Encoding: chunked on HTTP/1.1 or ordinary DATA frames on HTTP/2 — the client must not expect a length header and must not wait for the connection to close before processing.
  • Incremental flush. The server flushes each event line as it is produced. A content_delta reaches the client within milliseconds of the agent producing it — if nothing in between buffers it (see the operator warnings below).
  • Chunk boundaries are not line boundaries. TCP/HTTP chunking is oblivious to the protocol: a single read may deliver half a line, three lines, or one line plus the beginning of the next. Clients MUST buffer bytes and split on \n themselves (§6); never assume one read = one event.
  • Empty lines. Clients SHOULD skip empty lines silently rather than treating them as a parse error. The protocol does not currently emit them, but tolerating them is free and keeps the parser robust against future keep-alive padding.
  • Encoding: UTF-8, always.
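The framing rules above reduce to a small incremental splitter. The following is a minimal sketch, not part of the API: the class name `NdjsonLineSplitter` and its `push`/`end` methods are hypothetical, and it assumes a runtime that provides the standard `TextDecoder`.

```typescript
// Hypothetical helper: buffers raw bytes and hands back only complete lines.
class NdjsonLineSplitter {
  private readonly decoder = new TextDecoder("utf-8");
  private buffer = "";

  // Feed one network chunk; returns every complete, non-empty line it closed.
  push(chunk: Uint8Array): string[] {
    // stream: true keeps an incomplete multi-byte character for the next chunk.
    this.buffer += this.decoder.decode(chunk, { stream: true });
    const parts = this.buffer.split("\n");
    // The last element is whatever follows the final "\n": a partial line or "".
    this.buffer = parts.pop() ?? "";
    // Empty lines are skipped silently, as the transport rules recommend.
    return parts.filter((line) => line.trim() !== "");
  }

  // Call at EOF; returns the leftover partial line ("" if the stream ended cleanly).
  end(): string {
    this.buffer += this.decoder.decode(); // flush any bytes the decoder still holds
    const rest = this.buffer;
    this.buffer = "";
    return rest;
  }
}
```

A non-empty result from `end()` means the final event was cut off mid-line, which a client would classify as truncation (§4) rather than attempt to parse.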

Pre-stream failures are not events. Authentication failures, validation errors, an archived conversation, a suspended tenant, or on_capacity: reject hitting a full pool are all rejected before any NDJSON byte is written — as ordinary RFC 9457 application/problem+json responses with their proper HTTP status (401/403/404/409/422/429). A client therefore branches on the response’s Content-Type: application/x-ndjson → consume the stream; application/problem+json → handle the problem. Once the 200 and the NDJSON content type are committed, any subsequent failure can only arrive in-band as a terminal error event (§3) — the HTTP status is already sent and will not change.
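The Content-Type branch described above can be sketched as a small pure function. The name `classifyResponse` and the `"unexpected"` fallback are assumptions made for illustration; the source only defines the two media types.

```typescript
type ResponseKind = "stream" | "problem" | "unexpected";

// Classify a response by its Content-Type header value. Parameters such as
// "; charset=utf-8" are ignored and matching is case-insensitive.
function classifyResponse(contentType: string | null): ResponseKind {
  const mediaType = (contentType ?? "").split(";")[0].trim().toLowerCase();
  if (mediaType === "application/x-ndjson") return "stream"; // consume events
  if (mediaType === "application/problem+json") return "problem"; // RFC 9457 body
  return "unexpected"; // assumption: treat anything else as a deployment fault
}
```

With `fetch`, a caller would pass `res.headers.get("content-type")` and only attach a streaming reader when the result is `"stream"`.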

Warnings for adapter operators: keep the pipe unbuffered

The single most common integration failure is not a protocol bug — it is an intermediary that buffers the response and turns a live stream into a dead wait followed by one giant flush. Every hop between the shiftagent install and the end consumer (adapter, host gateway, ingress, service mesh, CDN) must be audited:

| Intermediary | Failure mode | Remedy |
| --- | --- | --- |
| nginx (reverse proxy) | proxy_buffering on (the default) holds the body until complete or buffer-full | proxy_buffering off; for the streaming routes, or have the origin send X-Accel-Buffering: no; also disable proxy_cache on these routes |
| Envoy / service mesh sidecars | Response buffering filters, aggressive stream_idle_timeout | Exempt the streaming routes from buffering filters; raise the idle timeout (see below) |
| Cloud / hardware load balancers | Idle-connection timeouts (often 60 s) silently kill quiet streams | Raise the idle timeout on the streaming path to at least the deployment’s approval-expiry window (§5.4), or accept truncation and rely on the reconcile path (§7) |
| Response compression (gzip/brotli) | Compressors buffer input to build blocks, so deltas arrive late or all at once | Do not compress application/x-ndjson responses, or use a streaming-flush compressor configuration |
| The adapter itself | Reading the whole upstream body before forwarding (await res.text(), framework “body parsing” middleware) | Pipe bytes through as they arrive; exempt streaming routes from any body-collection middleware |
| Client-side HTTP libraries | “Convenience” APIs that resolve only on full body | Use the streaming read API (ReadableStream, chunked iterators), never the buffered one |

Two timing facts drive timeout budgets:

  1. Streams can be legitimately quiet for a long time. A hold capacity wait emits queued events only as the queue state changes, and a parked approval (§5.4) may produce no events at all between approval_required and resumed — a window bounded only by the approval’s expires_at. Idle timeouts must accommodate this, or the operator must consciously accept truncation on quiet streams and lean on the reconcile path (§7). Both are valid designs; pick one deliberately.
  2. Truncation is survivable by design. If any hop kills the connection, the run continues server-side and the outcome lands in history regardless (§7). Buffering, by contrast, is silent degradation — everything still “works,” just uselessly late. Prioritize fixing buffering over fixing timeouts.

Every line is one ConversationEvent — a discriminated union on type over a common envelope (ConversationEventBase in the spec):

| Field | Type | Meaning |
| --- | --- | --- |
| object | "conversation.event" | Constant type marker (house convention: every object says what it is) |
| type | string | Event discriminator; drives the client’s switch (§3) |
| conversation_id | con_… | The conversation this stream belongs to |
| message_id | msg_… or null | The assistant message being produced. null only on queued events, which are emitted before the run (and thus the message) exists |
| seq | integer ≥ 0 | Per-response monotonic counter, starts at 0, no gaps (§4) |
| created_at | ISO 8601 UTC | Server-side emission time |
| data | object | Event-type-specific payload (§3) |

Events are not stored — they are the live wire format only. The durable record is the persisted Message (delivered inside the terminal message_end and always retrievable via listMessages, GET /conversations/{conversation_id}/messages).
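The envelope fields listed above can be mirrored in a type plus a runtime check. This is a sketch only: the normative definition remains the ConversationEvent schema in openapi/openapi.yaml, and the names `EventEnvelope` and `parseEventLine` are hypothetical.

```typescript
// Sketch of the common envelope; field names follow the envelope description.
interface EventEnvelope {
  object: "conversation.event";
  type: string;
  conversation_id: string;
  message_id: string | null; // null only on queued events
  seq: number;
  created_at: string;
  data: Record<string, unknown>;
}

// Parse one NDJSON line and verify the envelope shape before trusting it.
function parseEventLine(line: string): EventEnvelope {
  const value: unknown = JSON.parse(line);
  if (typeof value !== "object" || value === null) {
    throw new Error("event line is not a JSON object");
  }
  const e = value as Record<string, unknown>;
  const seq = e.seq;
  const messageId = e.message_id;
  const ok =
    e.object === "conversation.event" &&
    typeof e.type === "string" &&
    typeof e.conversation_id === "string" &&
    (typeof messageId === "string" || messageId === null) &&
    typeof seq === "number" && Number.isInteger(seq) && seq >= 0 &&
    typeof e.created_at === "string" &&
    typeof e.data === "object" && e.data !== null;
  if (!ok) throw new Error("line does not match the event envelope");
  return value as EventEnvelope;
}
```

The check deliberately leaves `type` as an open string so that event types added later still pass envelope validation.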

Seven event types. Two are terminal; the rest are progress.

| type | Terminal | data payload | Emitted when |
| --- | --- | --- | --- |
| queued | no | {position, retry_hint_seconds?}: position is the place in the hold queue (1 = next); retry_hint_seconds is an estimated wait | Only under on_capacity: "hold" while no sandbox is available, before message_start. May repeat as the queue drains. Bounded by the deployment’s max hold time (see getCapacity, GET /capacity, field max_hold_seconds); exceeding it terminates the stream with a capacity-exhausted error event. message_id is null here |
| message_start | no | {role: "assistant", conversation?}: conversation is present only on streams initiated by createConversation with initial_message (the just-created Conversation object) | The assistant message has opened; a message_id is now assigned and deltas follow |
| content_delta | no | {text, filler?}: a chunk of assistant output; filler: true marks low-latency filler output | Repeatedly, as the agent produces output. Filler-flagged deltas (when the filler cascade enables them) bridge dead air while the main agent works; hosts may render or suppress them. Filler text is not part of the persisted message; concatenating only non-filler deltas reproduces the final content |
| approval_required | no | The full Approval object (apr_…), including reason, requested_items[], and expires_at | The agent raised a human-in-the-loop gate mid-run. The message parks in status: "awaiting_approval"; the stream stays open awaiting resolution via approveApproval (POST /approvals/{approval_id}/approve) or denyApproval (POST /approvals/{approval_id}/deny) |
| resumed | no | {approval_id, decision: "approved"} | A pending approval was granted; the parked run continues toward message_end. Denials never produce resumed; they terminate with error |
| message_end | yes | {message}: the full persisted assistant Message (final content, parts, status: "completed", usage, …) | The run finished successfully. This is the authoritative final state, identical to what listMessages returns |
| error | yes | An RFC 9457 problem object from the same registry as HTTP errors (type, title, status, detail, request_id, …) | The run failed in-band: agent error, approval denied or expired, hold-queue timeout (capacity-exhausted), etc. The message ends status: "failed" |

Forward compatibility rule: clients MUST ignore event types they do not recognize (skip the line, keep consuming seq) while still honoring the terminal-event contract. New event types may be added without a version bump; message_end and error remain the only terminal events.
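One way to honor the forward compatibility rule is to classify the discriminator before dispatching. The sketch below is illustrative; `classifyEventType` and the `EventClass` labels are hypothetical names, and the type lists simply restate the seven event types above.

```typescript
type EventClass = "terminal" | "progress" | "unknown";

const TERMINAL_TYPES: ReadonlySet<string> = new Set(["message_end", "error"]);
const PROGRESS_TYPES: ReadonlySet<string> = new Set([
  "queued",
  "message_start",
  "content_delta",
  "approval_required",
  "resumed",
]);

// Unknown types are neither an error nor terminal: the caller skips the line
// but still advances its expected seq, because the event did occupy a slot.
function classifyEventType(type: string): EventClass {
  if (TERMINAL_TYPES.has(type)) return "terminal";
  if (PROGRESS_TYPES.has(type)) return "progress";
  return "unknown";
}
```

Keeping the terminal set separate from the switch that renders events means a newly added progress event cannot accidentally end the read loop.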

4. Ordering, seq, and the terminal-event rule

The legal event grammar for one stream:

queued* message_start content_delta* ( approval_required resumed content_delta* )* message_end
| ( approval_required → denied/expired ) error
stateDiagram-v2
    direction LR
    [*] --> Queued : on_capacity=hold,\npool full
    [*] --> Started : sandbox available
    Queued --> Queued : queued (repeats)
    Queued --> Started : sandbox freed
    Queued --> Errored : max hold exceeded\n(capacity-exhausted)
    Started --> Streaming : message_start
    Streaming --> Streaming : content_delta\n(filler or real)
    Streaming --> Parked : approval_required
    Parked --> Streaming : resumed (approved)
    Parked --> Errored : denied / expired
    Streaming --> Done : message_end
    Streaming --> Errored : error
    Done --> [*]
    Errored --> [*]

    note right of Parked
        message.status = awaiting_approval
        stream stays open, possibly silent,
        until resolution or expires_at
    end note

Three invariants make the stream verifiable:

  1. Monotonic seq, no gaps. seq starts at 0 for the first event of each response and increments by exactly 1 per event. A received gap (seq jumps) means an intermediary dropped or reordered data — treat the stream as corrupt/truncated and reconcile (§7). In practice a gap should be impossible over TCP; checking it is cheap insurance against broken middleboxes.
  2. Exactly one terminal event. Every complete stream ends with exactly one message_end or exactly one error — never both, never neither, never anything after it.
  3. Closed-without-terminal ⇒ truncated. If the connection closes (EOF, reset, timeout, abort) and the client has not seen a terminal event, the client MUST treat the stream as truncated. Truncation is not failure: the run continues server-side and its outcome lands in history regardless. The mandatory follow-up is the reconcile procedure in §7 — never retry-send the same message on truncation alone (you would create a duplicate user message; if you must retry, that is what the Idempotency-Key header on createMessage is for).

seq is per-response, not per-conversation: a later createMessage on the same conversation starts again at 0. An approval park does not split the response — resumed and everything after it continue the same connection and the same seq sequence (see §5.4).
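The three invariants can be checked over whatever events were parsed before the connection closed. This is a minimal sketch under stated assumptions: `verifyStream` and the verdict labels are hypothetical, and reporting `"invalid"` for an event that follows the terminal event is a choice made here, since the text above only states that this never happens.

```typescript
interface SeqEvent {
  type: string;
  seq: number;
}

type StreamVerdict = "complete" | "truncated" | "invalid";

// events: every event received on one response, in arrival order.
function verifyStream(events: SeqEvent[]): StreamVerdict {
  let terminalSeen = false;
  for (let i = 0; i < events.length; i++) {
    const event = events[i];
    if (terminalSeen) return "invalid"; // invariant 2: nothing follows the terminal
    if (event.seq !== i) return "truncated"; // invariant 1: gap, so reconcile (§7)
    if (event.type === "message_end" || event.type === "error") terminalSeen = true;
  }
  // Invariant 3: a closed stream without a terminal event is truncated.
  return terminalSeen ? "complete" : "truncated";
}
```

Both `"truncated"` outcomes lead to the same follow-up, the reconcile procedure, so a caller does not need to distinguish a gap from an early close.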

All transcripts below are real wire format — each line is exactly what arrives on the socket (shown pretty-spaced here only in the commentary). IDs and timestamps are illustrative.

The simplest case: pooled conversation, filler disabled, no capacity pressure, no approvals. Request:

POST /conversations/con_01hzx8conv001/messages
Authorization: Bearer sk_int_...
Content-Type: application/json

{"content": "Summarize today's open jobs."}

Response — 200, Content-Type: application/x-ndjson:

{"object":"conversation.event","type":"message_start","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst001","seq":0,"data":{"role":"assistant"},"created_at":"2026-07-02T10:00:01Z"}
{"object":"conversation.event","type":"content_delta","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst001","seq":1,"data":{"text":"You have three open jobs today: "},"created_at":"2026-07-02T10:00:03Z"}
{"object":"conversation.event","type":"content_delta","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst001","seq":2,"data":{"text":"two installations and one repair visit."},"created_at":"2026-07-02T10:00:03Z"}
{"object":"conversation.event","type":"message_end","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst001","seq":3,"data":{"message":{"object":"message","id":"msg_01hzx8asst001","conversation_id":"con_01hzx8conv001","role":"assistant","content":"You have three open jobs today: two installations and one repair visit.","parts":[{"type":"text","text":"You have three open jobs today: two installations and one repair visit."}],"repository_id":null,"skill_ids":null,"env":null,"status":"completed","usage":{"input_tokens":1830,"output_tokens":24},"created_at":"2026-07-02T10:00:04Z"}},"created_at":"2026-07-02T10:00:04Z"}

Notes:

  • seq runs 0→3 with no gaps; exactly one terminal event (message_end).
  • Concatenating the content_delta texts reproduces message_end.data.message.content exactly (this holds whenever no filler deltas are present — see 5.2).
  • The persisted message in message_end is byte-for-byte what listMessages will return for msg_01hzx8asst001 — a client that renders deltas live needs nothing further, and a client that only wants final state can ignore deltas and read the terminal event alone.

Same request shape, but the filler cascade (tenant settings → conversation → message; most specific wins) resolves to enabled — e.g. the conversation was created with filler: {enabled: true}. The filler agent bridges the dead air before the main agent’s first token:

{"object":"conversation.event","type":"message_start","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst002","seq":0,"data":{"role":"assistant"},"created_at":"2026-07-02T10:05:00Z"}
{"object":"conversation.event","type":"content_delta","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst002","seq":1,"data":{"text":"One moment while I pull that up — ","filler":true},"created_at":"2026-07-02T10:05:00Z"}
{"object":"conversation.event","type":"content_delta","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst002","seq":2,"data":{"text":"checking the schedule now. ","filler":true},"created_at":"2026-07-02T10:05:01Z"}
{"object":"conversation.event","type":"content_delta","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst002","seq":3,"data":{"text":"Your next appointment is at 14:00."},"created_at":"2026-07-02T10:05:03Z"}
{"object":"conversation.event","type":"message_end","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst002","seq":4,"data":{"message":{"object":"message","id":"msg_01hzx8asst002","conversation_id":"con_01hzx8conv001","role":"assistant","content":"Your next appointment is at 14:00.","parts":[{"type":"text","text":"Your next appointment is at 14:00."}],"repository_id":null,"skill_ids":null,"env":null,"status":"completed","usage":{"input_tokens":1912,"output_tokens":11},"created_at":"2026-07-02T10:05:04Z"}},"created_at":"2026-07-02T10:05:04Z"}

Notes:

  • Filler deltas (seq 1–2) carry data.filler: true; the real reply (seq 3) does not (the field defaults to false and may be omitted).
  • The persisted content contains only the non-filler text. Filler is ephemeral UX lubrication — a client that replays history from listMessages will never see it. Clients therefore have two correct rendering strategies: (a) render filler deltas in a transient style and replace them when real content arrives, or (b) suppress filler: true deltas entirely. What is incorrect is appending filler and real text into one permanent transcript — it will disagree with history.
  • Voice-style hosts typically want filler on (dead air is costly); text chat hosts often suppress it. That is exactly why the knob cascades per tenant/conversation/message.
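The rule that only non-filler deltas make up the persisted content can be expressed directly. The sketch below assumes events have already been parsed; `assemblePersistedContent` and `DeltaLike` are hypothetical names.

```typescript
interface DeltaLike {
  type: string;
  data: { text?: string; filler?: boolean };
}

// Concatenate content_delta text, skipping filler. An absent filler field is
// treated as false, matching the default described in the notes above.
function assemblePersistedContent(events: DeltaLike[]): string {
  return events
    .filter((e) => e.type === "content_delta" && e.data.filler !== true)
    .map((e) => e.data.text ?? "")
    .join("");
}
```

Applied to the filler transcript above, the result is the same string that message_end carries in data.message.content, so a client can use it as a consistency check on its own rendering.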

The message was sent with on_capacity: "hold" while the sandbox pool was exhausted. (With the default "reject" there would be no stream at all — the request fails pre-stream with 429 capacity-exhausted + Retry-After, as an application/problem+json body.)

{"object":"conversation.event","type":"queued","conversation_id":"con_01hzx8conv001","message_id":null,"seq":0,"data":{"position":2,"retry_hint_seconds":15},"created_at":"2026-07-02T10:10:00Z"}
{"object":"conversation.event","type":"queued","conversation_id":"con_01hzx8conv001","message_id":null,"seq":1,"data":{"position":1,"retry_hint_seconds":5},"created_at":"2026-07-02T10:10:10Z"}
{"object":"conversation.event","type":"message_start","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst003","seq":2,"data":{"role":"assistant"},"created_at":"2026-07-02T10:10:14Z"}
{"object":"conversation.event","type":"content_delta","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst003","seq":3,"data":{"text":"Done — the report is ready."},"created_at":"2026-07-02T10:10:16Z"}
{"object":"conversation.event","type":"message_end","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst003","seq":4,"data":{"message":{"object":"message","id":"msg_01hzx8asst003","conversation_id":"con_01hzx8conv001","role":"assistant","content":"Done — the report is ready.","parts":[{"type":"text","text":"Done — the report is ready."}],"repository_id":null,"skill_ids":null,"env":null,"status":"completed","usage":{"input_tokens":1744,"output_tokens":9},"created_at":"2026-07-02T10:10:17Z"}},"created_at":"2026-07-02T10:10:17Z"}

Notes:

  • queued events precede message_start and are the only events where message_id is null — the run (and therefore the assistant message) does not exist yet.
  • queued may repeat as the position improves; treat each as a UI update (“You’re next…”), not as an accumulating list. retry_hint_seconds is an estimate, not a promise.
  • The hold is bounded by the deployment’s maximum hold time (max_hold_seconds, reported by getCapacity). If it elapses first, the stream ends instead with a terminal error event whose problem type slug is capacity-exhausted. The failure is in-band because the 200 header was already committed when the hold began.
  • Adapters that want to avoid holds altogether can pre-check getCapacity (GET /capacity, which returns {warm_available, sticky_active, at_capacity, …}) and shed or defer load at their own edge.

5.4 HITL pause and resume — with the approve call in between

The agent hits a gate mid-run (here: it needs a credential that is not on file). One stream, one connection, one seq sequence — but a separate, out-of-band HTTP call resolves the approval while the stream is parked.

The stream (connection 1) — first half:

{"object":"conversation.event","type":"message_start","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst004","seq":0,"data":{"role":"assistant"},"created_at":"2026-07-02T10:00:01Z"}
{"object":"conversation.event","type":"content_delta","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst004","seq":1,"data":{"text":"Starting the reconciliation. "},"created_at":"2026-07-02T10:00:03Z"}
{"object":"conversation.event","type":"approval_required","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst004","seq":2,"data":{"object":"approval","id":"apr_01hzx8appr001","tenant_id":"tnt_01hzx8acme001","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst004","status":"pending","reason":"The CRM lookup requires a credential that is not on file for this conversation.","requested_items":[{"kind":"secret","description":"API key for the CRM system","alias":"CRM_API_KEY"}],"expires_at":"2026-07-02T10:15:00Z","resolved_by":null,"resolved_at":null,"created_at":"2026-07-02T10:00:05Z","updated_at":"2026-07-02T10:00:05Z"},"created_at":"2026-07-02T10:00:05Z"}

The message is now status: "awaiting_approval"; the stream stays open and silent. The host surfaces the approval to a human (the adapter transports the event; the approval authority — which holds the approver key — decides).

The approve call (connection 2) — a normal request/response, made by whoever holds the approver key, any time before expires_at:

POST /approvals/apr_01hzx8appr001/approve
Authorization: Bearer sk_int_...
Content-Type: application/json

{
  "signature": {
    "key_id": "apk_01hzx8host001",
    "algorithm": "hmac-sha256",
    "exp": 1782813720,
    "value": "sig-base64url-example-aGVsbG8td29ybGQ"
  },
  "secrets": { "CRM_API_KEY": "example-value-vaulted-never-echoed" },
  "note": "Approved by supervisor on duty."
}

→ 200 {"object":"approval","id":"apr_01hzx8appr001","status":"approved", …}

The signed assertion is minted with the per-tenant approver key — cryptographically distinct from the sk_int_ service key that authenticates the transport (see approveApproval in the spec for the exact signing contract, and getIntegrationSelf, GET /integration/self, for discovering registered approver_keys). An invalid signature gets 403 approval-signature-invalid and the approval stays pending; the parked stream does not move. The supplied secret is vaulted write-only, conversation-scoped.

The stream (connection 1, still the same response) — second half, resuming the same seq sequence:

{"object":"conversation.event","type":"resumed","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst004","seq":3,"data":{"approval_id":"apr_01hzx8appr001","decision":"approved"},"created_at":"2026-07-02T10:02:10Z"}
{"object":"conversation.event","type":"content_delta","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst004","seq":4,"data":{"text":"Reconciled 14 invoices against the CRM using {{secret:CRM_API_KEY}}."},"created_at":"2026-07-02T10:02:19Z"}
{"object":"conversation.event","type":"message_end","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst004","seq":5,"data":{"message":{"object":"message","id":"msg_01hzx8asst004","conversation_id":"con_01hzx8conv001","role":"assistant","content":"Reconciled 14 invoices against the CRM using {{secret:CRM_API_KEY}}.","parts":[{"type":"text","text":"Reconciled 14 invoices against the CRM using {{secret:CRM_API_KEY}}."}],"repository_id":null,"skill_ids":null,"env":null,"status":"completed","usage":{"input_tokens":2048,"output_tokens":96},"created_at":"2026-07-02T10:02:20Z"}},"created_at":"2026-07-02T10:02:20Z"}

Notes:

  • The secret value never appears in the stream — not in resumed, not in any delta, not in the persisted message. The agent references it strictly by alias ({{secret:CRM_API_KEY}}); the egress proxy resolves the alias at the network boundary. This is the approval⇄vault weave: the agent asks by alias, the approver supplies by alias, the run resumes seeing only the alias.
  • On deny (denyApproval, POST /approvals/{approval_id}/deny, same signature contract with "decision":"deny") or on expiry (expires_at passes unresolved), there is no resumed — the stream terminates with an error event (problem slug approval-expired for expiry) and the message finishes status: "failed".
  • Sandbox behavior while parked differs by placement mode but is protocol-invisible: sticky sandboxes stay warm until the approval’s expires_at; pooled runs checkpoint and re-hydrate on approval. Same events either way.
  • A client that would rather not hold a connection open across a potentially long human decision can deliberately disconnect after approval_required and reconcile later (§7) — the run parks server-side identically either way.
sequenceDiagram
    autonumber
    participant Host as Host system UI
    participant Adapter as Adapter (stateless)
    participant API as Integration API
    participant Agent as Sandboxed agent run
    participant Authority as Host approval authority<br/>(holds approver key)

    Adapter->>API: POST /conversations/{id}/messages (createMessage)
    API->>Agent: start run
    API-->>Adapter: 200 application/x-ndjson (stream opens)
    API-->>Adapter: message_start · content_delta …
    Agent->>API: raises approval (needs CRM_API_KEY)
    API-->>Adapter: approval_required (Approval apr_…)
    Note over API,Agent: message parks awaiting_approval<br/>stream stays open (silent)
    Adapter->>Host: surface approval request
    Host->>Authority: human decision
    Authority->>Authority: mint signed assertion<br/>{approval_id, decision, exp}
    Authority->>Adapter: signed assertion (+ secret value)
    Adapter->>API: POST /approvals/{id}/approve (approveApproval)
    API->>API: verify signature · vault secret (write-only)
    API->>Agent: resume with alias only
    API-->>Adapter: resumed · content_delta … · message_end
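The adapter's part of the approve call is pure transport: it forwards an assertion that the approval authority has already minted. The sketch below assembles that request without sending it. The function name `buildApproveRequest`, the `PreparedRequest` shape, and the `baseUrl` parameter are hypothetical; the body fields follow the example request above, and the signing step itself is not shown because its contract lives in the spec.

```typescript
interface ApprovalSignature {
  key_id: string;
  algorithm: string;
  exp: number;
  value: string; // produced by the approval authority, never by the adapter
}

interface PreparedRequest {
  url: string;
  method: "POST";
  headers: Record<string, string>;
  body: string;
}

function buildApproveRequest(
  baseUrl: string,
  serviceKey: string, // sk_int_ key: authenticates the transport only
  approvalId: string,
  signature: ApprovalSignature,
  secrets: Record<string, string>, // keyed by alias, e.g. CRM_API_KEY
  note?: string,
): PreparedRequest {
  const payload: Record<string, unknown> = { signature, secrets };
  if (note !== undefined) payload.note = note;
  return {
    url: `${baseUrl}/approvals/${encodeURIComponent(approvalId)}/approve`,
    method: "POST",
    headers: {
      authorization: `Bearer ${serviceKey}`,
      "content-type": "application/json",
    },
    body: JSON.stringify(payload),
  };
}
```

The result can be passed to `fetch(req.url, { method: req.method, headers: req.headers, body: req.body })` on a connection separate from the parked stream.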

A load balancer with a 60-second idle timeout kills the connection while the agent is mid-task. What the client received:

{"object":"conversation.event","type":"message_start","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst005","seq":0,"data":{"role":"assistant"},"created_at":"2026-07-02T10:20:01Z"}
{"object":"conversation.event","type":"content_delta","conversation_id":"con_01hzx8conv001","message_id":"msg_01hzx8asst005","seq":1,"data":{"text":"Working through the price book now"},"created_at":"2026-07-02T10:20:04Z"}
<-- TCP connection closed: no message_end, no error -->

The client’s obligations, in order:

  1. Classify: connection closed without a terminal event ⇒ truncated (invariant 3, §4). The same classification applies to a seq gap or a line that cuts off mid-JSON at EOF (discard the partial line).
  2. Do not re-send the message. The run is still executing server-side. Re-sending creates a duplicate user message and a second run. (If your delivery pipeline requires at-least-once retries, send the original with an Idempotency-Key so the retry replays instead of duplicating.)
  3. Reconcile (§7): poll listMessages for msg_01hzx8asst005 until its status leaves in_progress — the full final content arrives in history exactly as it would have arrived in message_end. If the status is awaiting_approval, also check listApprovals (GET /approvals?conversation_id=…&status=pending) and surface the pending gate.

There is deliberately no re-attach endpoint in this surface: a truncated stream cannot be resumed mid-flight; the durable record is the reconciliation mechanism. This keeps the protocol stateless for the caller and makes “adapter restarted mid-stream” a non-event.

The reference consumption pattern, as framework-neutral TypeScript pseudocode. The essentials: byte buffer → line splitter → JSON.parse → switch on type, with seq verification and terminal tracking.

// The two terminal variants of the ConversationEvent union.
type TerminalEvent = Extract<ConversationEvent, { type: 'message_end' | 'error' }>;

async function streamMessage(
  conversationId: string,
  body: MessageCreate,
): Promise<Message> {
  const res = await fetch(
    `${BASE_URL}/conversations/${conversationId}/messages`,
    {
      method: 'POST',
      headers: {
        authorization: `Bearer ${SK_INT_KEY}`,
        'content-type': 'application/json',
        // 'idempotency-key': crypto.randomUUID(), // if your pipeline retries
      },
      body: JSON.stringify(body),
    },
  );

  // Pre-stream failures are plain problem+json, not events.
  const contentType = res.headers.get('content-type') ?? '';
  if (!contentType.includes('application/x-ndjson')) {
    const problem = await res.json(); // RFC 9457 problem object
    throw new IntegrationApiError(res.status, problem); // honor Retry-After on 429
  }

  const reader = res.body!.getReader();
  const decoder = new TextDecoder(); // UTF-8; used in streaming mode below
  let buffer = '';
  let expectedSeq = 0;
  let terminal: TerminalEvent | null = null;

  const handle = (event: ConversationEvent) => {
    if (event.seq !== expectedSeq) throw new TruncatedStreamError('seq gap');
    expectedSeq += 1;
    switch (event.type) {
      case 'queued':
        ui.showQueued(event.data.position, event.data.retry_hint_seconds);
        break;
      case 'message_start':
        // createConversation-with-initial_message: the new conversation
        // rides in event.data.conversation; capture its id here.
        ui.openAssistantBubble(event.message_id!);
        break;
      case 'content_delta':
        if (event.data.filler) ui.showTransient(event.data.text); // or: suppress
        else ui.append(event.message_id!, event.data.text);
        break;
      case 'approval_required':
        ui.surfaceApproval(event.data); // full Approval object (apr_…)
        break; // stream stays open; resolution is out-of-band
      case 'resumed':
        ui.clearApproval(event.data.approval_id);
        break;
      case 'message_end': // terminal: authoritative persisted Message
      case 'error': // terminal: RFC 9457 problem object
        terminal = event;
        break;
      default:
        // Forward compatibility: unknown types are skipped, seq still counts.
        break;
    }
  };

  try {
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      // Chunk boundaries ≠ line boundaries: keep the trailing partial line.
      const lines = buffer.split('\n');
      buffer = lines.pop()!;
      for (const line of lines) {
        if (line.trim() === '') continue; // tolerate blank lines
        handle(JSON.parse(line) as ConversationEvent);
      }
    }
    buffer += decoder.decode(); // flush any bytes the decoder was still holding
    // EOF: a leftover partial line means the last event was cut off mid-JSON.
    if (buffer.trim() !== '') throw new TruncatedStreamError('partial final line');
  } catch (err) {
    // Network error / abort / seq gap / partial line → reconcile, never re-send.
    return reconcile(conversationId);
  }

  // `terminal` is assigned inside the `handle` closure, which TypeScript's
  // control-flow analysis cannot see, so widen the type again before use.
  const outcome = terminal as TerminalEvent | null;
  if (outcome === null) return reconcile(conversationId); // closed w/o terminal
  if (outcome.type === 'error') throw new AgentRunError(outcome.data);
  return outcome.data.message; // message_end: the persisted assistant Message
}

And the reconcile path — the only correct reaction to truncation:

async function reconcile(conversationId: string): Promise<Message> {
  // The run continues server-side; its outcome lands in history regardless.
  for (;;) {
    // listMessages: GET /conversations/{conversation_id}/messages
    const last = await getLatestAssistantMessage(conversationId);
    switch (last?.status) {
      case 'completed':
        return last;
      case 'failed':
        throw new AgentRunError(/* inspect message + listApprovals for cause */);
      case 'awaiting_approval':
        // listApprovals: GET /approvals?conversation_id=…&status=pending
        await surfacePendingApprovals(conversationId);
        break; // keep polling until resolved or expired
      case 'in_progress':
      default:
        break; // still running
    }
    await sleep(POLL_INTERVAL_MS); // e.g. 2 s with capped backoff
  }
}

Implementation notes:

  • Never await res.json() / res.text() on the streaming response — that is the buffered API and defeats the protocol. Use the streaming reader.
  • decoder.decode(value, { stream: true }) matters: a multi-byte UTF-8 character can straddle a chunk boundary, and streaming mode holds the partial code point instead of corrupting it.
  • The same loop consumes createConversation-with-initial_message streams unchanged — the only addition is capturing message_start.data.conversation.
  • For UI cancellation, abort the fetch (AbortController). Aborting the stream does not abort the run — the assistant message still lands in history; reconcile if you later need it.
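The reconcile loop above polls at a fixed POLL_INTERVAL_MS and mentions capped backoff as an option. One possible shape for that delay is sketched here; the function name `pollDelayMs`, the doubling schedule, and the 30-second cap are assumptions, with only the 2-second starting point taken from the comment in the loop.

```typescript
// Exponential backoff with a ceiling: 2 s, 4 s, 8 s, ... up to capMs.
function pollDelayMs(attempt: number, baseMs = 2000, capMs = 30000): number {
  const exponent = Math.max(0, Math.floor(attempt)); // guard against negatives
  return Math.min(capMs, baseMs * 2 ** exponent);
}
```

A caller would count polls from zero and sleep for `pollDelayMs(attempt)` between them, so a run parked on a long approval is checked less often than one that is about to finish.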

Summarizing the recovery contract in one place:

| Situation | Client action |
| --- | --- |
| Stream closed with message_end | Done. data.message is authoritative; no further calls needed |
| Stream closed with error | Run failed; handle the problem object (same registry as HTTP errors). The message is in history with status: "failed" |
| Stream closed without a terminal event, seq gap, or partial final line | Truncated. Poll listMessages (GET /conversations/{conversation_id}/messages) until the assistant message’s status leaves in_progress/awaiting_approval. Do not re-send |
| Client saw approval_required, then disconnected (deliberately or not) | The run stays parked server-side until resolution or expires_at. Track the gate via listApprovals (GET /approvals?conversation_id=…&status=pending) / getApproval (GET /approvals/{approval_id}); resolve via approveApproval/denyApproval. Then reconcile the message via listMessages |
| Pre-stream 429 capacity-exhausted (on_capacity: reject) | Back off per Retry-After, optionally consult getCapacity (GET /capacity), and re-send; nothing was created, so a plain retry is safe |

Because the user message and the assistant outcome are always durably recorded, the streaming layer carries zero exclusive state — any consumer can crash at any byte and recover from the REST surface alone.

8. The non-streaming alternative: ?stream=false

createMessage accepts ?stream=false for callers that cannot (or need not) consume NDJSON — server-to-server automations, batch backfills, constrained runtimes:

POST /conversations/{conversation_id}/messages?stream=false

Semantics:

  • The request blocks until the run completes, then returns 201 with the completed assistant Message as plain application/json — the same object message_end would have carried.
  • Failures return their natural HTTP status with a problem body (no error event — there is no stream). Capacity rejection is the usual pre-stream 429 capacity-exhausted.
  • Everything stream-specific disappears. No queued progress (a hold wait happens silently inside the blocking call), no deltas, and no filler output — filler exists only as flagged deltas on the wire and is never part of the persisted message.
  • Weigh it against HITL and holds. The blocking call spans the entire run — including any capacity hold and any awaiting_approval park. A run gated on a human decision can block for the whole approval window, which most HTTP clients and intermediaries will time out long before. Rule of thumb: ?stream=false is fine for short, approval-free runs; anything that may raise an approval or queue for capacity should stream (or accept the timeout-and-reconcile pattern of §7, which works identically here — the run continues server-side if the blocking call times out).
  • The Idempotency-Key header is honored identically, and is more important here: a timed-out blocking call is indistinguishable from a lost response, and the key turns a blind retry into a safe replay.
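A blocking call that follows the points above always carries an idempotency key, so that a timeout can be retried safely. The sketch below only assembles the request; `buildBlockingMessageRequest`, `BlockingRequest`, and the `baseUrl` parameter are hypothetical names, and key generation is left to the caller.

```typescript
interface BlockingRequest {
  url: string;
  method: "POST";
  headers: Record<string, string>;
  body: string;
}

function buildBlockingMessageRequest(
  baseUrl: string,
  serviceKey: string,
  conversationId: string,
  message: unknown, // the createMessage body, e.g. { content: "..." }
  idempotencyKey: string, // reuse the SAME key when retrying after a timeout
): BlockingRequest {
  const path = `/conversations/${encodeURIComponent(conversationId)}/messages`;
  return {
    url: `${baseUrl}${path}?stream=false`,
    method: "POST",
    headers: {
      authorization: `Bearer ${serviceKey}`,
      "content-type": "application/json",
      "idempotency-key": idempotencyKey,
    },
    body: JSON.stringify(message),
  };
}
```

Building the request once and resending the identical object on retry keeps the key stable, which is what lets the server treat the retry as a replay instead of a second message.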

createConversation has no stream parameter: without initial_message it is already a plain 201 JSON response; with initial_message it always streams. To get “create + first message” without streaming, make two calls — createConversation (no initial_message), then createMessage?stream=false.