Batch Processing Skill
Leverage Anthropic's Message Batches API for 50% cost savings on bulk processing workloads.
When to Use This Skill
- Processing 10K+ documents
- Model evaluation and benchmarks
- ETL and data enrichment
- Training data generation
- Bulk content analysis
- Any non-time-sensitive workload
Cost Savings
| Processing Type | Cost |
|-----------------|------|
| Standard API | 100% |
| Batch API | 50% |
Example: 1M tokens @ $3/1M = $3 (standard) vs $1.50 (batch)
Batch Lifecycle
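Created → Processing (`in_progress`, or `canceling` if cancellation was requested) → Ended (`ended`). Once a batch has ended, each request carries its own result type: succeeded, errored, expired, or canceled.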
- Processing time: Up to 24 hours
- Results retention: 29 days
- Full feature support: Tools, vision, system prompts
Core Implementation
Step 1: Create JSONL File
import json

# Each entry becomes one line of the JSONL file and is a complete,
# self-contained request: a caller-chosen custom_id plus the Messages API
# parameters for that request.
requests = [
    {
        "custom_id": "request-1",
        "params": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "messages": [{"role": "user", "content": "Summarize: Document 1..."}],
        },
    },
    {
        "custom_id": "request-2",
        "params": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "messages": [{"role": "user", "content": "Summarize: Document 2..."}],
        },
    },
]

# Write the JSONL file: one JSON object per line.
with open("batch_requests.jsonl", "w", encoding="utf-8") as f:
    for request in requests:
        f.write(json.dumps(request) + "\n")
Step 2: Submit Batch
import json

import anthropic

client = anthropic.Anthropic()

# Create the batch from the JSONL file. The SDK expects `requests` to be an
# iterable of request dicts rather than an open file handle, so parse the
# file back into a list before submitting.
with open("batch_requests.jsonl", encoding="utf-8") as f:
    requests = [json.loads(line) for line in f if line.strip()]

batch = client.beta.messages.batches.create(requests=requests)

print(f"Batch ID: {batch.id}")
print(f"Status: {batch.processing_status}")
Step 3: Poll for Completion
import time
def wait_for_batch(client, batch_id, poll_interval=60, timeout=None):
    """Poll a message batch until its processing status is ``"ended"``.

    ``"ended"`` only means processing has finished; individual requests may
    still have errored, expired, or been canceled, so the per-outcome counts
    are printed before returning.

    Args:
        client: Anthropic client exposing ``beta.messages.batches.retrieve``.
        batch_id: ID of the batch to poll.
        poll_interval: Seconds to sleep between polls.
        timeout: Optional maximum number of seconds to keep polling. ``None``
            (the default) polls until the batch ends, however long it takes.

    Returns:
        The batch object returned by the final ``retrieve`` call.

    Raises:
        TimeoutError: If ``timeout`` is set and elapses before the batch ends.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        batch = client.beta.messages.batches.retrieve(batch_id)

        if batch.processing_status == "ended":
            counts = batch.request_counts
            print("Batch completed!")
            print(f" Succeeded: {counts.succeeded}")
            print(f" Errored: {counts.errored}")
            print(f" Expired: {counts.expired}")
            print(f" Canceled: {counts.canceled}")
            return batch

        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Batch {batch_id} did not end within {timeout} seconds"
            )

        print(f"Status: {batch.processing_status} - waiting...")
        time.sleep(poll_interval)
Step 4: Stream Results
def process_results(client, batch_id):
    """Stream the results of a batch and index them by ``custom_id``.

    Args:
        client: Anthropic client exposing ``beta.messages.batches.results``.
        batch_id: ID of a batch whose processing has ended.

    Returns:
        A dict mapping each request's ``custom_id`` to a summary dict:

        * succeeded -> ``{"status": "success", "content": <text>, "usage": ...}``
        * errored   -> ``{"status": "error", "error": ...}``
        * expired   -> ``{"status": "expired"}``
        * canceled  -> ``{"status": "canceled"}``

        Any other result type is recorded as ``{"status": <type>}`` so that
        no request silently disappears from the output.
    """
    results = {}

    for result in client.beta.messages.batches.results(batch_id):
        outcome = result.result
        kind = outcome.type

        if kind == "succeeded":
            message = outcome.message
            # A response can contain non-text blocks (e.g. tool_use), so
            # gather every text block instead of assuming content[0] is text.
            text = "".join(
                block.text
                for block in message.content
                if getattr(block, "type", None) == "text"
            )
            results[result.custom_id] = {
                "status": "success",
                "content": text,
                "usage": message.usage,
            }
        elif kind == "errored":
            results[result.custom_id] = {
                "status": "error",
                "error": outcome.error,
            }
        elif kind == "expired":
            results[result.custom_id] = {"status": "expired"}
        elif kind == "canceled":
            results[result.custom_id] = {"status": "canceled"}
        else:
            results[result.custom_id] = {"status": kind}

    return results
Complete Workflow
import json
import time

import anthropic
def run_batch_job(documents, client=None, poll_interval=60):
    """Run a complete batch workflow: build, submit, wait, collect.

    Args:
        documents: Iterable of items to process. An item that is already a
            batch request (a dict with ``"custom_id"`` and ``"params"``) is
            submitted unchanged; any other item is treated as document text
            and wrapped in a default "Summarize" request with the
            ``custom_id`` ``doc-<index>``.
        client: Optional Anthropic client to use. When ``None`` a default
            ``anthropic.Anthropic()`` client is created.
        poll_interval: Seconds to sleep between status polls.

    Returns:
        A dict mapping ``custom_id`` to the response text of every request
        that succeeded. Requests that errored, expired, or were canceled are
        left out of the dict; their count is printed.
    """
    # 1. Create requests
    requests = []
    for i, doc in enumerate(documents):
        if isinstance(doc, dict) and "custom_id" in doc and "params" in doc:
            # Already a complete batch request: pass it through untouched.
            requests.append(doc)
            continue
        requests.append({
            "custom_id": f"doc-{i}",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": f"Summarize: {doc}"}],
            },
        })

    # Nothing to submit: skip the API round trip entirely.
    if not requests:
        return {}

    if client is None:
        client = anthropic.Anthropic()

    # 2. Write JSONL (kept as a local record of what was submitted, e.g. to
    #    look up the original params when retrying failed requests)
    with open("batch.jsonl", "w", encoding="utf-8") as f:
        for req in requests:
            f.write(json.dumps(req) + "\n")

    # 3. Submit batch. The SDK takes the request dicts themselves, not a
    #    file handle.
    batch = client.beta.messages.batches.create(requests=requests)
    print(f"Batch submitted: {batch.id}")

    # 4. Wait for completion
    while True:
        batch = client.beta.messages.batches.retrieve(batch.id)
        if batch.processing_status == "ended":
            break
        time.sleep(poll_interval)

    # 5. Collect results
    results = {}
    not_succeeded = 0
    for result in client.beta.messages.batches.results(batch.id):
        if result.result.type != "succeeded":
            not_succeeded += 1
            continue
        # Join all text blocks; the first content block is not guaranteed
        # to be text (e.g. when tools are used).
        results[result.custom_id] = "".join(
            block.text
            for block in result.result.message.content
            if getattr(block, "type", None) == "text"
        )

    if not_succeeded:
        print(f"Requests without a successful result: {not_succeeded}")

    return results
Error Handling
Retry Failed Requests
def retry_failed(client, original_batch_id):
    """Resubmit the errored and expired requests of a batch as a new batch.

    The batch results do not include the original request parameters, so
    they are looked up per ``custom_id`` through ``get_original_params``,
    which the caller must implement.

    Args:
        client: Anthropic client exposing ``beta.messages.batches``.
        original_batch_id: ID of the ended batch to inspect.

    Returns:
        The newly created retry batch, or ``None`` when no request errored
        or expired.
    """
    # NOTE(review): every errored request is retried as-is; an error caused
    # by an invalid request will presumably fail again - consider inspecting
    # result.result.error before retrying.
    retry_requests = [
        {
            "custom_id": result.custom_id,
            "params": get_original_params(result.custom_id),  # Your implementation
        }
        for result in client.beta.messages.batches.results(original_batch_id)
        if result.result.type in ("errored", "expired")
    ]

    if not retry_requests:
        return None

    # Keep a local JSONL record of what is being retried.
    with open("retry.jsonl", "w", encoding="utf-8") as f:
        for req in retry_requests:
            f.write(json.dumps(req) + "\n")

    # The SDK takes the request dicts themselves, not a file handle.
    return client.beta.messages.batches.create(requests=retry_requests)
Error Types
| Result Type | Action |
|-------------|--------|
| succeeded | Process normally |
| errored | Check error, may retry |
| expired | Request took >24h, retry |
| canceled | Batch was canceled |
API Reference
Endpoints
| Method | Endpoint | Purpose |
|--------|----------|---------|
| POST | /v1/messages/batches | Create batch |
| GET | /v1/messages/batches/{id} | Get batch status |
| GET | /v1/messages/batches | List batches |
| POST | /v1/messages/batches/{id}/cancel | Cancel batch |
| GET | /v1/messages/batches/{id}/results | Stream results |
Request Format
{ "custom_id": "unique-identifier", "params": { "model": "claude-sonnet-4-20250514", "max_tokens": 1024, "system": "Optional system prompt", "messages": [{"role": "user", "content": "..."}], "tools": [], "temperature": 0.7 } }
Best Practices
DO:
- Use for workloads >100 requests
- Include unique custom_id for tracking
- Store original params for retry logic
- Monitor batch status via polling
- Process results as streaming JSONL
DON'T:
- Use for time-sensitive requests
- Expect immediate results
- Forget to handle expired requests
- Ignore error results
Cost Optimization Tips
- Batch similar requests - Same model, similar prompts
- Combine with caching - Cache static context before batch
- Use appropriate model - Haiku for simple tasks, Sonnet for complex
- Optimize prompts - Shorter prompts = lower cost
Use Case: Document Processing
# Process 10,000 documents at 50% cost
documents = load_documents("data/*.txt")  # 10K files

# Create batch with consistent format
requests = [
    {
        # custom_id must be a string; str() is a no-op if doc.id already is.
        "custom_id": str(doc.id),
        "params": {
            "model": "claude-3-5-haiku-20241022",  # Cheapest for bulk
            "max_tokens": 512,
            "system": "Extract key information as JSON.",
            "messages": [{"role": "user", "content": doc.content}],
        },
    }
    for doc in documents
]

# Run batch job. These are already complete requests, so submit them
# directly and reuse the polling/result helpers; run_batch_job() expects raw
# document text and would wrap each request in its own "Summarize" prompt.
client = anthropic.Anthropic()
batch = client.beta.messages.batches.create(requests=requests)
wait_for_batch(client, batch.id)
results = process_results(client, batch.id)

# Cost: ~50% of standard API!
See Also
- llm-integration - API basics
- prompt-caching - Cache context
- streaming - Real-time output