Streaming Skill
Implement real-time streaming responses from Claude API using Server-Sent Events (SSE).
When to Use This Skill
-
Real-time user interfaces
-
Long-running generations
-
Progressive output display
-
Tool use with streaming
-
Extended thinking visualization
SSE Event Flow
message_start → content_block_start → content_block_delta (repeated) → content_block_stop → (more blocks...) → message_delta → message_stop
Core Implementation
Basic Text Streaming (Python)
import anthropic
client = anthropic.Anthropic()
with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": "Write a short story."}] ) as stream: for text in stream.text_stream: print(text, end="", flush=True)
Event-Based Streaming
with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": "Hello"}] ) as stream: for event in stream: if event.type == "content_block_delta": if event.delta.type == "text_delta": print(event.delta.text, end="") elif event.delta.type == "input_json_delta": # Tool input (accumulate, don't parse yet!) tool_input_buffer += event.delta.partial_json elif event.type == "content_block_stop": # Now safe to parse tool input if tool_input_buffer: tool_input = json.loads(tool_input_buffer)
TypeScript Streaming
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic();
const stream = client.messages.stream({ model: 'claude-sonnet-4-20250514', max_tokens: 1024, messages: [{ role: 'user', content: 'Write a story.' }] });
for await (const event of stream) { if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') { process.stdout.write(event.delta.text); } }
const finalMessage = await stream.finalMessage();
Event Types Reference
Event When Data
message_start
Beginning Message metadata
content_block_start
Block begins Block type, index
content_block_delta
Content chunk Delta content
content_block_stop
Block ends
message_delta
Message update Stop reason, usage
message_stop
Complete
Delta Types
Delta Type Content When
text_delta
.text
Text content
input_json_delta
.partial_json
Tool input
thinking_delta
.thinking
Extended thinking
signature_delta
.signature
Thinking signature
Tool Use Streaming
Critical Rule: Never Parse JSON Mid-Stream!
WRONG - Will fail on partial JSON!
for event in stream: if event.delta.type == "input_json_delta": tool_input = json.loads(event.delta.partial_json) # FAILS!
CORRECT - Accumulate then parse
tool_json_buffer = "" for event in stream: if event.delta.type == "input_json_delta": tool_json_buffer += event.delta.partial_json elif event.type == "content_block_stop": if tool_json_buffer: tool_input = json.loads(tool_json_buffer) # Safe now! tool_json_buffer = ""
Complete Tool Streaming Pattern
def stream_with_tools(client, messages, tools): current_block = None tool_input_buffer = ""
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=4096,
messages=messages,
tools=tools
) as stream:
for event in stream:
if event.type == "content_block_start":
current_block = event.content_block
tool_input_buffer = ""
elif event.type == "content_block_delta":
if event.delta.type == "text_delta":
yield {"type": "text", "content": event.delta.text}
elif event.delta.type == "input_json_delta":
tool_input_buffer += event.delta.partial_json
elif event.type == "content_block_stop":
if current_block.type == "tool_use":
yield {
"type": "tool_call",
"id": current_block.id,
"name": current_block.name,
"input": json.loads(tool_input_buffer)
}
Extended Thinking Streaming
thinking_content = "" signature = ""
with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=16000, thinking={"type": "enabled", "budget_tokens": 10000}, messages=[{"role": "user", "content": "Solve this complex problem..."}] ) as stream: for event in stream: if event.type == "content_block_delta": if event.delta.type == "thinking_delta": thinking_content += event.delta.thinking # Optionally display thinking in UI elif event.delta.type == "signature_delta": signature = event.delta.signature elif event.delta.type == "text_delta": print(event.delta.text, end="")
Error Handling
Retriable Errors
import time
RETRIABLE_ERRORS = [529, 429, 500, 502, 503]
def stream_with_retry(client, **kwargs): max_retries = 3 base_delay = 1
for attempt in range(max_retries):
try:
with client.messages.stream(**kwargs) as stream:
for event in stream:
yield event
return
except anthropic.APIStatusError as e:
if e.status_code in RETRIABLE_ERRORS and attempt < max_retries - 1:
delay = base_delay * (2 ** attempt)
time.sleep(delay)
else:
raise
Silent Overloaded Errors
CRITICAL: Check for error events even at HTTP 200!
for event in stream: if event.type == "error": if event.error.type == "overloaded_error": # Retry with backoff pass
Connection Management
Keep-Alive Configuration
import httpx
Proper timeout configuration
http_client = httpx.Client( timeout=httpx.Timeout( connect=10.0, # Connection timeout read=120.0, # Read timeout (long for streaming!) write=30.0, # Write timeout pool=30.0 # Pool timeout ) )
client = anthropic.Anthropic(http_client=http_client)
Connection Pooling
http_client = httpx.Client( limits=httpx.Limits( max_keepalive_connections=20, max_connections=100, keepalive_expiry=30.0 ) )
Best Practices
DO:
-
Set read timeout >= 60 seconds
-
Accumulate tool JSON, parse after block_stop
-
Handle error events even at HTTP 200
-
Use exponential backoff for retries
DON'T:
-
Parse partial JSON during streaming
-
Use short timeouts
-
Ignore overloaded_error events
-
Leave connections idle >5 minutes
UI Integration Pattern
async def stream_to_ui(websocket, prompt): """Stream Claude response to WebSocket client""" async with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=4096, messages=[{"role": "user", "content": prompt}] ) as stream: async for text in stream.text_stream: await websocket.send_json({ "type": "chunk", "content": text })
await websocket.send_json({
"type": "complete",
"usage": stream.get_final_message().usage
})
See Also
-
[[llm-integration]] - API basics
-
[[tool-use]] - Tool calling
-
[[extended-thinking]] - Deep reasoning