Workshop - Streaming agent with mid-stream tool use
Companion to AI Systems -> Month 05 -> Week 17-18: Serving Systems, and the eighth in the AI implementations workshop series. Every previous workshop's agent was blocking: send the prompt, wait, get the whole answer back. Real users hate that. The streaming pattern - tokens appear as the model generates them, tool calls fire while the response is still in flight - is the difference between an AI feature that feels responsive and one that feels broken. This workshop builds the streaming agent loop from raw SSE parsing all the way through mid-stream tool dispatch, and ends with the four production patterns that turn streaming from a demo into a system.
~75 minutes. Needs: Python 3.11+, httpx (with HTTP/2 support, install with httpx[http2]), an Anthropic API key. No GPU.
What you'll build, and the idea it makes concrete
You'll build a streaming CLI agent against the raw Anthropic Messages API SSE endpoint - no SDK streaming helpers, no framework. You'll parse the event stream by hand, accumulate partial JSON for tool_use blocks, fire tool calls as soon as their JSON completes, and print text tokens to the terminal as they arrive. By the end you'll have a working "Claude Code in a single file," and you'll know the four edge cases that distinguish a streaming demo from a production streaming agent.
The idea this makes concrete:
Streaming is not just "the same response, delivered incrementally." The wire format changes (Server-Sent Events instead of one JSON blob), the parsing changes (event-by-event accumulation instead of one json.loads), the error handling changes (a partial response is a real state you must handle), and the agent loop's shape changes (tool calls can fire before the assistant turn is "complete"). Each of those is a real source of bugs in production code. Naive streaming code that just appends tokens to a buffer and prints them misses most of these, and produces demos that break the moment the model decides to use a tool mid-response.
A second idea, equally important:
The perceived latency of an AI feature is dominated by the time to first token, not the total response time. A response that streams its first token at 200ms and finishes at 8 seconds feels dramatically faster than one that blocks for 5 seconds and then returns everything at once, even though the streamed response finishes later. Comparable work feels different depending on when the first output appears. Production AI UX optimizes for time to first token (TTFT) because it is what users actually experience. Every workshop after this assumes streaming is the default.
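To make TTFT something you can log rather than just feel, here is a minimal sketch of a timing wrapper. It works on any iterator of chunks; `timed_stream`, `measure`, and `fake_stream` are hypothetical names introduced for illustration, and the fake stream stands in for a real model response so the example runs offline.

```python
import time
from typing import Iterable, Iterator, Optional, Tuple


def timed_stream(chunks: Iterable[str]) -> Iterator[Tuple[str, float]]:
    """Yield (chunk, seconds_since_start) for every chunk of a stream."""
    start = time.perf_counter()
    for chunk in chunks:
        yield chunk, time.perf_counter() - start


def measure(chunks: Iterable[str]) -> dict:
    """Consume a stream and report time to first chunk and total time."""
    ttft: Optional[float] = None
    total = 0.0
    count = 0
    for _, elapsed in timed_stream(chunks):
        if ttft is None:
            ttft = elapsed  # first chunk arrived: this is the TTFT
        total = elapsed
        count += 1
    return {"ttft_s": ttft, "total_s": total, "chunks": count}


def fake_stream() -> Iterator[str]:
    """Stand-in for a model stream: a short wait, then two chunks."""
    time.sleep(0.02)
    yield "Hello"
    time.sleep(0.05)
    yield ", world"


if __name__ == "__main__":
    print(measure(fake_stream()))
```

In a real client you would start the clock before sending the request, so that connection setup and queueing count toward TTFT as well.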
Step 0: the architecture you're about to assemble

              +----- HTTP POST /v1/messages with stream=true -----+
              |                                                   |
+------+      |     +--------------------------------+            |
| user |----->|     | Anthropic streaming            |            |
+------+      |     | /v1/messages endpoint          |            |
              |     +---------------+----------------+            |
              |                     |                             |
              +---------------------+-----------------------------+
                                    |
                                    |  SSE events (blank-line-separated, text/event-stream)
                                    v
+-----------------------------------------------------------------+
| YOUR STREAMING AGENT                                            |
|                                                                 |
| for event in sse_parse(response):                               |
|   match event.type:                                             |
|     message_start:       reset state                            |
|     content_block_start: open a new text or tool_use block      |
|     content_block_delta: append delta (text chunk OR JSON       |
|                          chunk for the current tool_use)        |
|     content_block_stop:  block done. If tool_use, dispatch      |
|                          the tool NOW and queue the result      |
|     message_delta:       update stop_reason                     |
|     message_stop:        turn complete. Loop or exit.           |
+--------------------------+--------------------------------------+
                           |
                           |  as tokens come in:
                           +--> stdout (user sees them appear)
                           +--> tool dispatch (concurrent with text)
                           +--> structured trace (Workshop 9)

Three things this diagram makes explicit:
Three things this diagram makes explicit:
- The event stream has structure, not just text. SSE events arrive typed - content_block_start, content_block_delta, content_block_stop, etc. The text the user sees comes from delta events within text content blocks; tool calls come from delta events within tool_use content blocks. Confusing the two is the most common streaming bug.
- Tool calls stream as JSON deltas. The tool's input JSON arrives as a series of string fragments, one per delta, and a fragment can end anywhere, including mid-key or mid-value. You accumulate the partial JSON until the content_block_stop event for that tool_use block fires - then the JSON is complete and you can json.loads it and dispatch the tool. You cannot dispatch on a partial JSON; you also cannot wait for the entire turn before dispatching the first tool, or you lose the streaming win.
- stop_reason arrives near the end. The final message_delta event carries stop_reason: tool_use or end_turn. You don't know what kind of turn this is until then. The agent loop's "should I run tools and continue?" decision happens at message_stop.
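The accumulation rule in the second point can be sketched offline, before any network code exists. The events below are canned examples shaped like the stream described above; the tool id `toolu_demo` and the helper name `collect_tool_inputs` are made up for illustration.

```python
import json

# Canned (event_type, payload) pairs. The fragment boundaries are arbitrary:
# a delta can end in the middle of a key or a string value.
EVENTS = [
    ("content_block_start", {"index": 0, "content_block": {
        "type": "tool_use", "id": "toolu_demo", "name": "get_weather", "input": {}}}),
    ("content_block_delta", {"index": 0, "delta": {
        "type": "input_json_delta", "partial_json": '{"ci'}}),
    ("content_block_delta", {"index": 0, "delta": {
        "type": "input_json_delta", "partial_json": 'ty": "La'}}),
    ("content_block_delta", {"index": 0, "delta": {
        "type": "input_json_delta", "partial_json": 'gos"}'}}),
    ("content_block_stop", {"index": 0}),
]


def collect_tool_inputs(events):
    """Accumulate input_json_delta fragments per block index and parse each
    block's JSON only once its content_block_stop arrives."""
    partial = {}  # index -> JSON text received so far
    done = {}     # index -> parsed tool input
    for event_type, payload in events:
        idx = payload.get("index")
        if (event_type == "content_block_start"
                and payload["content_block"]["type"] == "tool_use"):
            partial[idx] = ""
        elif (event_type == "content_block_delta"
                and payload["delta"]["type"] == "input_json_delta"):
            partial[idx] += payload["delta"]["partial_json"]
        elif event_type == "content_block_stop" and idx in partial:
            raw = partial.pop(idx)
            # A tool with no arguments may stream no fragments at all.
            done[idx] = json.loads(raw) if raw else {}
    return done


if __name__ == "__main__":
    print(collect_tool_inputs(EVENTS))
```

Calling json.loads on any prefix of these fragments raises json.JSONDecodeError, which is why parsing is deferred to content_block_stop.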
Step 1: parse SSE by hand (so you see what's actually arriving)
Anthropic's streaming endpoint returns text/event-stream - one event per blank-line-separated record, with event: <name> and data: <json> lines. Parse it with raw httpx:
import httpx, json, os, sys

URL = "https://api.anthropic.com/v1/messages"
HEADERS = {
    "x-api-key": os.environ["ANTHROPIC_API_KEY"],
    "anthropic-version": "2023-06-01",
    "content-type": "application/json",
}

def stream_events(messages, tools=None):
    """Yield (event_type, payload) pairs from the Messages API SSE stream."""
    body = {
        "model": "claude-sonnet-4-6",
        "max_tokens": 2048,
        "messages": messages,
        "stream": True,
    }
    if tools:
        body["tools"] = tools
    with httpx.stream("POST", URL, headers=HEADERS, json=body, timeout=60) as r:
        # A non-2xx response carries a JSON error body, not an event stream.
        r.raise_for_status()
        event_type = None
        for raw_line in r.iter_lines():
            if not raw_line:
                continue  # blank line: separator between SSE records
            if raw_line.startswith("event:"):
                event_type = raw_line[6:].strip()
            elif raw_line.startswith("data:"):
                payload = json.loads(raw_line[5:].strip())
                yield event_type, payload
That's the entire SSE parser. ~15 lines. Test it with no tools, just a question:
for event_type, payload in stream_events([{"role": "user", "content": "Count to 3"}]):
    print(f"[{event_type}] {payload}")
You'll see the event sequence:
[message_start] {"type":"message_start","message":{"id":"...","role":"assistant",...}}
[content_block_start] {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}
[content_block_delta] {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"1"}}
[content_block_delta] {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":", 2"}}
[content_block_delta] {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":", 3"}}
[content_block_stop] {"type":"content_block_stop","index":0}
[message_delta] {"type":"message_delta","delta":{"stop_reason":"end_turn"}}
[message_stop] {"type":"message_stop"}
That's the whole protocol. Eight event types (message_start, content_block_start, content_block_delta, content_block_stop, message_delta, message_stop, plus ping for keepalives and error for failures). Every other streaming feature is a variation on parsing these events.
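The parser in this step relies on each event arriving as exactly one event: line followed by one data: line. As a hedged sketch of what a slightly more defensive version could look like, the function below follows the general Server-Sent Events framing rules: records end at a blank line, lines starting with a colon are comments, and multiple data: lines in one record are joined with newlines. `parse_sse` is a hypothetical helper that takes any iterable of already-split lines, so it can be tested without a network connection.

```python
import json
from typing import Iterable, Iterator, Tuple


def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, dict]]:
    """Turn SSE lines into (event_name, parsed_json) records."""
    event_type = "message"  # the SSE default when no event: field is given
    data_lines = []
    for line in lines:
        line = line.rstrip("\r\n")
        if line == "":
            # Blank line: dispatch the record collected so far, then reset.
            if data_lines:
                yield event_type, json.loads("\n".join(data_lines))
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment line, sometimes used as a keepalive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is dropped
        if field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)
    # A record without its closing blank line is incomplete and is dropped.


if __name__ == "__main__":
    sample = [
        "event: ping",
        'data: {"type": "ping"}',
        "",
        ": this is a comment",
        "event: content_block_stop",
        'data: {"type": "content_block_stop",',
        'data: "index": 0}',
        "",
    ]
    for name, payload in parse_sse(sample):
        print(name, payload)
```

To use it with the code above, you would pass r.iter_lines() as the lines argument in place of the hand-written loop.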
Step 2: print text tokens as they arrive
The simplest interesting use: print to stdout as text deltas arrive. This is the entire "Claude is typing..." UX.
def stream_text(messages):
    for event_type, payload in stream_events(messages):
        if event_type == "content_block_delta" and payload["delta"]["type"] == "text_delta":
            sys.stdout.write(payload["delta"]["text"])
            sys.stdout.flush()
    print()  # newline after the turn

stream_text([{"role": "user", "content": "Write a haiku about TCP retransmissions."}])
Run it. Tokens stream to your terminal at roughly 80-100/second on Sonnet. Time to first token: about 250-400ms. That's the perceived-latency budget you're now hitting; the rest of the response can take 5 seconds and the user does not feel it as slow.
Step 3: handle tool_use blocks streaming as JSON deltas
This is where streaming gets non-trivial. When the model uses a tool, the tool_use content block streams its input field as partial JSON deltas. You cannot parse partial JSON; you must accumulate the deltas and parse the whole thing at content_block_stop.
def stream_with_tools(messages, tools, tool_fns):
    """Stream a turn, dispatch tools as their JSON completes, return
    the (assistant content list, tool results list, stop reason)."""
    text_blocks = []   # accumulated text content blocks
    tool_blocks = []   # accumulated tool_use content blocks
    partial = {}       # index -> partial JSON string for tool_use blocks
    block_types = {}   # index -> "text" | "tool_use"
    tool_meta = {}     # index -> {"name": str, "id": str}
    tool_results = []
    stop_reason = None

    for event_type, payload in stream_events(messages, tools=tools):
        if event_type == "content_block_start":
            idx = payload["index"]
            cb = payload["content_block"]
            block_types[idx] = cb["type"]
            if cb["type"] == "text":
                text_blocks.append({"type": "text", "text": "", "_index": idx})
            elif cb["type"] == "tool_use":
                partial[idx] = ""
                tool_meta[idx] = {"name": cb["name"], "id": cb["id"]}
        elif event_type == "content_block_delta":
            idx = payload["index"]
            delta = payload["delta"]
            if delta["type"] == "text_delta":
                sys.stdout.write(delta["text"])
                sys.stdout.flush()
                for b in text_blocks:
                    if b["_index"] == idx:
                        b["text"] += delta["text"]
            elif delta["type"] == "input_json_delta":
                partial[idx] += delta["partial_json"]
        elif event_type == "content_block_stop":
            idx = payload["index"]
            if block_types[idx] == "tool_use":
                # JSON is now complete; parse and dispatch.
                args = json.loads(partial[idx]) if partial[idx] else {}
                name = tool_meta[idx]["name"]
                tool_id = tool_meta[idx]["id"]
                tool_blocks.append({
                    "type": "tool_use",
                    "id": tool_id,
                    "name": name,
                    "input": args,
                    "_index": idx,
                })
                # Dispatch the tool *now* - while later text/tool blocks may
                # still be streaming. This is the mid-stream win.
                result = tool_fns[name](**args)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tool_id,
                    "content": str(result),
                })
        elif event_type == "message_delta":
            stop_reason = payload["delta"].get("stop_reason", stop_reason)
        elif event_type == "message_stop":
            break

    print()  # newline after the turn
    # Put the blocks back in the order the model emitted them (text and
    # tool_use blocks can interleave), then strip our internal _index field.
    assistant = sorted(text_blocks + tool_blocks, key=lambda b: b["_index"])
    for b in assistant:
        b.pop("_index", None)
    return assistant, tool_results, stop_reason
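The key line is the content_block_stop handler for tool_use: as soon as the tool's JSON is complete, dispatch it. Don't wait for the whole turn. If the model emits text-then-tool-then-text, you stream the first text, run the tool while the model is still generating the second text, and continue. In this version the tool call is synchronous, so the client stops reading events while the tool runs; the model keeps generating on the server and the bytes wait in the connection buffer, which is where the overlap comes from. The tool dispatch and the continued generation overlap; the user's wall-clock latency drops by up to the tool's execution time.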
Step 4: the full streaming agent loop
Wrap step 3 in the same outer loop you've used in every previous workshop:
def streaming_agent(user_message, tools, tool_fns, max_turns=10):
    messages = [{"role": "user", "content": user_message}]
    for turn in range(max_turns):
        assistant, tool_results, stop_reason = stream_with_tools(messages, tools, tool_fns)
        messages.append({"role": "assistant", "content": assistant})
        if stop_reason != "tool_use":
            return
        messages.append({"role": "user", "content": tool_results})
Run it with a real tool:
def get_weather(city: str) -> dict:
    return {"city": city, "temp_c": 18, "conditions": "sunny"}

TOOLS = [{
    "name": "get_weather",
    "description": "Get current weather for a city.",
    "input_schema": {
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"],
    },
}]

streaming_agent(
    "What's the weather in Lagos? Then write a haiku about it.",
    TOOLS,
    {"get_weather": get_weather},
)
Watch the terminal. The model streams "Let me check..." then the tool fires (you'll see no text for ~50ms while the tool runs in-line), then "It's 18°C and sunny..." then the haiku. You can see the structure of the model's thinking in real time - and that's the streaming UX every modern AI product is built on.
Step 5: break it (the four production-breaking edge cases)
5.1 The interrupted stream
Network drops mid-response. The SSE connection closes; iter_lines raises an exception. The agent loop has a partial assistant turn that's not in messages. If you append it as-is, the API rejects the next turn ("missing tool_result for tool_use X"). If you skip it, you lose work.
The fix: track whether the turn ended cleanly (message_stop fired) and the stop reason was set. If not, treat the turn as failed and retry from the previous user message. Don't try to "stitch" a partial turn into the conversation; it confuses the model and the API.
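A minimal sketch of that rule, under stated assumptions: `run_turn` is a hypothetical callable that yields (event_type, payload) pairs for one turn (stream_events would fit that shape), and the built-in ConnectionError and TimeoutError stand in for whatever transport errors your HTTP client actually raises. The wrapper only returns a turn that ended with message_stop and a stop reason; anything else is discarded and retried from the same messages.

```python
class IncompleteTurn(Exception):
    """Raised when no attempt produced a cleanly finished turn."""


def run_turn_with_retry(run_turn, messages, max_attempts=3):
    """Return (events, stop_reason) for the first attempt that ends cleanly."""
    last_error = None
    for _ in range(max_attempts):
        events = []
        saw_stop = False
        stop_reason = None
        try:
            for event_type, payload in run_turn(messages):
                events.append((event_type, payload))
                if event_type == "message_delta":
                    stop_reason = payload["delta"].get("stop_reason", stop_reason)
                elif event_type == "message_stop":
                    saw_stop = True
        except (ConnectionError, TimeoutError) as exc:
            last_error = exc  # stream dropped: discard the partial turn
            continue
        if saw_stop and stop_reason is not None:
            return events, stop_reason
        # The stream ended without message_stop: also treat it as failed.
    raise IncompleteTurn(
        f"turn did not complete after {max_attempts} attempts"
    ) from last_error
```

This version buffers events and hands them back only after a clean finish, which gives up mid-stream dispatch in exchange for simplicity. If you keep dispatching tools mid-stream, a retry can run a tool twice, so tools with side effects need to be idempotent or tracked by tool_use id.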
5.2 The ping keepalive
Anthropic's stream can include event: ping events at any point to keep the connection alive across proxies. Your event handler should explicitly ignore ping (and any unknown event type) rather than crash on it. The code above happens to ignore unknown events because its if/elif chain only checks for the known cases; explicit handling is better.
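One way to make that handling explicit is a small classifier that runs before the main dispatch. This is a sketch, not the only reasonable design: `classify_event` and `StreamError` are hypothetical names, and the error payload shape (an "error" object with "type" and "message") is assumed from the error event mentioned in Step 1.

```python
class StreamError(RuntimeError):
    """An error event arrived in the middle of the stream."""


# Event types the agent loop has a handler for.
HANDLED = {
    "message_start",
    "content_block_start",
    "content_block_delta",
    "content_block_stop",
    "message_delta",
    "message_stop",
}


def classify_event(event_type, payload):
    """Return "handle" or "skip" for an event; raise StreamError on error events."""
    if event_type == "ping":
        return "skip"  # keepalive, carries no content
    if event_type == "error":
        err = payload.get("error", {})
        raise StreamError(f"{err.get('type', 'unknown')}: {err.get('message', '')}")
    if event_type in HANDLED:
        return "handle"
    # Unknown event type: skip it so that new event types do not crash the client.
    return "skip"
```

In the loop, you would call classify_event first and continue on "skip", so the if/elif chain below it only ever sees event types it knows.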
5.3 Concurrent tool dispatch
If the model emits multiple tool_use blocks in one turn (Anthropic does this; OpenAI does it via parallel tool_calls), the naive code above dispatches them serially. For independent tools (no shared state), dispatch them in parallel with concurrent.futures. The pattern is exactly the same as Workshop 4's parallel tool calls, just applied inside the streaming loop.
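A minimal sketch of the parallel version using a thread pool. It assumes the tools are independent and thread-safe; `dispatch_parallel` is a hypothetical helper that takes completed tool_use blocks (the same dicts built at content_block_stop) and returns tool_result blocks in the same order.

```python
from concurrent.futures import ThreadPoolExecutor


def dispatch_parallel(tool_blocks, tool_fns, max_workers=4):
    """Run every tool_use block in a thread pool; results keep input order."""

    def run_one(block):
        try:
            result = tool_fns[block["name"]](**block["input"])
            return {
                "type": "tool_result",
                "tool_use_id": block["id"],
                "content": str(result),
            }
        except Exception as exc:
            # Report the failure to the model instead of crashing the turn.
            return {
                "type": "tool_result",
                "tool_use_id": block["id"],
                "content": f"error: {exc}",
                "is_error": True,
            }

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() returns results in the order of the inputs,
        # regardless of which call finishes first.
        return list(pool.map(run_one, tool_blocks))
```

To keep the mid-stream win, you could instead submit each block to the pool at its content_block_stop and collect the futures at message_stop; the batch form above is the simpler starting point.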
5.4 The cancellation case
The user presses Ctrl-C. You want to: close the SSE connection cleanly (so you stop being billed for streamed tokens you're throwing away), abort any in-flight tool calls, and exit without poisoning the conversation. Wrap the loop in a try/except KeyboardInterrupt that closes the httpx response and exits. Production clients also send a cancelled notification through MCP for any in-flight MCP tool calls (Workshop 1's failure mode).
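The shape of that try/except can be sketched as follows. `consume_until_cancelled` is a hypothetical helper; it assumes the event source is a generator (as stream_events is), so calling close() on it unwinds the generator's with block and closes the underlying HTTP response.

```python
def consume_until_cancelled(events, on_text=print):
    """Consume (event_type, payload) pairs until message_stop or Ctrl-C.

    Returns (completed, text). completed is False when the turn was cut
    short, in which case the caller should not append it to messages.
    """
    text = []
    try:
        for event_type, payload in events:
            if (event_type == "content_block_delta"
                    and payload["delta"].get("type") == "text_delta"):
                chunk = payload["delta"]["text"]
                on_text(chunk)
                text.append(chunk)
            elif event_type == "message_stop":
                return True, "".join(text)
    except KeyboardInterrupt:
        return False, "".join(text)
    finally:
        # Closing a generator raises GeneratorExit at its current yield,
        # which runs its cleanup (for example, leaving a with block).
        close = getattr(events, "close", None)
        if close is not None:
            close()
    return False, "".join(text)
```

The partial text is returned so the UI can still show what arrived, but the conversation history stays untouched when completed is False.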
Step 6: production patterns
The four things real production streaming agents do that the workshop code doesn't:
- Backpressure to the UI. If your UI can't render tokens as fast as they arrive (rare for CLI, common for browser-rendered markdown with syntax highlighting), buffer with a bounded queue and drop or coalesce. The user can't read at 200 tokens/second anyway.
- Per-line markdown rendering. Many streamed responses are markdown. Rendering markdown incrementally as tokens arrive is tricky (you don't know whether ** is bold-start until you see the closer); the common pattern is to render finalized lines (anything followed by \n) and show in-progress lines as plain text.
- Cost and usage tracking. The message_start event reports input token usage, and the final message_delta event carries the cumulative output token count. Log both to your observability stack (Workshop 9) for cost attribution per request.
- Multi-step rendering UI. When tool calls happen mid-stream, the user wants to see them, not just the text. The Claude Code CLI shows "Reading file...", "Running tool...", "Writing..." as live status; the streaming events tell you exactly when to update each. Production UX puts work into the tool-call presentation, not just the text.
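The per-line rendering pattern from the list above reduces to a small buffer that separates finalized lines from the line still being written. This is a sketch with a hypothetical `LineBuffer` class; the actual markdown rendering of each finalized line is left to whatever renderer you use.

```python
class LineBuffer:
    """Split a stream of text chunks into finalized lines plus a pending tail."""

    def __init__(self):
        self._pending = ""

    @property
    def pending(self):
        """The in-progress line, to be shown as plain text."""
        return self._pending

    def feed(self, chunk):
        """Add a chunk; return the lines that were completed by it."""
        self._pending += chunk
        # Everything before the last newline is final; the rest is pending.
        *complete, self._pending = self._pending.split("\n")
        return complete

    def flush(self):
        """At end of turn, return whatever is left and clear the buffer."""
        rest, self._pending = self._pending, ""
        return rest


if __name__ == "__main__":
    buf = LineBuffer()
    for chunk in ["# Ti", "tle\n**bo", "ld** text\n", "tail"]:
        for line in buf.feed(chunk):
            print("FINAL:", line)
        print("pending:", repr(buf.pending))
    print("flushed:", repr(buf.flush()))
```

Multi-line constructs such as fenced code blocks still need state across lines, so a real renderer would track whether it is currently inside one.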
Step 7: SDK helpers vs raw streaming
The Anthropic Python SDK provides with client.messages.stream(...) as stream: which handles SSE parsing and accumulates content for you. The OpenAI SDK has a similar streaming helper. Both are fine for production; using them skips ~30 lines of boilerplate. The reason to learn the raw protocol (as you just did) is debugging: when something weird happens in production - a tool call that didn't fire, a text block that came back malformed - you need to be able to read the event stream and know what should have happened.
Production rubric: use the SDK helpers; understand what they do.
Now extend it
- Add OpenAI streaming. The wire format is similar but the field names differ (deltas come through choices[0].delta, tool calls stream as delta.tool_calls[i].function.arguments JSON fragments). Add a provider flag and route the parsing accordingly.
- Add streaming structured output. Combine with Workshop 6: the model's output is constrained to a schema, streaming arrives field-by-field. A "live extraction" UX is possible where extracted fields appear in a UI as they get committed.
- Add interruption. Let the user type while the model is streaming, signaling an interrupt that closes the current stream and starts a new turn with the user's new message prepended. The Claude Desktop and ChatGPT UIs do this; the implementation is fiddly but valuable.
- Add typing indicators. Show "Claude is thinking..." between the user's enter-keypress and the first text token. After the first token, the indicator goes away and tokens stream. Small UX detail, big perceived-quality win.
- Build a web UI. Wrap the streaming agent in a FastAPI endpoint that returns Server-Sent Events to the browser. The browser's EventSource API consumes them with a few lines of JavaScript. Now you have the ChatGPT/Claude UI - 200 lines of Python + 50 of JS.
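As a starting point for the first extension, here is a hedged sketch of accumulating OpenAI-style tool-call fragments. It assumes each streamed chunk is a dict with the fields named above (choices[0].delta.tool_calls[i] carrying an index, and function.name / function.arguments fragments); the sample chunks are hand-written for illustration, and `accumulate_openai_tool_calls` is a hypothetical helper, so check the field names against a real stream before relying on them.

```python
import json


def accumulate_openai_tool_calls(chunks):
    """Merge streamed tool-call fragments into complete calls, by index."""
    calls = {}  # index -> {"id", "name", "arguments" (JSON text so far)}
    for chunk in chunks:
        choices = chunk.get("choices") or []
        if not choices:
            continue
        delta = choices[0].get("delta") or {}
        for tc in delta.get("tool_calls") or []:
            slot = calls.setdefault(
                tc["index"], {"id": None, "name": None, "arguments": ""}
            )
            if tc.get("id"):
                slot["id"] = tc["id"]
            fn = tc.get("function") or {}
            if fn.get("name"):
                slot["name"] = fn["name"]
            slot["arguments"] += fn.get("arguments") or ""
    # Parse only after the stream has ended, when each JSON text is complete.
    return [
        {
            "id": c["id"],
            "name": c["name"],
            "input": json.loads(c["arguments"]) if c["arguments"] else {},
        }
        for _, c in sorted(calls.items())
    ]


if __name__ == "__main__":
    sample = [
        {"choices": [{"delta": {"tool_calls": [
            {"index": 0, "id": "call_demo",
             "function": {"name": "get_weather", "arguments": ""}}]}}]},
        {"choices": [{"delta": {"tool_calls": [
            {"index": 0, "function": {"arguments": '{"city": '}}]}}]},
        {"choices": [{"delta": {"tool_calls": [
            {"index": 0, "function": {"arguments": '"Lagos"}'}}]}}]},
        {"choices": [{"delta": {}, "finish_reason": "tool_calls"}]},
    ]
    print(accumulate_openai_tool_calls(sample))
```

The structural difference from the Anthropic stream is that there is no per-block stop event in this shape, so this sketch parses at end of stream rather than dispatching each tool as soon as its JSON completes.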
What you might wonder
"Why SSE and not WebSockets?" SSE is one-directional (server to client) and runs over plain HTTP, which means it works through every proxy, CDN, and load balancer without special config. WebSockets are bidirectional but require ws:// or wss:// support throughout the chain. For LLM streaming the data flow is one-way (user request, then long response stream), so SSE is the simpler and more reliable choice. Both Anthropic and OpenAI standardized on SSE.
"What happens to tool results during streaming?" Tool results don't stream - the tool runs synchronously inside your client, and the result is appended to messages as a complete blob for the next turn. Only the model's output streams. If your tool itself is slow (a 5-second database query), the user sees no model tokens during that 5 seconds; the streaming pauses. The mitigation is to use fast tools, parallel tool dispatch, or to return progress updates from the tool (MCP's progress notifications are designed for this).
"How do I show 'thinking' or chain-of-thought before the answer?" Two patterns. (1) System prompt that says "Think step by step in <thinking> tags, then write your final answer." The model emits the thinking content as regular text; you parse the tag in your renderer and show it in a different style. (2) Claude's extended-thinking models (Sonnet 4.5 Thinking, Opus thinking) have a separate thinking content block type that arrives as its own stream of deltas - render it as a "Reasoning..." panel that collapses when the final answer starts. The second is the cleaner production pattern.
"What's the latency budget for a 'good' streaming UX?" Time to first token (TTFT) under ~500ms feels instant. 500ms-2s feels deliberate but acceptable. Above 2s feels broken. Token rate after first token should be at least ~30 tokens/sec for plain text to feel like the model is "thinking"; 50+ is comfortable; 100+ feels fast. Modern hosted APIs (Anthropic, OpenAI) hit ~80-150 tokens/sec on flagship models, with TTFT around 250-500ms.
"How does this compose with multi-agent (Workshop 7)?" Each specialist's call can stream. The supervisor doesn't see the specialists' streaming directly - it gets the completed result. But you can show specialist progress to the user via a different mechanism: the supervisor emits status events ("Researching..." / "Writing..." / "Reviewing...") and the UI renders each as a progress card. The token-by-token stream is per-call; the workflow-level UX is separate.
What this gave you
- You parsed the Anthropic streaming SSE protocol by hand and saw the eight event types.
- You built a streaming text printer that shows the model thinking in real time.
- You built mid-stream tool dispatch - tools fire as their JSON completes, not at end-of-turn.
- You saw the full agent loop in streaming form (~80 lines) and connected it to the kernel from Workshop 4.
- You know the four production-breaking edge cases (interrupted stream, ping keepalives, concurrent tools, cancellation) and how to handle each.
- You understand the production rubric: use the SDK helpers but know what they do under the hood, because debugging requires reading the wire.
The bigger transfer: streaming is the default modality for AI UX, and the wire format has real structure. Once you've seen the event stream, the gap between "make it stream" and "make it stream correctly" is no longer a black box.
Next: Workshop 9 - Agent observability with OpenTelemetry, where every span of every agent run is queryable in a trace UI - cost, latency, and tool calls per request.
Submit your build
When you finish this workshop, share what you built so others can see and learn from your work. Include:
- Public repo with the raw-SSE streaming agent (no SDK)
- Terminal recording showing tokens streaming in real time with a mid-stream tool call
- Time-to-first-token measurement on your hardware against Claude Sonnet
- Short note (3 to 5 sentences) on which production edge case you found hardest to handle