Deep Dive 05-LLM Application Patterns

A self-contained reference for the patterns an applied AI engineer wires together every day: message lists, sampling parameters, structured outputs, tool use, streaming, prompt caching, retries, orchestration, and the production scaffolding that turns a clever prompt into a service. Every concept is derived from the underlying mechanism so you can reason about new SDK versions without re-reading their docs.


0. Orientation: what is an "LLM application," really?

Strip away the framework vocabulary and a "Large Language Model application" is a very small thing wearing a very large coat. At its core it is:

input  →  prompt  →  model_call(prompt, params)  →  output  →  parsed_result  →  side_effects

Each arrow is a place where things go wrong, where latency hides, where cost accumulates, and where you instrument. If you understand this lifecycle you understand 90% of what an applied AI engineer ships:

  1. input-usually a user message, sometimes a system event (a webhook, a cron, a row in a queue).
  2. prompt-a list of messages assembled from templates, retrieved context, conversation history, and tool definitions.
  3. model_call-an HTTP request to a provider (Anthropic, OpenAI, Google, a self-hosted vLLM instance) with sampling parameters.
  4. output-a stream of tokens, a finished string, or a structured tool-call object.
  5. parsed_result-a Python object you can act on: a Pydantic model, a function-call payload, a markdown chunk for a UI.
  6. side_effects-logs, traces, metrics, database writes, follow-up calls, eventual user-visible bytes.

Every pattern in this chapter is a refinement of one of those six steps. Keep that diagram in your head; it is the spine.

The two pieces of the lifecycle that newcomers consistently underweight:

  • Prompt construction is code. It is a deterministic function from (state, retrieval, tools, user_input) → messages. Test it, version it, log its outputs. If you cannot reproduce a prompt from a request ID, you cannot debug your application.
  • Output parsing is code. It is a function from (model_output, schema) → typed_result_or_error. It must be total: every model output must produce either a typed result or a typed failure. No except Exception: pass.

Everything below is in service of those two halves being well-behaved.


1. The message-list abstraction

Modern chat models do not accept "a prompt." They accept a list of messages, each tagged with a role:

messages = [
    {"role": "system",    "content": "You are a triage assistant."},
    {"role": "user",      "content": "Server foo is on fire."},
    {"role": "assistant", "content": "What does the dashboard show?"},
    {"role": "user",      "content": "CPU 100% for 12 minutes."},
]

Three roles, three semantic positions:

  • system-instructions to the model about how to behave. Persona, constraints, output format, refusal policy. The model treats this as the highest-priority context. Anthropic's API splits this off into a top-level system parameter rather than mixing it into messages; OpenAI keeps it inline as the first message. Functionally identical.
  • user-what the human (or upstream system) said.
  • assistant-what the model said in previous turns. You replay these to give the model conversation memory. The model itself is stateless between API calls; you are the conversation database.

The key mental model: a chat completion is a pure function. f(messages, params) → next_assistant_message. The "conversation" is an illusion you maintain by appending the response to the list and calling again. There is no session on the server.

This has consequences:

  1. Conversation state belongs to your application. You decide what to keep, summarize, or evict. Naive append-everything strategies blow through the context window and through your budget.
  2. You can edit history. You can rewrite the user's last message before sending. You can delete a turn that went badly. The model has no notion of "what really happened."
  3. You can fabricate assistant turns. Putting {"role": "assistant", "content": "Sure, here's the JSON:"} at the end of messages is a powerful steering technique-the model continues as if it had said that. (Anthropic supports this directly via "prefilling" the assistant turn; OpenAI is more restrictive.)
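
A minimal sketch of that append-and-call loop, including a prefilled assistant turn (point 3 above); client and MODEL are assumed to be the Anthropic client and model id used elsewhere in this chapter:

# You are the conversation database: append each reply, then call again.
messages = [
    {"role": "user", "content": "Summarize the incident as JSON."},
    {"role": "assistant", "content": "{"},   # prefill: the model continues from here
]
resp = client.messages.create(model=MODEL, max_tokens=256, messages=messages)

# The response contains only the continuation; stitch prefill + continuation back in.
messages[-1] = {"role": "assistant", "content": "{" + resp.content[0].text}
messages.append({"role": "user", "content": "Now add a needs_human field."})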

Turn-taking semantics

The wire protocol expects strict alternation: system?, user, assistant, user, assistant, ..., user. A trailing assistant message is "prefill"; an absent prefill means the model produces a fresh assistant turn. Two consecutive user messages are not portable across providers. If your conversation history accidentally has [user, user] because of a UI bug, concatenate them before sending; do not pray.

Multimodal content

Content is not just a string. It is a list of blocks:

{"role": "user", "content": [
    {"type": "text", "text": "What's in this image?"},
    {"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": "..."}},
]}

Tool-use messages are also blocks (tool_use, tool_result). The string-only form is shorthand for [{"type": "text", "text": "..."}]. As soon as you do anything beyond plain chat, treat content as a list.


2. Sampling parameters, derived from first principles

The model emits a probability distribution over its vocabulary at every step. The "sampling parameters" you set on every call are knobs on how a single token is drawn from that distribution. To use them well you have to know what they actually do.

2.1 Temperature

The model produces a vector of logits z ∈ R^V over the vocabulary of size V. The probability of token i is:

p_i = exp(z_i / T) / Σ_j exp(z_j / T)

That is the softmax, with T (temperature) dividing the logits before exponentiation. Examine the limits:

  • T → 0: the largest logit dominates so completely that p_argmax → 1. This is greedy decoding. Deterministic given the same logits, but logits themselves can vary across hardware/kernels/versions, so don't promise bit-exact reproducibility.
  • T = 1: the model's "natural" distribution.
  • T → ∞: all logits divided by a huge number become ~0, so probabilities flatten to uniform over the vocabulary. Pure noise.
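
A few lines of NumPy make the limits above concrete (the logits are illustrative, not from any real model):

import numpy as np

def sample_probs(logits: np.ndarray, temperature: float) -> np.ndarray:
    """Temperature-scaled softmax over a logit vector."""
    z = logits / max(temperature, 1e-8)   # approximate T -> 0 with a tiny floor
    z = z - z.max()                       # numerical stability
    p = np.exp(z)
    return p / p.sum()

logits = np.array([2.0, 1.0, 0.5, -1.0])
for T in (0.01, 0.7, 1.0, 5.0):
    print(T, np.round(sample_probs(logits, T), 3))
# T=0.01 -> ~[1, 0, 0, 0] (greedy); T=5.0 -> nearly uniform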

T = 0.7 is a folkloric default because it is the sweet spot empirically: enough variety to feel "human" and avoid loops, not enough to derail. For factual tasks, set T = 0. For creative tasks, T ∈ [0.7, 1.0]. For self-consistency voting (§10.4), you want variance, so T ∈ [0.5, 0.9].

Pitfall: "Temperature 0 = deterministic" is a half-truth. Floating-point non-associativity, batched-decoding kernels, MoE routing, and silent provider-side updates all introduce nondeterminism even at T=0. Treat T=0 as "as deterministic as we can get," not "reproducible build."

2.2 top_p (nucleus sampling)

After the softmax, sort tokens by probability descending and accumulate until the cumulative sum reaches p. Sample only from that prefix; renormalize.

sorted descending: [0.45, 0.20, 0.15, 0.08, ...]
cumulative:        [0.45, 0.65, 0.80, 0.88, ...]
top_p = 0.9 → keep first 5 or so until cumulative ≥ 0.9

top_p adapts to the shape of the distribution: in confident regions the nucleus is small (one or two tokens); in uncertain regions it is wide. This is usually what you want.
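
A sketch of the nucleus-selection step on an already-softmaxed probability vector (pure NumPy; not any provider's implementation):

import numpy as np

def nucleus(probs: np.ndarray, top_p: float) -> np.ndarray:
    """Keep the smallest high-probability prefix whose mass reaches top_p, renormalize."""
    order = np.argsort(probs)[::-1]                  # indices, highest probability first
    cum = np.cumsum(probs[order])
    cutoff = int(np.searchsorted(cum, top_p)) + 1    # first position where cumulative >= top_p
    cutoff = min(cutoff, len(probs))
    mask = np.zeros_like(probs, dtype=bool)
    mask[order[:cutoff]] = True
    out = np.where(mask, probs, 0.0)
    return out / out.sum()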

2.3 top_k

Keep only the top k logits, sample from those. k=40 is common in open-source models; many production APIs prefer top_p and either ignore or de-emphasize top_k.

Don't combine without thinking. temperature + top_p is the canonical pair. Adding top_k on top is rarely necessary and the interactions are non-obvious.

2.4 frequency_penalty / presence_penalty (OpenAI-family)

These adjust logits at sampling time based on what's already in the output:

  • frequency_penalty: subtract α · count(token) from each token's logit. Suppresses repetition of the same exact token.
  • presence_penalty: subtract α once if the token has appeared at all. Pushes toward novel vocabulary.

Use sparingly (0.1–0.3). Higher values produce hallucinated synonyms. Anthropic's API doesn't expose these; the model's training already mitigates degenerate loops.

2.5 max_tokens

A hard cap on output length. Always set it. It is your circuit breaker against runaway generations and runaway cost. Set it slightly above your expected output, not "as high as the model allows."

2.6 seed

OpenAI and some others accept a seed integer plus T=0 for "best-effort" reproducibility. Caveats from §2.1 apply. The response includes a system_fingerprint; if it changes, all bets are off.

2.7 stop sequences

A list of strings; generation halts when any is produced. Useful for delimited outputs ("stop at </answer>") and for early termination of streaming structured outputs.


3. Structured outputs: getting machine-readable answers

The single most common production need: you want a Python object out, not prose. There is a hierarchy of techniques, each more reliable than the last.

3.1 Why "respond in JSON" alone fails

The naive prompt:

"Reply with JSON like {\"sentiment\": \"positive|negative\"}. Output nothing else."

fails in production for predictable reasons:

  • The model wraps it in markdown: ```json\n{...}\n```.
  • The model adds a friendly preamble: "Sure! Here's the JSON: ...".
  • The model emits trailing commas, single quotes, unquoted keys, comments.
  • The model truncates at max_tokens mid-object.
  • A user input contains adversarial text that nudges the model into a chat reply.

You can defend with regexes ("extract first {...} block"), but you are now writing a JSON parser inside a regex inside a string-extraction heuristic inside an HTTP handler. Don't.

3.2 JSON mode

Most providers expose a flag (response_format={"type": "json_object"} on OpenAI, similar on others) that constrains decoding so the output is guaranteed-valid JSON. Implementation: the inference engine masks tokens that would make the partial output invalid JSON.
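
For concreteness, an OpenAI-style JSON-mode call looks roughly like this (the API requires the word "JSON" to appear somewhere in the prompt; the model id is illustrative):

import json
from openai import OpenAI

oai = OpenAI()
resp = oai.chat.completions.create(
    model="gpt-4o-mini",                               # illustrative model id
    response_format={"type": "json_object"},           # syntactic guarantee only
    messages=[
        {"role": "system", "content": "Reply in JSON with keys sentiment and confidence."},
        {"role": "user", "content": "The deploy went smoothly, thanks!"},
    ],
)
data = json.loads(resp.choices[0].message.content)     # valid JSON; the schema is still on you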

This solves syntactic validity. It does not solve schema validity-you can still get {"sentimnt": "ok"} (typo, wrong enum). For that you need:

3.3 Tool use / function calling for structured output

The most reliable path. You declare a "tool" whose input schema is the structure you want. The model emits a structured tool_use payload validated against that schema. You don't even have to execute a real function-the tool is just the schema-shaped exit door.

import anthropic
client = anthropic.Anthropic()

triage_tool = {
    "name": "submit_triage",
    "description": "Submit the structured triage result.",
    "input_schema": {
        "type": "object",
        "properties": {
            "severity": {"type": "string", "enum": ["sev1", "sev2", "sev3", "sev4"]},
            "service":  {"type": "string"},
            "summary":  {"type": "string", "maxLength": 200},
            "needs_human": {"type": "boolean"},
        },
        "required": ["severity", "service", "summary", "needs_human"],
    },
}

resp = client.messages.create(
    model="claude-3-7-sonnet-latest",  # use the current production model id
    max_tokens=512,
    tools=[triage_tool],
    tool_choice={"type": "tool", "name": "submit_triage"},  # forces this tool
    messages=[{"role": "user", "content": "Server foo CPU 100% for 12 minutes, customers seeing 503s."}],
)

triage_block = next(b for b in resp.content if b.type == "tool_use")
data = triage_block.input  # a Python dict shaped by the schema; still validate it before trusting it (see §3.4)

tool_choice forced to a specific tool means the model must emit that tool's payload. This is the cleanest "structured output" pattern in Anthropic's API. OpenAI's equivalent is tool_choice={"type": "function", "function": {"name": "submit_triage"}} plus a tools array.

3.4 Pydantic + instructor / outlines / lm-format-enforcer

For maximum ergonomics, layer a library:

from pydantic import BaseModel, Field
from typing import Literal
import instructor
from anthropic import Anthropic

class IncidentTriage(BaseModel):
    severity: Literal["sev1", "sev2", "sev3", "sev4"]
    service: str = Field(description="Affected service name")
    summary: str = Field(max_length=200)
    needs_human: bool

client = instructor.from_anthropic(Anthropic())

triage = client.messages.create(
    model="claude-3-7-sonnet-latest",
    max_tokens=512,
    response_model=IncidentTriage,           # the magic: a Pydantic class
    max_retries=2,                           # auto-retry on validation failure
    messages=[{"role": "user", "content": "Server foo CPU 100% ..."}],
)

assert isinstance(triage, IncidentTriage)
print(triage.severity, triage.service)

What instructor does under the hood:

  1. Converts the Pydantic model to a JSON Schema.
  2. Registers it as a tool (or uses JSON mode).
  3. Calls the model.
  4. Parses output into the Pydantic class.
  5. On ValidationError, re-prompts the model with the validation error and retries up to max_retries.

That last step is crucial. It turns the model into a self-correcting structured-output engine. The retry message looks like: "Your previous response failed validation: summary must be ≤ 200 chars. Please correct."

Other libraries in this space:

  • outlines-does structure-aware token sampling against a regex/JSON-Schema/CFG. Best for self-hosted inference.
  • lm-format-enforcer-similar; integrates with vLLM/transformers.
  • jsonformer-older, narrower scope.

3.5 Pydantic vs JSON Schema vs Zod-equivalent

  • Pydantic is your Python source of truth. Define schemas as Pydantic classes; auto-generate JSON Schema from them via Model.model_json_schema().
  • JSON Schema is the wire format the model accepts. Treat it as a derived artifact.
  • Zod (TS), valibot (TS), attrs (Python)-equivalent libraries. Pick one per language and stick to it; do not maintain parallel handwritten JSON Schemas alongside Pydantic models. They will drift.

3.6 Pitfalls

  • Optional fields require explicit handling. If a field can legitimately be missing, use Optional[...] = None and tell the model "omit if not applicable" in the description. Models otherwise hallucinate plausible nulls.
  • Enums beat free strings. Literal["sev1","sev2","sev3","sev4"] is far more reliable than "free string severity."
  • Descriptions matter. The Pydantic Field(description=...) is rendered into the JSON Schema and read by the model. Treat it as prompt text.
  • Don't over-nest. Flat schemas with a few fields beat deeply nested unions. Models occasionally lose track of which sub-object they're filling.

4. Tool use protocol-the deep version

Tool use (a.k.a. function calling) is the same protocol as structured outputs but with real side effects: the model decides "I need to run a tool," your code runs it, you feed the result back, the model continues.

4.1 Tool definition

A tool is three things:

{
    "name": "search_runbooks",
    "description": "Search the internal runbook corpus by free-text query. "
                   "Returns up to 5 runbook titles and URLs. "
                   "Use this when the user mentions an alert name, error code, or incident pattern.",
    "input_schema": {
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "Free-text search query."},
            "max_results": {"type": "integer", "minimum": 1, "maximum": 10, "default": 5},
        },
        "required": ["query"],
    },
}

The description is read by the model. It is the most under-invested-in surface in tool-use systems. Treat it as prompt: include when to use the tool, when not to, what it returns, what its limits are. A bad description is worse than no description because the model will use the tool wrongly.

4.2 The call/response cycle

The protocol (Anthropic flavor; OpenAI is structurally identical, syntactically different):

[user turn] -----------------------------------> model
                                                  |
[assistant turn with tool_use block] <------------+

{ id: tool_use_01, name: search_runbooks, input: {query: "503 spikes"} }

run the tool locally → get result

[user turn with tool_result block] ------------> model
                                                  |
[assistant turn with text answer] <---------------+

The minimal loop:

import json

messages = [{"role": "user", "content": "Why is checkout returning 503s?"}]

while True:
    resp = client.messages.create(
        model=MODEL,
        max_tokens=1024,
        tools=TOOLS,
        messages=messages,
    )
    messages.append({"role": "assistant", "content": resp.content})

    if resp.stop_reason != "tool_use":
        break  # final answer

    tool_results = []
    for block in resp.content:
        if block.type == "tool_use":
            try:
                result = TOOL_DISPATCH[block.name](**block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": json.dumps(result),
                })
            except Exception as e:
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": f"Tool error: {e}",
                    "is_error": True,
                })
    messages.append({"role": "user", "content": tool_results})

print(resp.content[-1].text)

Five things to notice:

  1. You must echo the assistant's tool_use content back in the next call by appending the whole resp.content. The model uses its own previous tool-use block to thread the conversation.
  2. tool_use_id matches tool_result.tool_use_id so the model can correlate when there are multiple parallel calls.
  3. Tool results are sent as a user turn containing tool_result blocks. (OpenAI uses a separate role: "tool". The semantics are the same.)
  4. stop_reason == "tool_use" is your loop continuation condition. Other stop reasons (end_turn, max_tokens) terminate.
  5. Errors are first-class. Always set is_error=True and put a short, descriptive error in content. Don't crash the loop.

4.3 Anthropic vs OpenAI: subtle differences

  • Message structure. Anthropic: tool results in a user message with tool_result blocks. OpenAI: tool results in a separate tool role message keyed by tool_call_id.
  • System prompt placement. Anthropic: top-level system= param. OpenAI: first message with role: "system".
  • Parallel tool calls. Both support multiple tool_use/tool_call blocks in one assistant turn. OpenAI exposes parallel_tool_calls=True/False to opt out.
  • Tool choice control. Both support auto, any/required, and forcing a specific tool. Names differ slightly.
  • Streaming + tools. Both stream tool-call arguments incrementally as JSON deltas (see §5.5). Parsing partial JSON is your responsibility (or your library's).

A prudent rule: if you want portability, put your tool-loop logic behind a thin adapter and use LiteLLM (§7) so the wire-format diffs don't leak.

4.4 Multi-tool dispatch: parallel vs sequential

When the model emits multiple tool_use blocks in one turn, run them in parallel unless they have dependencies:

import asyncio

async def run_tool(block):
    fn = TOOL_DISPATCH[block.name]
    return await fn(**block.input) if asyncio.iscoroutinefunction(fn) else fn(**block.input)

tool_use_blocks = [b for b in resp.content if b.type == "tool_use"]
results = await asyncio.gather(*(run_tool(b) for b in tool_use_blocks))

Sequential dispatch when each tool depends on a previous result is the model's job to orchestrate: the model emits one tool, sees the result, then emits the next. Don't try to be clever and re-order what the model emitted.

4.5 Tool-result formatting

Return structured results when possible. JSON beats prose:

Bad:  "Found 3 runbooks. The first is..."
Good: {"results":[{"title":"503 spikes runbook","url":"..."},...]}

The model parses JSON more reliably than prose because that's what its tool-result tokens were trained against. Add a summary field if you want a human-readable hint.

If the result is large (>5–10 KB), summarize before returning. Sending 50 KB of search results back through the model burns tokens and dilutes attention.

4.6 Common failure modes

  • Hallucinated tool calls-the model invokes a tool name not in tools. Defense: validate block.name in TOOL_DISPATCH; return an is_error=True result like "Unknown tool foo. Available tools: ...".
  • Wrong-schema arguments-missing required field, wrong type. Defense: validate against the JSON Schema (Pydantic again) before executing; on failure, return error.
  • Tool-call loops-the model calls the same tool with the same args over and over. Defense: per-conversation tool-call counter; cap at e.g. 10 calls per turn; if exceeded, force a final answer.
  • Refusal to use tools-the model answers from memory instead of calling the tool. Defense: stronger description ("You MUST call search_runbooks for any incident question."), or tool_choice="any" for that turn.
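
Those defenses compose into a small guard in front of dispatch. A sketch, where TOOL_DISPATCH maps tool names to callables (as in §4.2) and TOOL_ARG_MODELS is an assumed dict of per-tool Pydantic models:

import json
from pydantic import ValidationError

MAX_TOOL_CALLS_PER_TURN = 10

def guarded_dispatch(block, calls_so_far: int) -> dict:
    """Validate tool name, argument schema, and loop budget before executing a tool_use block."""
    def err(msg: str) -> dict:
        return {"type": "tool_result", "tool_use_id": block.id, "content": msg, "is_error": True}

    if calls_so_far >= MAX_TOOL_CALLS_PER_TURN:
        return err("Tool-call budget exceeded; answer with the information you already have.")
    if block.name not in TOOL_DISPATCH:
        return err(f"Unknown tool {block.name}. Available tools: {sorted(TOOL_DISPATCH)}")
    try:
        args = TOOL_ARG_MODELS[block.name].model_validate(block.input)
    except ValidationError as e:
        return err(f"Invalid arguments: {e}")
    result = TOOL_DISPATCH[block.name](**args.model_dump())
    return {"type": "tool_result", "tool_use_id": block.id, "content": json.dumps(result)}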

5. Streaming

5.1 Why stream

Two latency numbers matter:

  • TTFT-time-to-first-token. How long until something appears.
  • Total latency-time until the response is complete.

Without streaming, the user sees nothing for the full response time (often several seconds for long outputs). With streaming, TTFT is on the order of a few hundred milliseconds and the user sees progressive output. Subjective speed improves dramatically even if total latency is unchanged.

Streaming is also necessary for:

  • Cancellation (stop generating when the user navigates away).
  • Real-time UI rendering (typewriter effect, live markdown, charts).
  • Partial parsing of structured output for live UI (see §5.5).

5.2 SSE under the hood

The wire format is Server-Sent Events (SSE): a long-lived HTTP response with Content-Type: text/event-stream, where the body is a sequence of event: and data: lines separated by blank lines. A single event looks like:

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

Each line beginning with data: carries a JSON payload. The connection stays open until the server emits a terminal event (message_stop) or the client closes. SSE is one-way (server → client); bidirectional protocols like WebSockets exist (and some realtime/voice APIs use them), but the mainstream chat-completion APIs stream over SSE.
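
If you ever need to consume SSE without an SDK, the parsing loop is short. A sketch with httpx; the [DONE] sentinel is OpenAI-style, Anthropic signals the end with a message_stop event instead:

import json
import httpx

def sse_events(url: str, headers: dict, payload: dict):
    """Yield parsed `data:` payloads from a streaming POST; `event:` lines are ignored here."""
    with httpx.stream("POST", url, headers=headers, json=payload, timeout=None) as r:
        for line in r.iter_lines():
            if line.startswith("data: "):
                data = line[len("data: "):]
                if data == "[DONE]":
                    break
                yield json.loads(data)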

5.3 Consuming a stream in Python (sync)

with client.messages.stream(
    model=MODEL,
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about pagers."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    final = stream.get_final_message()
print(f"\n[stop_reason={final.stop_reason}, in={final.usage.input_tokens}, out={final.usage.output_tokens}]")

text_stream yields just the text deltas. stream itself yields the structured events if you need them (block starts, tool-call deltas, usage updates).

5.4 Async generators

import anthropic
client = anthropic.AsyncAnthropic()

from starlette.websockets import WebSocketState   # assuming a FastAPI/Starlette WebSocket

async def stream_to_websocket(ws, prompt: str):
    async with client.messages.stream(
        model=MODEL, max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text in stream.text_stream:
            if ws.client_state is not WebSocketState.CONNECTED:
                await stream.close()       # stop generating server-side
                return
            await ws.send_text(text)

The cancellation point matters: closing the stream signals the provider to stop generating, which usually stops you being billed for unused output tokens. If you only break out of the loop without closing, the connection may complete in the background.

5.5 Streaming + structured output: the partial-JSON problem

If the model is emitting JSON via tool use, the stream gives you JSON deltas:

{"name":
{"name": "submit_triage",
{"name": "submit_triage", "input": {"sev
{"name": "submit_triage", "input": {"severity": "sev2"
...

Naive json.loads of every delta fails for ~99% of intermediate states. Three options:

  1. Buffer until done. Simplest; gives up the streaming UX.
  2. Stream-parse with a tolerant parser. Libraries like partial-json-parser or instructor's Partial[Model] produce IncidentTriage(severity="sev2", service=None, ...) from in-progress JSON.
  3. Stream events. Provider SDKs emit input_json_delta events; concatenate their partial_json strings yourself and feed to (2).

For UI-level "fill in fields as they arrive," (2) is the standard.
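
A sketch of option (3): accumulate input_json_delta fragments from the raw stream events (event shapes per Anthropic's streaming docs; triage_tool is the schema from §3.3, and the tolerant-parse step is left to whichever partial-JSON library you adopt):

buf = ""
with client.messages.stream(
    model=MODEL, max_tokens=512,
    tools=[triage_tool],
    tool_choice={"type": "tool", "name": "submit_triage"},
    messages=[{"role": "user", "content": report}],
) as stream:
    for event in stream:
        if event.type == "content_block_delta" and event.delta.type == "input_json_delta":
            buf += event.delta.partial_json        # grows toward valid JSON
            # hand `buf` to a tolerant parser here to update the UI field-by-field
    final = stream.get_final_message()             # complete, parseable tool input at the end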

5.6 Streaming pitfalls

  • Buffering proxies kill SSE. nginx, CDN layers, ALBs may buffer the whole response. Set X-Accel-Buffering: no; configure your reverse proxy to flush.
  • Heartbeats. Long pauses (the model thinks for 30s on a hard prompt) may trigger idle timeouts on your own SSE hop to the browser or proxy. Send a comment line (: ping) every 15s.
  • Error in the middle of a stream. SSE has no native error frame. Provider SDKs surface mid-stream errors as exceptions; bubble them up and tell the UI.
  • Token counting from streams. Final usage numbers arrive in the message_delta / message_stop event. Don't try to count by summing deltas.

6. Prompt caching

A specific Anthropic feature (with growing analogues elsewhere) that radically changes economics for apps with large stable prefixes.

6.1 The mechanism

You add cache_control markers to message blocks that you want cached. The first call writes the cache; subsequent calls within a TTL read from it, paying a fraction of the input-token cost for the cached portion. Indicative pricing (verify current):

  • Cache write: ~1.25× the base input price for those tokens (you pay extra to store).
  • Cache read: ~0.10× the base input price for those tokens (huge discount).
  • TTL: 5 minutes (default) or 1 hour, depending on the marker variant.

So for a 20 KB system prompt re-used across 1000 calls/day, you write the cache once every 5 min (~12/hour) and read it the rest of the time. Net cost on the cached prefix drops by ~85–90%.

6.2 Where caching wins

  • Large stable system prompts (style guides, persona, policies-kilobytes of prose).
  • Tool definitions (large tools arrays-caching them avoids re-tokenizing).
  • Stable RAG context with a long-lived document (e.g. a customer agreement re-referenced across a session).
  • Few-shot example libraries that don't change between requests.

6.3 Where caching does not help

  • Tail-of-prompt content (the user's latest question). The cache is a prefix cache: only contiguous prefixes match. The user input must come after the cache breakpoint.
  • Highly variable content. A new system prompt per user means you write but never read.
  • Short prompts. Below a few thousand tokens, the write premium isn't recouped.

6.4 Worked example

resp = client.messages.create(
    model=MODEL,
    max_tokens=512,
    system=[
        {"type": "text", "text": LONG_STYLE_GUIDE,
         "cache_control": {"type": "ephemeral"}},   # cache breakpoint
    ],
    tools=[
        {**TOOL_DEF, "cache_control": {"type": "ephemeral"}},  # cache the tool block too
    ],
    messages=[
        {"role": "user", "content": user_question},   # NOT cached-the tail
    ],
)
print(resp.usage)
# usage.cache_creation_input_tokens / cache_read_input_tokens / input_tokens / output_tokens

The usage block reports the split: how many tokens were cache-writes, how many were cache-reads, how many were uncached input. Your cost dashboards must consume those four numbers, not just one "input tokens."

6.5 Cache invalidation discipline

The cache key is essentially the byte content of the prefix. Any change-even whitespace-invalidates. Therefore:

  • Pin your prefix. Don't include timestamps, request IDs, or random ordering of tools in the cached portion.
  • Order matters. Sort tool arrays deterministically.
  • Stable serialization. If you JSON-encode something inside the prefix, use sorted keys.

The TTL refreshes on each read; a hot prompt stays warm.
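
One habit that enforces the ordering and serialization bullets above, sketched:

import json

def render_context_block(doc: dict) -> str:
    """Serialize retrieved context deterministically so the cached prefix's bytes never drift."""
    return json.dumps(doc, sort_keys=True, ensure_ascii=False, separators=(",", ":"))

def stable_tools(tools: list[dict]) -> list[dict]:
    """Deterministic tool ordering: the cache key is the literal byte prefix."""
    return sorted(tools, key=lambda t: t["name"])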

6.6 Estimating savings

calls_per_day      = 1000
cached_tokens      = 5000
uncached_tokens    = 200
output_tokens      = 300

# illustrative prices, USD per 1M tokens (verify current):
P_in   = 3.00
P_out  = 15.00
P_w    = P_in * 1.25
P_r    = P_in * 0.10

writes_per_day = (24 * 60) // 5   # conservative: one cache write per 5-minute TTL window
no_cache_cost = calls_per_day * (cached_tokens + uncached_tokens) * P_in / 1e6 \
              + calls_per_day * output_tokens * P_out / 1e6
cache_cost    = writes_per_day * cached_tokens * P_w / 1e6 \
              + (calls_per_day - writes_per_day) * cached_tokens * P_r / 1e6 \
              + calls_per_day * uncached_tokens * P_in / 1e6 \
              + calls_per_day * output_tokens   * P_out / 1e6
print(f"savings: {(1 - cache_cost / no_cache_cost) * 100:.1f}%")

7. Provider abstraction: LiteLLM and friends

You will work with multiple providers-to compare, to fail over, to use the right model for the job. Two strategies:

7.1 LiteLLM

LiteLLM exposes one OpenAI-shaped interface across 100+ providers. Use it when:

  • You want portability across providers (Anthropic, OpenAI, Bedrock, Vertex, Azure, Cohere, Together, Groq, self-hosted) without rewriting.
  • You want a single billing/observability layer (LiteLLM Proxy: a self-hosted gateway that adds auth, rate-limiting, cost tracking, fallback policies).
  • Your features use only the lowest-common-denominator API surface.

from litellm import completion, acompletion

resp = completion(
    model="anthropic/claude-3-7-sonnet-latest",
    messages=[{"role": "user", "content": "Hi"}],
)
# Same call, same response shape, swap the model id:
resp = completion(model="openai/gpt-4o-mini", messages=[...])

The LiteLLM Proxy is the higher-leverage form: deploy it as a service, your apps speak OpenAI to it, the proxy handles routing, fallbacks, and key management. Entire teams operate this way.

7.2 Native SDKs

Use anthropic / openai / google-generativeai directly when:

  • You need bleeding-edge features (a new tool-use mode, prompt caching, batch API, file inputs) that LiteLLM hasn't lifted yet.
  • You need the strongest typing (LiteLLM is OpenAI-shaped; non-OpenAI features are awkward).
  • Your hot path benefits from one less hop / one less serialization round-trip.

A common architecture: native SDKs for the core call site (best types, latest features), with a thin in-house "provider router" that knows when to switch. Use LiteLLM Proxy only if you need a gateway (auth, multi-tenant policy enforcement) more than a library.

7.3 Failover patterns

Primary/secondary with a circuit breaker (deeper version in §9.5):

class FailoverClient:
    def __init__(self, primary, secondary):
        self.primary = primary
        self.secondary = secondary
        self.primary_breaker = CircuitBreaker(threshold=5, reset_seconds=30)

    async def complete(self, **kwargs):
        if self.primary_breaker.closed:
            try:
                return await self.primary.complete(**kwargs)
            except (RateLimitError, ServiceUnavailableError) as e:
                self.primary_breaker.record_failure()
                logger.warning("primary failed: %s; falling over", e)
        return await self.secondary.complete(**kwargs)

Note the failover is on infrastructure errors (429, 5xx), not on content errors (the model said something you didn't like). The latter is not a failure; running the same prompt against a different model won't fix a logic bug.


8. Cost calculation: making it visible

LLM cost is a step function of decisions you make at request construction. You cannot improve it if you don't measure it.

8.1 Token counting

Tokens are model-specific. There is no universal tokenizer.

  • OpenAI: tiktoken library, encoder per model (e.g. o200k_base for GPT-4o family).
    import tiktoken
    enc = tiktoken.encoding_for_model("gpt-4o")
    n = len(enc.encode("the quick brown fox"))
    
  • Anthropic: a client.messages.count_tokens(...) API endpoint. The tokenizer for recent Claude models isn't published, so count server-side via the API rather than locally.
  • Open models (Llama, Mistral, Qwen): transformers.AutoTokenizer.from_pretrained(model_id).

A character-based approximation (tokens ≈ chars / 4 for English) is fine for back-of-envelope but not for billing. For invoiced/charged-back costs, count exactly.
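
For an exact server-side count on the Anthropic side, the count-tokens endpoint takes the same shape as a messages call (a sketch; verify the method and field names against your SDK version):

import anthropic

client = anthropic.Anthropic()
count = client.messages.count_tokens(
    model="claude-3-7-sonnet-latest",
    system="You are a triage assistant.",
    messages=[{"role": "user", "content": "Server foo CPU 100% for 12 minutes."}],
)
print(count.input_tokens)   # tokens this prompt would consume as input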

8.2 Per-call cost

cost_usd = (input_tokens   * price_in_per_1m  / 1_000_000)
         + (output_tokens  * price_out_per_1m / 1_000_000)

With prompt caching:

cost_usd = (cache_write_tokens * price_in_per_1m * 1.25 / 1e6)
         + (cache_read_tokens  * price_in_per_1m * 0.10 / 1e6)
         + (uncached_in_tokens * price_in_per_1m         / 1e6)
         + (output_tokens      * price_out_per_1m        / 1e6)

Persist all four token counts per call. If you only log "input + output" you cannot tell whether caching is working.

8.3 Per-feature, per-tenant cost

cost_per_feature = cost_per_call * calls_per_feature_invocation
cost_per_user    = cost_per_feature * features_invoked_per_session * sessions_per_month

Tag every call with (feature_id, tenant_id, user_id, request_id). Aggregate in your warehouse. The dashboards that matter:

  • Cost per conversation (p50, p95, p99). The p99 is where surprises live.
  • Cost per tenant-for multi-tenant SaaS, the basis of pricing.
  • Hot prompts-top 10 prompts by total spend. Almost always one of them is recoverable via caching or prompt slimming.
  • Tokens-per-tool-call distribution-fat tool results burn input tokens on the next call. Catch them.

8.4 The cost ledger pattern

Wrap every model call with a logging/metrics emitter:

async def tracked_call(provider_call, *, feature, tenant, **kwargs):
    t0 = time.monotonic()
    resp = await provider_call(**kwargs)
    dt = time.monotonic() - t0
    in_tok  = resp.usage.input_tokens
    out_tok = resp.usage.output_tokens
    cost = (in_tok * PRICE[kwargs["model"]]["in"] + out_tok * PRICE[kwargs["model"]]["out"]) / 1e6
    metrics.observe("llm.latency_s", dt, tags={"feature": feature, "model": kwargs["model"]})
    metrics.increment("llm.cost_usd", cost, tags={"feature": feature, "tenant": tenant})
    cost_ledger.write({
        "ts": time.time(), "feature": feature, "tenant": tenant,
        "model": kwargs["model"], "in": in_tok, "out": out_tok,
        "cost_usd": cost, "latency_s": dt,
    })
    return resp

Treat the ledger as a first-class table, not a log. You will join it to product analytics.


9. Retry, backoff, rate limits

LLM APIs fail, throttle, and degrade. A production client retries the right things and gives up on the rest.

9.1 Status code taxonomy

  • 429 Too Many Requests / RateLimitError-you exceeded RPM, TPM, or concurrent-request limits. Retry with backoff.
  • 500 / 502 / 503 / 504-provider-side. Retry a few times with backoff.
  • 408 / 504 / read timeout-network hiccup. Retry, but with an idempotency safeguard (see §14.1).
  • 400 / 422 / context-length-exceeded-your bug. Do not retry. Fix the request.
  • 401 / 403-auth. Don't retry; alert.
  • content_policy / safety errors-model refused / content was filtered. Don't retry the same prompt; rewrite or surface to user.

9.2 Exponential backoff with jitter

The textbook formula:

delay_seconds = min(cap, base * 2 ** attempt) + random.uniform(0, jitter)

  • base = 1.0, cap = 60.0, jitter = base is a sensible default.
  • The jitter is critical: without it, every client retries in lockstep and the thundering herd takes down the recovering service.
  • Variants: "full jitter" (random.uniform(0, base * 2 ** attempt)) is even better at avoiding bursts.

import random, asyncio

async def with_backoff(fn, *, retries=5, base=1.0, cap=60.0):
    for attempt in range(retries):
        try:
            return await fn()
        except (RateLimitError, APITimeoutError, InternalServerError) as e:
            if attempt == retries - 1:
                raise
            delay = min(cap, base * (2 ** attempt)) + random.uniform(0, base)
            logger.warning("attempt %d failed (%s); sleeping %.2fs", attempt + 1, e, delay)
            await asyncio.sleep(delay)

9.3 Honor Retry-After

If the response carries a Retry-After header, use that delay, not your formula. The provider knows when its quota window resets.
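
With the httpx-based SDK errors (Anthropic, OpenAI), the header is reachable from the error's response object. A sketch that prefers the server's hint over the computed delay; the .response attribute and numeric-only Retry-After handling are assumptions to verify against your SDK:

import random

def retry_delay(exc, attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Prefer the server's Retry-After hint; fall back to exponential backoff with jitter."""
    response = getattr(exc, "response", None)          # status errors usually carry the httpx response
    if response is not None:
        header = response.headers.get("retry-after")
        if header and header.isdigit():                # ignores the HTTP-date variant for brevity
            return min(cap, float(header))
    return min(cap, base * (2 ** attempt)) + random.uniform(0, base)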

9.4 Tenacity / backoff libraries

Don't hand-roll if you don't have to:

from tenacity import retry, stop_after_attempt, wait_exponential_jitter, retry_if_exception_type

@retry(
    retry=retry_if_exception_type((RateLimitError, APITimeoutError, InternalServerError)),
    wait=wait_exponential_jitter(initial=1, max=60),
    stop=stop_after_attempt(5),
    reraise=True,
)
async def call_model(**kwargs):
    return await client.messages.create(**kwargs)

9.5 Circuit breakers

If the provider is down, retries waste latency for every user. After N consecutive failures, open the breaker: fail fast for some cool-down period, then half-open (let one probe through), then close if the probe succeeds.

from enum import Enum
import time

class State(Enum):
    CLOSED = "closed"; OPEN = "open"; HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(self, threshold=5, reset_seconds=30):
        self.threshold = threshold
        self.reset_seconds = reset_seconds
        self.failures = 0
        self.state = State.CLOSED
        self.opened_at = 0.0

    def before_call(self):
        if self.state is State.OPEN:
            if time.monotonic() - self.opened_at >= self.reset_seconds:
                self.state = State.HALF_OPEN
            else:
                raise CircuitOpenError("breaker open")

    def record_success(self):
        self.failures = 0
        self.state = State.CLOSED

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.threshold:
            self.state = State.OPEN
            self.opened_at = time.monotonic()

Wrap the provider call:

async def safe_call(**kwargs):
    breaker.before_call()
    try:
        resp = await call_model(**kwargs)
    except (InternalServerError, APITimeoutError):
        breaker.record_failure()
        raise
    breaker.record_success()
    return resp

Combine with failover (§7.3): primary's breaker open → use secondary.


10. Multi-call orchestration patterns

Real applications make multiple LLM calls per user request. The shapes are limited; learn them once.

10.1 Sequential chain

extract → classify → respond

Each step is a separate prompt; the output of step N is input to step N+1. Use when steps are heterogeneous (different models, different system prompts, different tools).

async def triage_pipeline(report: str) -> str:
    extracted  = await extract(report)         # entities + facts
    classified = await classify(extracted)     # severity + service
    answer     = await respond(report, extracted, classified)
    return answer

Latency is the sum; cost is the sum. Don't chain when one prompt would do.

10.2 Map-reduce

Process N independent chunks in parallel, then combine.

async def summarize_long_doc(chunks: list[str]) -> str:
    partials = await asyncio.gather(*(summarize(c) for c in chunks))
    return await combine(partials)

Use for: long-document summarization, bulk classification, multi-source RAG, parallel evals.

10.3 Branch-then-merge

Classify first; route to specialized prompts; merge results.

async def respond(query: str) -> str:
    intent = await classify_intent(query)
    if intent == "billing":
        return await billing_agent(query)
    elif intent == "technical":
        return await technical_agent(query)
    else:
        return await general_agent(query)

The router is usually a small/fast/cheap model; the specialists may be larger.

10.4 Self-consistency

Same prompt N times at non-zero temperature; majority-vote the structured answer. Reduces variance on hard reasoning tasks.

from collections import Counter

async def self_consistent_triage(report: str, n: int = 5) -> IncidentTriage:
    samples = await asyncio.gather(*(triage(report, temperature=0.7) for _ in range(n)))
    severities = Counter(s.severity for s in samples)
    chosen, _ = severities.most_common(1)[0]
    # pick a representative sample at the winning severity
    return next(s for s in samples if s.severity == chosen)

Costs N× a single call. Use for high-stakes classifications, not for chat.

10.5 Self-critique

Ask the model to critique its own draft, then revise.

draft    = generate(prompt)
feedback = critique(prompt, draft)   # don't rebind the function name
final    = revise(prompt, draft, feedback)

Empirically helps for structured tasks (code, plans, JSON correctness) more than for prose. Costs ~3× a single call.

10.6 Async parallel calls

The general workhorse:

results = await asyncio.gather(*(call(x) for x in inputs), return_exceptions=True)
for r in results:
    if isinstance(r, Exception):
        ...  # bounded fault tolerance

A semaphore prevents your own client from triggering provider rate limits:

sem = asyncio.Semaphore(10)
async def bounded(x):
    async with sem:
        return await call(x)
results = await asyncio.gather(*(bounded(x) for x in inputs))

11. Few-shot prompting (still useful in 2026)

Reasoning models lessened the need for elaborate few-shot, but for structured tasks with idiosyncratic conventions few-shot remains the highest-leverage prompt technique.

11.1 Structure

system: <role + format spec>
user:   <example 1 input>
assistant: <example 1 output>
user:   <example 2 input>
assistant: <example 2 output>
...
user:   <real input>

The model treats the trail of user/assistant pairs as "this is what good looks like."

11.2 Count and ordering

  • 3–5 examples is the sweet spot for most tasks; more rarely helps and burns tokens.
  • Diversity beats quantity. Cover the failure modes you've seen.
  • Recency bias is real-the last example influences the model most. Put your single best, most-on-task example last.
  • Class balance-for classification with N classes, include at least one of each.

11.3 When few-shot beats zero-shot

  • Domain-specific output formats ("this team writes incident summaries in this exact style").
  • Edge cases the model otherwise gets wrong.
  • New / proprietary nomenclature ("a widget-frob is one of our internal entities; here's how to extract it").

11.4 Dynamic few-shot

Pre-compute embeddings for a library of examples; at request time, retrieve the K most similar examples to the user input and inject them. Combines few-shot with RAG.

async def respond_with_dyn_fewshot(query: str) -> str:
    examples = retrieve_similar_examples(query, k=4)
    messages = [{"role": "system", "content": SYSTEM}]
    for ex in examples:
        messages.append({"role": "user", "content": ex.input})
        messages.append({"role": "assistant", "content": ex.output})
    messages.append({"role": "user", "content": query})
    return await complete(messages)

Cache the embeddings of examples; refresh when the library changes.


12. Chain-of-thought and reasoning models

12.1 Classic CoT

The 2022-era trick: append "Let's think step by step" or include reasoning chains in few-shot examples. The model produces visible intermediate steps, often improving accuracy on math/logic problems.

When it helps:

  • Multi-step arithmetic.
  • Logical deduction with multiple constraints.
  • Tasks where the answer's correctness depends on a reasoning chain you can't write down a priori.

When it doesn't help:

  • Already-trivial tasks (you just paid for tokens).
  • Tasks where the model is wrong at step 1 (CoT amplifies errors as confidently as correct answers).
  • Pure recall tasks ("what's the capital of France").

12.2 Reasoning models (o1, Claude 3.7 thinking, R1)

Starting in 2024 and dominating by 2026, providers ship dedicated reasoning models that internally generate long chains of thought before producing a visible answer. Implications:

  • Different cost shape. "Thinking tokens" are billed (often at output rates). A single call can produce thousands of hidden tokens.
  • Different latency shape. TTFT may be tens of seconds while the model thinks; visible output is fast once it starts.
  • Less need for explicit "think step by step" prompting. The model already does it.
  • Often controllable. Anthropic exposes a thinking budget; OpenAI exposes a reasoning.effort knob (low/medium/high).

resp = client.messages.create(
    model="claude-3-7-sonnet-latest",
    max_tokens=4096,
    thinking={"type": "enabled", "budget_tokens": 8000},   # generic shape; verify current
    messages=[...],
)

Decision rule: use reasoning models for tasks where the answer's correctness, not the answer's style, dominates value. Code generation, math, plan synthesis, multi-constraint scheduling. Don't pay reasoning premiums for chat.

12.3 When to ask for explicit reasoning vs trust internal CoT

  • For non-reasoning models: explicit CoT in the prompt or via "think before answering" still helps measurably on hard tasks.
  • For reasoning models: prefer trusting the internal CoT; double-CoT (asking a reasoning model to also "think step by step") rarely helps and can confuse output formatting.
  • Always-keep the explicit CoT out of the user-facing answer unless the user wants it. Use a tool-use exit (§3.3) to constrain the visible output to the structured answer.

13. DSPy-programs over prompts

DSPy reframes prompt engineering: instead of writing prompts, you declare signatures (input/output specs) and modules (call patterns), and DSPy compiles them into prompts and optimizes them against eval data.

import dspy

class Triage(dspy.Signature):
    """Triage an incident report into severity + summary."""
    report: str = dspy.InputField()
    severity: str = dspy.OutputField(desc="sev1|sev2|sev3|sev4")
    summary:  str = dspy.OutputField(desc="<= 200 chars")

triage = dspy.ChainOfThought(Triage)
result = triage(report="server foo CPU 100%...")

DSPy's optimizers (historically called teleprompters, e.g. BootstrapFewShot, MIPROv2) tune few-shot examples and prompt phrasings against a metric you define on a labeled dataset.

When DSPy makes sense:

  • Composable pipelines with multiple LLM steps.
  • You have (or can collect) eval data and a clear metric.
  • You want to swap models without rewriting prompts.

When DSPy is overhead:

  • One-off scripts, prototypes.
  • You're already happy with hand-tuned prompts.
  • The team isn't ready to maintain a separate "compiled prompts" artifact.

Treat DSPy as one option in the toolbox, not a religion.


14. Production patterns

14.1 Per-request idempotency keys

Network blips cause duplicate requests. If your model call has side effects (writes to DB, sends an email), an idempotency key prevents double-execution:

import hashlib, json

key = hashlib.sha256(f"{user_id}:{conversation_id}:{message_seq}".encode()).hexdigest()
if redis.set(f"idem:{key}", "1", nx=True, ex=600):       # first writer wins
    result = await call_model(...)
    redis.set(f"idem:result:{key}", json.dumps(result), ex=600)
else:
    # concurrent duplicate: in production, poll briefly until the first writer stores its result
    result = json.loads(redis.get(f"idem:result:{key}"))

Some providers accept a client-supplied idempotency header that shortcuts this on their side; check current docs.

14.2 Multi-tenant isolation

  • Per-tenant API keys for the upstream provider when you need usage segregation, billing, or compliance separation.
  • Per-tenant rate limits in your own gateway (token-bucket per tenant; a sketch follows this list).
  • Per-tenant cost ceilings that circuit-break the tenant before they bankrupt you. A free-tier tenant should never be able to cost you $1000/day.
  • Per-tenant model allowlists-restrict which models a tenant can route to.
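
A minimal per-tenant token bucket, as referenced in the second bullet (the rate and capacity numbers are placeholders):

import time

class TokenBucket:
    """Per-tenant bucket: `rate` tokens/second refill, `capacity` burst size."""
    def __init__(self, rate: float, capacity: float):
        self.rate, self.capacity = rate, capacity
        self.tokens = capacity
        self.last = time.monotonic()

    def allow(self, cost: float = 1.0) -> bool:
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False

buckets: dict[str, TokenBucket] = {}          # tenant_id -> bucket

def tenant_allowed(tenant_id: str) -> bool:
    bucket = buckets.setdefault(tenant_id, TokenBucket(rate=2.0, capacity=10))
    return bucket.allow()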

14.3 PII redaction at the prompt boundary

Two layers:

  • Inbound (user → model): scrub or tokenize sensitive fields. If the user pastes a credit card number, you may want to replace it with <CARD_4242> before sending to the provider.
  • Outbound (model → user): if your model has access to internal data, scan the output for accidental leakage of sensitive fields.

Libraries: presidio (Microsoft), regex packs for common formats. Crucially, log the redacted prompt in your traces; never put raw PII in observability stores.

14.4 Observability (briefly)

Every call gets a span with attributes: model, feature, tenant, prompt_hash, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, latency_ms, cost_usd, stop_reason, tool_calls, error_type. These let you answer questions like "which feature is hottest by p95 latency this week" without grep.

Tools: OpenTelemetry traces; LangSmith / Langfuse / Helicone if you want LLM-specific observability out of the box. The principle is more important than the tool: every call is a span, every span has standard attributes, and you have a dashboard.

14.5 Caching beyond prompt cache

  • Response caching by (prompt_hash, params_hash)-for deterministic prompts (T=0), cache the full response. A KV store with a TTL gives instant replay for identical requests, eliminating the model call (and its latency and cost) entirely on a hit; a sketch follows this list.
  • Embeddings cache-embeddings are deterministic. Always cache them keyed by (model, text_hash).
  • Tool-result cache-if the tool is a deterministic lookup (get_user(user_id)), cache its result for the duration of the conversation.
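
A sketch of the response cache from the first bullet, assuming an async Redis client, temperature-0 prompts, and an Anthropic-shaped call_model coroutine (all names are placeholders):

import hashlib, json

def response_cache_key(messages: list[dict], params: dict) -> str:
    """Deterministic key over prompt + sampling params; only safe when temperature is 0."""
    blob = json.dumps({"messages": messages, "params": params}, sort_keys=True)
    return "llmcache:" + hashlib.sha256(blob.encode()).hexdigest()

async def cached_call(messages, params, redis, call_model, ttl_s: int = 3600):
    key = response_cache_key(messages, params)
    hit = await redis.get(key)
    if hit is not None:
        return json.loads(hit)                       # replay: no model call at all
    resp = await call_model(messages=messages, **params)
    payload = {
        "text": resp.content[0].text,
        "input_tokens": resp.usage.input_tokens,
        "output_tokens": resp.usage.output_tokens,
    }
    await redis.set(key, json.dumps(payload), ex=ttl_s)
    return payload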

14.6 Safety filters

Full treatment is a separate chapter. The minimum:

  • Pre-filter for prompt-injection patterns in untrusted user content (especially tool outputs that include user-controlled text).
  • Post-filter for policy violations on your output.
  • Use the provider's own moderation API where available.

15. The "minimum viable LLM service" reference architecture

A self-contained skeleton that ties everything together. ~80 lines of Python.

import asyncio, json, time, hashlib, os, logging, random
from typing import Literal
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from anthropic import AsyncAnthropic, RateLimitError, APITimeoutError, InternalServerError

log = logging.getLogger("llm-service")
client = AsyncAnthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
MODEL = "claude-3-7-sonnet-latest"

PRICE = {MODEL: {"in": 3.00, "out": 15.00}}  # USD per 1M tokens; verify current

# ---------- contracts ---------------------------------------------------------

class TriageRequest(BaseModel):
    tenant_id: str
    user_id: str
    report: str = Field(min_length=1, max_length=20_000)

class IncidentTriage(BaseModel):
    severity: Literal["sev1", "sev2", "sev3", "sev4"]
    service: str
    summary: str = Field(max_length=200)
    needs_human: bool

# ---------- prompt construction ----------------------------------------------

SYSTEM = (
    "You are an incident triage assistant. Always call submit_triage with the structured result."
)

TRIAGE_TOOL = {
    "name": "submit_triage",
    "description": "Submit the structured triage. Always invoke this; never reply in prose.",
    "input_schema": IncidentTriage.model_json_schema(),
}

def build_messages(report: str) -> list[dict]:
    return [{"role": "user", "content": report}]

# ---------- retry wrapper -----------------------------------------------------

RETRYABLE = (RateLimitError, APITimeoutError, InternalServerError)

async def with_backoff(fn, *, retries=4, base=1.0, cap=30.0):
    for attempt in range(retries):
        try:
            return await fn()
        except RETRYABLE as e:
            if attempt == retries - 1:
                raise
            delay = min(cap, base * (2 ** attempt)) + random.uniform(0, base)
            log.warning("retry %d after %s in %.2fs", attempt + 1, type(e).__name__, delay)
            await asyncio.sleep(delay)

# ---------- the call ----------------------------------------------------------

async def call_triage(report: str) -> tuple[IncidentTriage, dict]:
    async def go():
        return await client.messages.create(
            model=MODEL,
            max_tokens=512,
            system=[{"type": "text", "text": SYSTEM, "cache_control": {"type": "ephemeral"}}],
            tools=[{**TRIAGE_TOOL, "cache_control": {"type": "ephemeral"}}],
            tool_choice={"type": "tool", "name": "submit_triage"},
            messages=build_messages(report),
        )
    resp = await with_backoff(go)
    block = next((b for b in resp.content if b.type == "tool_use"), None)
    if block is None:
        raise HTTPException(502, "model did not invoke tool")
    parsed = IncidentTriage.model_validate(block.input)  # raises on schema violation
    usage = {
        "in":  resp.usage.input_tokens,
        "out": resp.usage.output_tokens,
        "cache_read":  getattr(resp.usage, "cache_read_input_tokens", 0),
        "cache_write": getattr(resp.usage, "cache_creation_input_tokens", 0),
    }
    return parsed, usage

# ---------- HTTP layer --------------------------------------------------------

app = FastAPI()

@app.post("/triage", response_model=IncidentTriage)
async def triage(req: TriageRequest):
    rid = hashlib.sha256(f"{req.tenant_id}:{req.user_id}:{req.report}".encode()).hexdigest()[:12]
    t0 = time.monotonic()
    try:
        result, usage = await call_triage(req.report)
    except RETRYABLE as e:
        log.exception("rid=%s upstream failure", rid)
        raise HTTPException(503, f"upstream error: {type(e).__name__}")
    dt = time.monotonic() - t0
    cost = (usage["in"] * PRICE[MODEL]["in"] + usage["out"] * PRICE[MODEL]["out"]) / 1e6
    log.info(
        "rid=%s tenant=%s feature=triage in=%d out=%d cache_r=%d cache_w=%d cost=$%.6f t=%.2fs",
        rid, req.tenant_id, usage["in"], usage["out"], usage["cache_read"], usage["cache_write"], cost, dt,
    )
    return result

What it has and what it deliberately omits:

  • Has: typed request/response, prompt caching on system + tools, forced-tool structured output, schema validation, exponential backoff with jitter, cost logging, tenant tagging, request id.
  • Omits: persistent conversation state (this is one-shot), streaming (add for chat UIs), tracing (add OTEL), idempotency (add Redis), tenant rate limiting (add a token bucket), PII redaction (add presidio at boundary), failover (add LiteLLM router), circuit breaker (wrap client.messages.create).

Each omission is one chapter section; you've read them.


16. Practical exercises

These are calibrated for a working backend/SRE engineer transitioning to applied AI. Each takes 30–90 minutes; do them in order. Do not skip the math in #5; it is where the chapter's economics become real.

Exercise 1-Prompt caching with savings estimate

Take a system prompt built from ~5 KB of style guide + policy + examples, structured as a list of text blocks. Wire Anthropic prompt caching with cache_control on the last block of the system array. Run 50 calls; verify cache_read_input_tokens > 0 after the first warm-up. Then compute, on paper:

  • At 1000 calls/day, what is the daily cost without caching?
  • With caching (assume 5-min TTL → 12 writes/hour → ~288 writes/day; rest are reads)?
  • Express savings as a percent.

Acceptance: your computed savings ≥ 80% on the cached portion. Sanity-check against the actual measured usage numbers from your test run.

Exercise 2-Pydantic-validated tool-use loop with retry-on-validation-fail

Define class IncidentTriage(BaseModel) with at least one field whose validity the model occasionally violates (e.g. summary: str = Field(max_length=80) is tight enough that the model will overrun). Build a loop that:

  1. Calls the model with the tool.
  2. Tries IncidentTriage.model_validate(block.input).
  3. On ValidationError, appends the assistant's tool_use content + a synthetic tool_result block of {"is_error": True, "content": "<the validation error message>. Please correct."} and re-calls.
  4. Caps at 3 attempts; raises after.

Acceptance: produces a valid IncidentTriage even when you craft an input that initially provokes overrun (e.g. a multi-paragraph report). Show the retry messages in logs.

Exercise 3-Sequential pipeline → async map-reduce

Start with a 3-step pipeline that summarizes a long document by serially summarizing chunks and concatenating. Convert to:

  • Split into N chunks.
  • asyncio.gather per-chunk summaries.
  • One final "combine" call that takes all partial summaries and produces a coherent whole.

Acceptance: total wall-clock time on a 10-chunk doc drops from ~11× single-call latency (10 serial chunk calls plus the combine) to roughly 3× (two semaphore-bounded parallel batches plus one combine). Bound concurrency with a semaphore (e.g. 5).

Exercise 4-Circuit breaker around the OpenAI client

Implement CircuitBreaker with the three states (closed/open/half-open). Wrap openai.AsyncOpenAI().chat.completions.create. Configuration: open after 5 consecutive failures (count only retryable errors); cool-down 30s; one probe call in half-open; close on success.

Test by injecting a fault: monkey-patch the client to raise InternalServerError 6 times, then succeed. Assert the breaker opens after 5, fails fast on call 6 (no upstream call), waits 30s, half-opens on call 7, closes on success.

Acceptance: a unit test that asserts the state transitions and that no upstream call is made while the breaker is open.

Exercise 5-Tokenize a 10-message conversation; compute Anthropic cost

Take a 10-message conversation (5 user, 5 assistant). For each message, count tokens via client.messages.count_tokens(...) (Anthropic) or its current equivalent. Write a script that prints:

  • Total input tokens (the conversation as a prompt).
  • Estimated output tokens (use 200 as a placeholder).
  • Cost on Sonnet pricing (use illustrative prices; mark "as of ~2025; verify current"):
    input  = 3.00 $/1M
    output = 15.00 $/1M
    
  • The cost contribution of each message (tokens_i / total_tokens × input_cost). The point is to see which messages dominate.

Acceptance: a single Python script you can re-run on any conversation file. Bonus: include cache-read pricing if the system prompt is cached.

Exercise 6-Self-consistency pipeline

Build:

async def self_consistent_triage(report: str, n: int = 5) -> IncidentTriage:
    samples = await asyncio.gather(*(triage(report, temperature=0.7) for _ in range(n)))
    # mode of (severity, service, needs_human) tuple, summary from a winning sample

Run it on 20 ambiguous reports. Compare to single-call T=0 on the same reports against ground-truth labels. Compute accuracy delta and cost multiplier.

Acceptance: a printed table with accuracy_single, accuracy_self_consistent, cost_multiplier. Discuss in a paragraph: at what accuracy delta does the 5× cost pay for itself for a "high-stakes incident classification" feature?


17. Closing-the engineer's checklist

When you ship an LLM-powered feature, walk this list before merging:

  • Prompt is constructed by a pure function from (state, retrieval, tools, input); logged with a hash per request.
  • Output is parsed by a pure function with a typed failure path; never except Exception: pass.
  • Sampling parameters are explicit (temperature, max_tokens, top_p); not defaults inherited from the SDK.
  • Structured outputs use tool use or JSON mode + Pydantic validation, not prose parsing.
  • Tool definitions have descriptions written as prompts, with usage guidance.
  • Tool-call loop has a hard cap (e.g. 10 calls/turn) and validates tool names before dispatch.
  • Streaming is on for any user-facing chat surface; cancellation closes the stream.
  • Prompt caching is enabled on stable prefixes ≥ 1024 tokens; you've measured savings.
  • Provider call is wrapped in retry-with-jitter; non-retryable 4xx are surfaced cleanly.
  • A circuit breaker fails fast when the provider is down.
  • Cost ledger logs (in, out, cache_read, cache_write, cost_usd, feature, tenant, request_id) for every call.
  • Per-tenant rate limits and cost ceilings exist.
  • PII redaction at both inbound and outbound boundaries.
  • An eval set with ≥ 50 labeled examples exists; CI runs it on prompt changes.

If you can tick all of these, you have an LLM application that won't surprise you at 3 AM. The patterns in this chapter are the means; the checklist is the end.


End of Deep Dive 05. Continue with Deep Dive 06-RAG and Retrieval Patterns, which builds on §3 (structured outputs) and §10 (orchestration) for retrieval-grounded generation.
