AI Session Compression Techniques
Summary
Compress long AI conversations to fit context windows while preserving critical information.
Session compression enables production AI applications to manage multi-turn conversations efficiently by reducing token usage by 70-95% through summarization, embedding-based retrieval, and intelligent context management. Achieve 3-20x compression ratios with minimal performance degradation.
Key Benefits:
- Cost Reduction: 80-90% token cost savings through hierarchical memory
- Performance: 2x faster responses with compressed context
- Scalability: Handle conversations exceeding 1M tokens
- Quality: Preserve critical information with <2% accuracy loss
When to Use
Use session compression when:
- Multi-turn conversations approach context window limits (>50% capacity)
- Long-running chat sessions (customer support, tutoring, code assistants)
- Token costs become significant (high-volume applications)
- Response latency increases due to large context
- Managing conversation history across multiple sessions
Don't use when:
- Short conversations (<10 turns) fitting easily in context
- Every detail must be preserved verbatim (legal, compliance)
- Single-turn or stateless interactions
- Context window usage is <30%
Ideal scenarios:
- Chatbots with 50+ turn conversations
- AI code assistants tracking long development sessions
- Customer support with multi-session ticket history
- Educational tutors with student progress tracking
- Multi-day collaborative AI workflows
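The numeric cutoffs above (50% capacity, 30% usage, 10 turns) can be folded into a small decision helper. This is a minimal sketch, not part of any library: the function name `should_compress`, the three return labels, and the "consider" band between 30% and 50% are assumptions made for illustration.

```python
def should_compress(turns: int, usage_ratio: float) -> str:
    """Return 'compress', 'consider', or 'skip' from the rule-of-thumb cutoffs.

    usage_ratio is the fraction of the context window currently in use (0.0-1.0).
    """
    if not 0.0 <= usage_ratio <= 1.0:
        raise ValueError("usage_ratio must be between 0 and 1")
    if usage_ratio > 0.50:
        # Approaching the context limit: compression is worthwhile
        return "compress"
    if usage_ratio < 0.30 or turns < 10:
        # Short or small conversations fit easily in context
        return "skip"
    # Hypothetical middle band: monitor usage, compression is optional
    return "consider"
```

Verbatim-retention requirements (legal, compliance) are a policy decision and are deliberately left out of this helper.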
Quick Start
Basic Setup with LangChain
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

# Initialize Claude client
llm = ChatAnthropic(
    model="claude-3-5-sonnet-20241022",
    api_key="your-api-key",
)

# Set up memory with automatic summarization
memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,  # Summarize when the buffer exceeds this
    return_messages=True,
)

# Add conversation turns
memory.save_context(
    {"input": "What's session compression?"},
    {"output": "Session compression reduces conversation token usage..."},
)

# Retrieve compressed context
context = memory.load_memory_variables({})
Progressive Compression Pattern
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")


class ProgressiveCompressor:
    def __init__(self, thresholds=(0.70, 0.85, 0.95)):
        self.thresholds = list(thresholds)
        self.messages = []
        self.max_tokens = 200000  # Claude context window

    def add_message(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})
        # Check whether compression is needed
        current_usage = self._estimate_tokens()
        usage_ratio = current_usage / self.max_tokens
        if usage_ratio >= self.thresholds[0]:
            self._compress(level=self._get_compression_level(usage_ratio))

    def _estimate_tokens(self):
        # Rough heuristic: about 4 characters per token
        return sum(len(m["content"]) // 4 for m in self.messages)

    def _get_compression_level(self, ratio):
        # Number of thresholds reached: 1 = light, 2 = medium, 3 = aggressive
        for i, threshold in enumerate(self.thresholds):
            if ratio < threshold:
                return i
        return len(self.thresholds)

    def _compress(self, level: int):
        """Apply compression based on severity level."""
        if level == 1:  # 70% threshold: Light compression
            self._remove_redundant_messages()
        elif level == 2:  # 85% threshold: Medium compression
            self._summarize_old_messages(keep_recent=10)
        else:  # 95% threshold: Aggressive compression
            self._summarize_old_messages(keep_recent=5)

    def _remove_redundant_messages(self):
        """Remove duplicate or low-value messages."""
        # Implementation: Use semantic deduplication
        pass

    def _summarize_old_messages(self, keep_recent: int):
        """Summarize older messages, keep recent ones verbatim."""
        if len(self.messages) <= keep_recent:
            return
        # Messages to summarize
        to_summarize = self.messages[:-keep_recent]
        recent = self.messages[-keep_recent:]
        # Generate summary
        conversation_text = "\n\n".join([
            f"{m['role'].upper()}: {m['content']}"
            for m in to_summarize
        ])
        response = client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=500,
            messages=[{
                "role": "user",
                "content": f"Summarize this conversation:\n\n{conversation_text}"
            }]
        )
        # Replace old messages with summary.
        # Note: the Messages API accepts only "user" and "assistant" roles in
        # `messages`; pass this summary via the top-level `system` parameter
        # when sending the history back to the API.
        summary = {
            "role": "system",
            "content": f"[Summary]\n{response.content[0].text}"
        }
        self.messages = [summary] + recent


# Usage
compressor = ProgressiveCompressor()
for i in range(100):
    compressor.add_message("user", f"Message {i}")
    compressor.add_message("assistant", f"Response {i}")
Using Anthropic Prompt Caching (90% Cost Reduction)
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

# Build context with cache control
messages = [
    {
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": "Long conversation context here...",
                "cache_control": {"type": "ephemeral"},  # Cache the prefix up to this block
            }
        ],
    },
    {"role": "assistant", "content": "Previous response..."},
    {"role": "user", "content": "New question"},  # Not cached, changes frequently
]

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=messages,
)

# A cache hit bills the cached tokens at about 10% of the base input price
# (a 90% saving on those tokens); the first call that writes the cache costs
# about 25% more than uncached input.
Core Concepts
Context Windows and Token Limits
Context window: Maximum tokens an LLM can process in a single request (input + output).
Current limits (2025):
- Claude 3.5 Sonnet: 200K tokens (~150K words, ~600 pages)
- GPT-4 Turbo: 128K tokens (~96K words, ~384 pages)
- Gemini 1.5 Pro: 2M tokens (~1.5M words, ~6000 pages)
Token estimation:
- English: ~4 characters per token
- Code: ~3 characters per token
- Rule of thumb: 1 token ≈ 0.75 words
Why compression matters:
- Cost: Claude Sonnet costs $3/$15 per 1M input/output tokens
- Latency: Larger contexts increase processing time
- Quality: Excessive context can dilute attention on relevant information
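The character-per-token heuristics and the $3 per 1M input-token price quoted above combine into a quick back-of-the-envelope cost estimate. The sketch below is illustrative only: `estimate_tokens` and `input_cost_usd` are hypothetical helper names, and a real tokenizer will give different counts than the character heuristic.

```python
def estimate_tokens(text: str, is_code: bool = False) -> int:
    """Rough token estimate: ~4 characters per token for English, ~3 for code."""
    chars_per_token = 3 if is_code else 4
    return len(text) // chars_per_token


def input_cost_usd(tokens: int, price_per_million: float = 3.0) -> float:
    """Input cost in USD at a given price per 1M tokens (default: $3)."""
    return tokens / 1_000_000 * price_per_million


# A history of 800,000 characters is roughly a full 200K-token window
history = "word " * 160_000
tokens = estimate_tokens(history)
print(f"{tokens} tokens, about ${input_cost_usd(tokens):.2f} of input per request")
```

Because the full history is resent on every turn, that per-request cost is paid again each time, which is the main reason compression pays off in long sessions.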
Compression Ratios
Compression ratio = Original tokens / Compressed tokens
Industry benchmarks:
- Extractive summarization: 2-3x
- Abstractive summarization: 5-10x
- Hierarchical summarization: 20x+
- LLMLingua (prompt compression): 20x with 1.5% accuracy loss
- KVzip (KV cache compression): 3-4x with 2x speed improvement
Target ratios by use case:
- Customer support: 5-7x (preserve details)
- General chat: 8-12x (balance quality/efficiency)
- Code assistants: 3-5x (preserve technical accuracy)
- Long documents: 15-20x (extract key insights)
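The ratio definition above converts directly into token savings: a ratio of r removes 1 - 1/r of the tokens, so 5x corresponds to 80% savings and 20x to 95%. A minimal sketch of that arithmetic follows; the function names are chosen here for illustration.

```python
def compression_ratio(original_tokens: int, compressed_tokens: int) -> float:
    """Compression ratio = original tokens / compressed tokens."""
    if compressed_tokens <= 0:
        raise ValueError("compressed_tokens must be positive")
    return original_tokens / compressed_tokens


def token_savings(ratio: float) -> float:
    """Fraction of tokens removed at a given compression ratio."""
    if ratio <= 0:
        raise ValueError("ratio must be positive")
    return 1 - 1 / ratio


ratio = compression_ratio(10_000, 2_000)
print(f"{ratio:.1f}x compression removes {token_savings(ratio):.0%} of tokens")
```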
Progressive Compression Thresholds
Industry standard pattern:
Context Usage   Action                   Technique
─────────────────────────────────────────────────────────────────
0-70%           No compression           Store verbatim
70-85%          Light compression        Remove redundancy
85-95%          Medium compression       Summarize old messages
95-100%         Aggressive compression   Hierarchical + RAG
Implementation guidelines:
- 70% threshold: Remove duplicate/redundant messages, semantic deduplication
- 85% threshold: Summarize messages older than 20 turns, keep recent 10-15
- 95% threshold: Multi-level hierarchical summarization + vector store archival
- Emergency (100%): Drop least important messages, aggressive summarization
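The threshold bands above amount to a lookup from context usage to a compression action. One way to sketch it, assuming each threshold is inclusive (70% already counts as "light") and using hypothetical action labels:

```python
from bisect import bisect_right

# Thresholds from the guidelines above; labels are illustrative names
THRESHOLDS = (0.70, 0.85, 0.95)
ACTIONS = ("none", "light", "medium", "aggressive")


def compression_action(usage_ratio: float) -> str:
    """Map context usage (0.0-1.0) to the action for its threshold band."""
    # bisect_right counts how many thresholds are <= usage_ratio
    return ACTIONS[bisect_right(THRESHOLDS, usage_ratio)]


for usage in (0.50, 0.70, 0.90, 0.97):
    print(f"{usage:.0%} -> {compression_action(usage)}")
```

Keeping the thresholds in one sorted tuple makes them easy to tune per application without touching the dispatch logic.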
Compression Techniques
1. Summarization Techniques
1.1 Extractive Summarization
Selects key sentences/phrases without modification.
Pros: No hallucination, fast, deterministic
Cons: Limited compression (2-3x), may feel disjointed
Best for: Legal/compliance, short-term compression

from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np


def extractive_compress(messages: list, compression_ratio: float = 0.3):
    """Extract the most important messages using TF-IDF scoring.

    compression_ratio is the fraction of messages to keep (0.3 keeps 30%).
    """
    if not messages:
        return []
    texts = [msg['content'] for msg in messages]
    # Calculate TF-IDF scores (sum of term weights per message)
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(texts)
    scores = np.array(tfidf_matrix.sum(axis=1)).flatten()
    # Select top messages, then restore chronological order
    n_keep = max(1, int(len(messages) * compression_ratio))
    top_indices = sorted(np.argsort(scores)[-n_keep:])
    return [messages[i] for i in top_indices]
1.2 Abstractive Summarization
Uses LLMs to semantically condense conversation history.
Pros: Higher compression (5-10x), coherent, synthesizes information
Cons: Risk of hallucination, higher cost, less deterministic
Best for: General chat, customer support, multi-session continuity

from anthropic import Anthropic


def abstractive_compress(messages: list, client: Anthropic):
    """Generate semantic summary using Claude."""
    conversation_text = "\n\n".join([
        f"{msg['role'].upper()}: {msg['content']}"
        for msg in messages
    ])
    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=500,
        messages=[{
            "role": "user",
            "content": f"""Summarize this conversation, preserving:
- Key decisions made
- Important context and facts
- Unresolved questions
- Action items

Conversation:
{conversation_text}

Summary (aim for 1/5 the original length):"""
        }]
    )
    return {
        "role": "assistant",
        "content": f"[Summary]\n{response.content[0].text}"
    }
1.3 Hierarchical Summarization (Multi-Level)
Creates summaries of summaries in a tree structure.
Pros: Extreme compression (20x+), handles 1M+ token conversations
Cons: Complex implementation, multiple LLM calls, information loss accumulates
Best for: Long-running conversations, multi-session applications
Architecture:
Level 0 (Raw):     [Msg1][Msg2][Msg3][Msg4][Msg5][Msg6][Msg7][Msg8]
Level 1 (Chunk):   [Summary1-2] [Summary3-4] [Summary5-6] [Summary7-8]
Level 2 (Group):   [Summary1-4] [Summary5-8]
Level 3 (Session): [Overall Session Summary]
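The tree in the architecture diagram can be reproduced offline with a stand-in summarizer, which helps when testing the level logic without any LLM calls. This is a sketch under stated assumptions: `build_levels` and `join_summary` are hypothetical names, and the summarizer simply joins its inputs so the tree structure stays visible.

```python
from typing import Callable, List


def build_levels(items: List[str], fanout: int,
                 summarize: Callable[[List[str]], str]) -> List[List[str]]:
    """Return [level0, level1, ...], each level summarizing the one below it."""
    if fanout < 2:
        raise ValueError("fanout must be at least 2")
    levels = [list(items)]
    # Keep summarizing until a single top-level summary remains
    while len(levels[-1]) > 1:
        current = levels[-1]
        levels.append([
            summarize(current[i:i + fanout])
            for i in range(0, len(current), fanout)
        ])
    return levels


def join_summary(chunk: List[str]) -> str:
    """Stand-in for an LLM call: wraps the chunk so nesting is visible."""
    return "(" + "+".join(chunk) + ")"


levels = build_levels([f"m{i}" for i in range(1, 9)], fanout=2,
                      summarize=join_summary)
for depth, level in enumerate(levels):
    print(f"Level {depth}: {level}")
```

With eight messages and a fanout of 2 this yields four levels (8, 4, 2, and 1 entries), matching the diagram. In a real system `summarize` would call a model, and each extra level adds one more round of lossy compression.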
from anthropic import Anthropic
from typing import List, Dict


class HierarchicalMemory:
    def __init__(self, client: Anthropic, chunk_size: int = 10):
        self.client = client
        self.chunk_size = chunk_size
        self.levels: List[List[Dict]] = [[]]  # Level 0 = raw messages

    def add_message(self, message: Dict):
        """Add message and trigger summarization if needed."""
        self.levels[0].append(message)
        if len(self.levels[0]) >= self.chunk_size * 2:
            self._summarize_level(0)

    def _summarize_level(self, level: int):
        """Summarize a level into the next higher level."""
        messages = self.levels[level]
        # Ensure next level exists
        while len(self.levels) <= level + 1:
            self.levels.append([])
        # Summarize first (oldest) chunk
        chunk = messages[:self.chunk_size]
        summary = self._generate_summary(chunk, level)
        # Move to next level
        self.levels[level + 1].append(summary)
        self.levels[level] = messages[self.chunk_size:]
        # Recursively check if next level needs summarization
        if len(self.levels[level + 1]) >= self.chunk_size * 2:
            self._summarize_level(level + 1)

    def _generate_summary(self, messages: List[Dict], level: int) -> Dict:
        """Generate summary for a chunk."""
        conversation_text = "\n\n".join([
            f"{msg['role'].upper()}: {msg['content']}"
            for msg in messages
        ])
        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=300,
            messages=[{
                "role": "user",
                "content": f"Summarize this Level {level} conversation chunk:\n\n{conversation_text}"
            }]
        )
        return {
            "role": "system",
            "content": f"[L{level+1} Summary] {response.content[0].text}",
            "level": level + 1
        }

    def get_context(self, max_tokens: int = 4000) -> List[Dict]:
        """Retrieve context within token budget, oldest material first."""
        recent = []
        token_count = 0
        # Prioritize recent raw messages (up to 60% of the budget)
        for msg in reversed(self.levels[0]):
            msg_tokens = len(msg['content']) // 4
            if token_count + msg_tokens > max_tokens * 0.6:
                break
            recent.insert(0, msg)
            token_count += msg_tokens
        # Add summaries from higher levels, newest first within each level
        summaries = []
        for level in range(1, len(self.levels)):
            level_summaries = []
            for summary in reversed(self.levels[level]):
                summary_tokens = len(summary['content']) // 4
                if token_count + summary_tokens > max_tokens:
                    break
                level_summaries.insert(0, summary)
                token_count += summary_tokens
            # Higher levels cover older history, so they go in front
            summaries = level_summaries + summaries
        return summaries + recent
Academic reference: "Recursively Summarizing Enables Long-Term Dialogue Memory in Large Language Models" (arXiv:2308.15022)
1.4 Rolling Summarization (Continuous)
Continuously compresses conversation with sliding window.
Pros: Low latency, predictable token usage, simple
Cons: Early details over-compressed, no information recovery
Best for: Real-time chat, streaming conversations

from anthropic import Anthropic


class RollingMemory:
    def __init__(self, client: Anthropic, window_size: int = 10, compress_threshold: int = 15):
        self.client = client
        self.window_size = window_size
        self.compress_threshold = compress_threshold
        self.rolling_summary = None
        self.recent_messages = []

    def add_message(self, message: dict):
        self.recent_messages.append(message)
        if len(self.recent_messages) >= self.compress_threshold:
            self._compress()

    def _compress(self):
        """Compress older messages into rolling summary."""
        messages_to_compress = self.recent_messages[:-self.window_size]
        parts = []
        if self.rolling_summary:
            parts.append(f"Existing summary:\n{self.rolling_summary}")
        parts.append("\nNew messages:\n" + "\n\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in messages_to_compress
        ]))
        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=400,
            messages=[{
                "role": "user",
                "content": "\n".join(parts) + "\n\nUpdate the summary:"
            }]
        )
        self.rolling_summary = response.content[0].text
        self.recent_messages = self.recent_messages[-self.window_size:]

    def get_context(self):
        context = []
        if self.rolling_summary:
            # Note: when calling the Anthropic API, pass this text via the
            # top-level `system` parameter rather than as a "system" message.
            context.append({
                "role": "system",
                "content": f"[Summary]\n{self.rolling_summary}"
            })
        context.extend(self.recent_messages)
        return context
2. Embedding-Based Approaches
2.1 RAG (Retrieval-Augmented Generation)
Store full conversation in vector database, retrieve only relevant chunks.
Pros: Extremely scalable, no information loss, high relevance
Cons: Requires vector DB infrastructure, retrieval latency
Best for: Knowledge bases, customer support with large history

from anthropic import Anthropic
from openai import OpenAI
import chromadb


class RAGMemory:
    def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
        self.anthropic = anthropic_client
        self.openai = openai_client
        # Initialize vector store
        self.chroma = chromadb.Client()
        self.collection = self.chroma.create_collection(
            name="conversation",
            metadata={"hnsw:space": "cosine"}
        )
        self.recent_messages = []
        self.recent_window = 5
        self.message_counter = 0

    def add_message(self, message: dict):
        """Add to recent memory and vector store."""
        self.recent_messages.append(message)
        if len(self.recent_messages) > self.recent_window:
            old_msg = self.recent_messages.pop(0)
            self._store_in_vectordb(old_msg)

    def _store_in_vectordb(self, message: dict):
        """Archive to vector database."""
        # Generate embedding
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=message['content']
        )
        self.collection.add(
            embeddings=[response.data[0].embedding],
            documents=[message['content']],
            metadatas=[{"role": message['role']}],
            ids=[f"msg_{self.message_counter}"]
        )
        self.message_counter += 1

    def retrieve_context(self, query: str, max_tokens: int = 4000):
        """Retrieve relevant context using RAG."""
        context = []
        token_count = 0
        # 1. Recent messages (short-term memory)
        for msg in self.recent_messages:
            context.append(msg)
            token_count += len(msg['content']) // 4
        # 2. Retrieve relevant historical context; never request more
        #    results than the collection holds, and skip the query at zero
        n_results = min(10, (max_tokens - token_count) // 100, self.collection.count())
        if n_results > 0:
            query_embedding = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=query
            )
            results = self.collection.query(
                query_embeddings=[query_embedding.data[0].embedding],
                n_results=n_results
            )
            retrieved = []
            for doc, metadata in zip(results['documents'][0], results['metadatas'][0]):
                doc_tokens = len(doc) // 4
                if token_count + doc_tokens > max_tokens:
                    break
                retrieved.append({
                    "role": metadata['role'],
                    "content": f"[Retrieved] {doc}"
                })
                token_count += doc_tokens
            # Retrieved history (most relevant first) precedes recent messages
            context = retrieved + context
        return context
Vector database options:
- ChromaDB: Embedded, easy local development
- Pinecone: Managed, 50ms p95 latency
- Weaviate: Open-source, hybrid search
- Qdrant: High performance, payload filtering
2.2 Vector Search and Clustering
Group similar messages into clusters, represent with centroids.
Pros: Reduces redundancy, identifies themes, multi-topic handling
Cons: Requires sufficient data, may lose nuances
Best for: Multi-topic conversations, meeting summaries

from sklearn.cluster import KMeans
from openai import OpenAI
import numpy as np


class ClusteredMemory:
    def __init__(self, openai_client: OpenAI, n_clusters: int = 5):
        self.client = openai_client
        self.n_clusters = n_clusters
        self.messages = []
        self.embeddings = []

    def add_messages(self, messages: list):
        for msg in messages:
            self.messages.append(msg)
            response = self.client.embeddings.create(
                model="text-embedding-3-small",
                input=msg['content']
            )
            self.embeddings.append(response.data[0].embedding)

    def compress_by_clustering(self):
        """Cluster messages and return one representative per cluster."""
        if len(self.messages) < self.n_clusters:
            return self.messages
        embeddings_array = np.array(self.embeddings)
        kmeans = KMeans(n_clusters=self.n_clusters, random_state=42, n_init=10)
        labels = kmeans.fit_predict(embeddings_array)
        # Select message closest to each centroid
        compressed = []
        for cluster_id in range(self.n_clusters):
            cluster_indices = np.where(labels == cluster_id)[0]
            centroid = kmeans.cluster_centers_[cluster_id]
            cluster_embeddings = embeddings_array[cluster_indices]
            distances = np.linalg.norm(cluster_embeddings - centroid, axis=1)
            closest_idx = cluster_indices[np.argmin(distances)]
            compressed.append({
                **self.messages[closest_idx],
                "cluster_id": int(cluster_id),
                "cluster_size": len(cluster_indices)
            })
        return compressed
2.3 Semantic Deduplication
Remove semantically similar messages that convey redundant information.
Pros: Reduces redundancy without losing unique content
Cons: Requires threshold tuning, O(n²) complexity
Best for: FAQ systems, repetitive conversations

from openai import OpenAI
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity


class SemanticDeduplicator:
    def __init__(self, openai_client: OpenAI, similarity_threshold: float = 0.85):
        self.client = openai_client
        self.threshold = similarity_threshold

    def deduplicate(self, messages: list):
        """Remove semantically similar messages."""
        if len(messages) <= 1:
            return messages
        # Generate embeddings
        embeddings = []
        for msg in messages:
            response = self.client.embeddings.create(
                model="text-embedding-3-small",
                input=msg['content']
            )
            embeddings.append(response.data[0].embedding)
        embeddings_array = np.array(embeddings)
        similarity_matrix = cosine_similarity(embeddings_array)
        # Keep a message only if it is not too similar to any kept message
        keep_indices = []
        for i in range(len(messages)):
            is_unique = True
            for j in keep_indices:
                if similarity_matrix[i][j] > self.threshold:
                    is_unique = False
                    break
            if is_unique:
                keep_indices.append(i)
        return [messages[i] for i in keep_indices]
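The greedy keep-first rule used by `deduplicate` can be exercised without an embeddings API by feeding it precomputed vectors. The sketch below is a dependency-free illustration: `cosine` and `greedy_dedup` are hypothetical names, and the toy 2-D vectors stand in for real embeddings.

```python
import math
from typing import List, Sequence


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity of two vectors (0.0 if either has zero length)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def greedy_dedup(vectors: Sequence[Sequence[float]],
                 threshold: float = 0.85) -> List[int]:
    """Return indices to keep: the first occurrence of each similar group."""
    keep: List[int] = []
    for i, vec in enumerate(vectors):
        # Keep only if no already-kept vector is more similar than threshold
        if all(cosine(vec, vectors[j]) <= threshold for j in keep):
            keep.append(i)
    return keep


# Vectors 0 and 1 point almost the same way, as do vectors 2 and 3
toy_vectors = [[1.0, 0.0], [0.99, 0.05], [0.0, 1.0], [0.1, 0.98]]
kept = greedy_dedup(toy_vectors)
print(kept)
```

Because each candidate is compared only against kept items, the worst case is still O(n²) comparisons, but heavily repetitive conversations finish much faster.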
3. Token-Efficient Strategies
3.1 Message Prioritization
Assign importance scores and retain only high-priority content.
Pros: Retains most important information, flexible criteria
Cons: Scoring is heuristic-based, may break flow
Best for: Mixed-importance conversations, filtering noise

import re


class MessagePrioritizer:
    def score_message(self, msg: dict, index: int, total: int) -> float:
        """Calculate composite importance score."""
        scores = []
        # Length score (longer = more info)
        scores.append(min(len(msg['content']) / 500, 1.0))
        # Question score
        if msg['role'] == 'user':
            scores.append(min(msg['content'].count('?') * 0.5, 1.0))
        # Entity score (capitalized words)
        entities = len(re.findall(r'\b[A-Z][a-z]+', msg['content']))
        scores.append(min(entities / 10, 1.0))
        # Recency score (linear decay)
        scores.append(index / max(total - 1, 1))
        # Role score
        scores.append(0.6 if msg['role'] == 'user' else 0.4)
        return sum(scores) / len(scores)

    def prioritize(self, messages: list, target_count: int):
        """Select top N messages by priority."""
        scored = [
            (msg, self.score_message(msg, i, len(messages)), i)
            for i, msg in enumerate(messages)
        ]
        scored.sort(key=lambda x: x[1], reverse=True)
        top_messages = scored[:target_count]
        top_messages.sort(key=lambda x: x[2])  # Restore chronological order
        return [msg for msg, score, idx in top_messages]
3.2 Delta Compression
Store only changes between consecutive messages.
Pros: Highly efficient for incremental changes
Cons: Reconstruction overhead, not suitable for all content
Best for: Code assistants with incremental edits

import difflib


class DeltaCompressor:
    def __init__(self):
        self.base_messages = []
        self.deltas = []

    def add_message(self, message: dict):
        # Find the most recent base message with the same role. Checking only
        # the last base would never match in a strictly alternating dialogue.
        base_index = next(
            (i for i in range(len(self.base_messages) - 1, -1, -1)
             if self.base_messages[i]['role'] == message['role']),
            None,
        )
        if base_index is not None:
            # Calculate delta
            diff = list(difflib.unified_diff(
                self.base_messages[base_index]['content'].splitlines(),
                message['content'].splitlines(),
                lineterm=''
            ))
            if len('\n'.join(diff)) < len(message['content']) * 0.7:
                # Store as delta if compression achieved
                self.deltas.append({
                    'base_index': base_index,
                    'delta': diff,
                    'role': message['role']
                })
                return
        # Store as new base message
        self.base_messages.append(message)

    def reconstruct(self):
        """Reconstruct conversation from bases + deltas (simplified placeholder)."""
        messages = self.base_messages.copy()
        for delta_info in self.deltas:
            base_content = messages[delta_info['base_index']]['content']
            # Placeholder: returns the base text unchanged. difflib cannot
            # apply a unified diff, so a full implementation needs its own
            # patch step. Deltas are appended after all bases, so the
            # original message order is not restored either.
            reconstructed = base_content
            messages.append({
                'role': delta_info['role'],
                'content': reconstructed
            })
        return messages
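The `reconstruct` method above leaves the diff-application step unimplemented. One way to get a delta that can actually be reapplied is to store `difflib.SequenceMatcher` opcodes instead of unified-diff text. This is a sketch of that alternative, not the original author's method: `make_delta`, `apply_delta`, and the `Op` tuple layout are assumptions, and trailing newlines are not preserved because the text is split into lines.

```python
import difflib
from typing import List, Tuple

# (start, end, replacement_lines): replace base lines [start:end]
Op = Tuple[int, int, List[str]]


def make_delta(base: str, new: str) -> List[Op]:
    """Record only the line ranges of `base` that differ in `new`."""
    a, b = base.splitlines(), new.splitlines()
    matcher = difflib.SequenceMatcher(a=a, b=b, autojunk=False)
    return [
        (i1, i2, b[j1:j2])
        for tag, i1, i2, j1, j2 in matcher.get_opcodes()
        if tag != "equal"
    ]


def apply_delta(base: str, delta: List[Op]) -> str:
    """Rebuild the new text from the base text and a delta."""
    lines = base.splitlines()
    # Apply from the end so earlier line indices stay valid
    for start, end, replacement in reversed(delta):
        lines[start:end] = replacement
    return "\n".join(lines)


base_text = "def f():\n    return 1\nprint(f())"
new_text = "def f():\n    return 2\nprint(f())"
delta = make_delta(base_text, new_text)
print(delta)
print(apply_delta(base_text, delta) == new_text)
```

Only the changed line is stored, so the delta stays small when successive messages repeat most of a code listing.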
4. LangChain Memory Types
4.1 ConversationSummaryMemory
Automatically summarizes conversation as it progresses.
from langchain.memory import ConversationSummaryMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationSummaryMemory(llm=llm)

# Add conversation
memory.save_context(
    {"input": "Hi, I'm working on a Python project"},
    {"output": "Great! How can I help with your Python project?"},
)

# Get summary
summary = memory.load_memory_variables({})
print(summary['history'])

Pros: Automatic summarization, simple API
Cons: Every turn triggers LLM call
Best for: Medium conversations (20-50 turns)
4.2 ConversationSummaryBufferMemory
Hybrid: Recent messages verbatim, older summarized.
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-haiku-20241022")
memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,  # Summarize when exceeding
    return_messages=True,
)

# Add conversation
for i in range(50):
    memory.save_context(
        {"input": f"Question {i}"},
        {"output": f"Answer {i}"},
    )

# Automatically keeps recent messages + summary of old
context = memory.load_memory_variables({})

Pros: Best balance of detail and compression
Cons: Requires token limit tuning
Best for: Most production applications
4.3 ConversationTokenBufferMemory
Maintains fixed token budget, drops oldest when exceeded.
from langchain.memory import ConversationTokenBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationTokenBufferMemory(
    llm=llm,
    max_token_limit=2000,
)

# Simple FIFO when token limit exceeded

Pros: Predictable token usage, simple
Cons: Loses old information completely
Best for: Real-time chat with strict limits
4.4 VectorStoreRetrieverMemory
Stores all messages in vector database, retrieves relevant ones.
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
vectorstore = Chroma(embedding_function=embeddings)
memory = VectorStoreRetrieverMemory(
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)

# Automatically retrieves most relevant context

Pros: Infinite conversation length, semantic retrieval
Cons: Requires vector DB, retrieval overhead
Best for: Long-running conversations, knowledge bases
5. Anthropic-Specific Patterns
5.1 Prompt Caching (90% Cost Reduction)
Cache static context to reduce token costs.
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

# Long conversation context
conversation_history = [
    {"role": "user", "content": "Message 1"},
    {"role": "assistant", "content": "Response 1"},
    # ... many more messages
]

# Mark context for caching
messages = []
for i, msg in enumerate(conversation_history[:-1]):
    content = msg['content']
    # Add cache control to last context message
    if i == len(conversation_history) - 2:
        messages.append({
            "role": msg['role'],
            "content": [
                {
                    "type": "text",
                    "text": content,
                    "cache_control": {"type": "ephemeral"}
                }
            ]
        })
    else:
        messages.append(msg)

# Add new user message (not cached)
messages.append(conversation_history[-1])

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=messages,
)

# Subsequent calls that reuse the same cached prefix pay about 10% of the
# base input price for the cached tokens

Cache TTL: 5 minutes, refreshed each time the cache is read
Savings: about 90% on input tokens read from the cache; the call that writes the cache costs about 25% more than uncached input
Limits: Max 4 cache breakpoints per request
Best practices:
- Cache conversation history, not current query
- Update cache when context changes significantly
- Combine with summarization for maximum efficiency
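Whether caching pays off depends on how often the cached prefix is reused before it expires. The sketch below estimates input cost with and without caching over a series of calls. It assumes every call lands inside the cache TTL with an identical prefix, and the multipliers (1.25x for the cache write, 0.10x for cache reads) and the $3 per 1M base price are assumptions to check against current Anthropic pricing; `cached_input_cost` is a hypothetical helper name.

```python
from typing import Tuple


def cached_input_cost(prefix_tokens: int, new_tokens: int, calls: int,
                      base_price: float = 3.0, write_mult: float = 1.25,
                      read_mult: float = 0.10) -> Tuple[float, float]:
    """Return (cost_without_cache, cost_with_cache) in USD for `calls` requests."""
    if calls < 1:
        raise ValueError("calls must be at least 1")
    per_token = base_price / 1_000_000
    without_cache = calls * (prefix_tokens + new_tokens) * per_token
    with_cache = (
        prefix_tokens * write_mult * per_token                 # first call writes the cache
        + (calls - 1) * prefix_tokens * read_mult * per_token  # later calls read it
        + calls * new_tokens * per_token                       # uncached part, every call
    )
    return without_cache, with_cache


plain, cached = cached_input_cost(prefix_tokens=100_000, new_tokens=500, calls=10)
print(f"without cache: ${plain:.3f}, with cache: ${cached:.3f}")
```

Under these assumptions ten calls drop from about $3.02 to about $0.66 of input cost. The overall saving is below 90% because the first call pays the write premium and the uncached tokens are billed at the full rate.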
5.2 Extended Thinking for Compression Planning
Use extended thinking to plan optimal compression strategy.
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

# Placeholder inputs; replace with values from your own session
conversation_text = "USER: ...\n\nASSISTANT: ..."
current_tokens = 150000
target_tokens = 30000
compression_ratio = current_tokens / target_tokens

response = client.messages.create(
    model="claude-3-7-sonnet-20250219",
    max_tokens=16000,  # Must be larger than budget_tokens
    thinking={
        "type": "enabled",
        "budget_tokens": 10000
    },
    messages=[{
        "role": "user",
        "content": f"""Analyze this conversation and recommend compression:

{conversation_text}

Current token count: {current_tokens}
Target: {target_tokens}
Required compression: {compression_ratio:.1f}x

Recommend optimal strategy."""
    }]
)

# Access thinking process
thinking_content = [
    block for block in response.content
    if block.type == "thinking"
]

# Get compression recommendation (the final text block)
recommendation = response.content[-1].text
Production Patterns
Checkpointing and Persistence
Save compression state for recovery and resume.
import json
import time
from pathlib import Path


class PersistentMemory:
    def __init__(self, checkpoint_dir: str = "./checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
        self.memory = []
        self.summary = None

    def save_checkpoint(self, session_id: str):
        """Save current memory state."""
        checkpoint = {
            'messages': self.memory,
            'summary': self.summary,
            'timestamp': time.time()
        }
        checkpoint_file = self.checkpoint_dir / f"{session_id}.json"
        with open(checkpoint_file, 'w') as f:
            json.dump(checkpoint, f, indent=2)

    def load_checkpoint(self, session_id: str):
        """Load memory state from checkpoint."""
        checkpoint_file = self.checkpoint_dir / f"{session_id}.json"
        if checkpoint_file.exists():
            with open(checkpoint_file, 'r') as f:
                checkpoint = json.load(f)
            self.memory = checkpoint['messages']
            self.summary = checkpoint.get('summary')
            return True
        return False

    def auto_checkpoint(self, session_id: str, interval: int = 10):
        """Automatically save every N messages."""
        # Skip the empty case, where 0 % interval == 0 would also match
        if self.memory and len(self.memory) % interval == 0:
            self.save_checkpoint(session_id)
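A checkpoint written directly to its final path can be left half-written if the process dies mid-save, which defeats the purpose of recovery. A common remedy is to write to a temporary file in the same directory and then rename it over the target. The sketch below shows that pattern with the standard library only; `save_json_atomic` is a hypothetical helper, not part of the class above.

```python
import json
import os
import tempfile
from pathlib import Path


def save_json_atomic(path: Path, payload: dict) -> None:
    """Write JSON to `path` so readers never observe a partial file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # The temp file must live in the same directory for os.replace to be atomic
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(payload, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # Push the bytes to disk before renaming
        os.replace(tmp_name, path)  # Swap the finished file into place
    except BaseException:
        # Clean up the temp file if anything failed before the rename
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

A `save_checkpoint` method could call this helper in place of its plain `open(..., 'w')` write; the rest of the checkpoint logic would stay the same.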
Resume Workflows
Continue conversations across sessions.
from anthropic import Anthropic
import json
import os
import time


class ResumableConversation:
    def __init__(self, client: Anthropic, session_id: str):
        self.client = client
        self.session_id = session_id
        self.memory = self._load_or_create()

    def _load_or_create(self):
        """Load existing session or create new."""
        try:
            with open(f'sessions/{self.session_id}.json', 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {
                'messages': [],
                'summary': None,
                'created_at': time.time()
            }

    def add_turn(self, user_message: str):
        """Add user message and get response."""
        # Add user message
        self.memory['messages'].append({
            'role': 'user',
            'content': user_message
        })
        # Build context (with compression); it already ends with the new
        # user message, so the message is not appended a second time
        system_prompt, context = self._build_context()
        # Get response
        request = {
            'model': "claude-3-5-sonnet-20241022",
            'max_tokens': 1024,
            'messages': context
        }
        if system_prompt:
            # The Messages API takes system text as a top-level parameter
            request['system'] = system_prompt
        response = self.client.messages.create(**request)
        # Save response
        assistant_message = response.content[0].text
        self.memory['messages'].append({
            'role': 'assistant',
            'content': assistant_message
        })
        # Compress if needed
        if len(self.memory['messages']) > 20:
            self._compress()
        # Save state
        self._save()
        return assistant_message

    def _build_context(self):
        """Build (system_prompt, messages) with compression."""
        system_prompt = None
        # Add summary if exists
        if self.memory['summary']:
            system_prompt = f"[Previous conversation summary]\n{self.memory['summary']}"
        # Add recent messages
        recent = self.memory['messages'][-10:]
        # The conversation sent to the API should start with a user message
        while recent and recent[0]['role'] != 'user':
            recent = recent[1:]
        return system_prompt, recent

    def _compress(self):
        """Compress older messages."""
        if len(self.memory['messages']) < 15:
            return
        # Messages to summarize
        to_summarize = self.memory['messages'][:-10]
        # Generate summary, carrying the previous summary forward
        conversation_text = "\n\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in to_summarize
        ])
        if self.memory['summary']:
            conversation_text = (
                f"Earlier summary:\n{self.memory['summary']}\n\n{conversation_text}"
            )
        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=500,
            messages=[{
                'role': 'user',
                'content': f"Summarize this conversation:\n\n{conversation_text}"
            }]
        )
        # Update memory
        self.memory['summary'] = response.content[0].text
        self.memory['messages'] = self.memory['messages'][-10:]

    def _save(self):
        """Save session to disk."""
        os.makedirs('sessions', exist_ok=True)
        with open(f'sessions/{self.session_id}.json', 'w') as f:
            json.dump(self.memory, f, indent=2)


# Usage
client = Anthropic(api_key="your-api-key")
conversation = ResumableConversation(client, session_id="user123_session1")

# Continue across multiple sessions
response1 = conversation.add_turn("What's Python?")
# ... later session
response2 = conversation.add_turn("Show me an example")  # Remembers context
Hybrid Approaches (Best Practice)
Combine multiple techniques for optimal results.
from anthropic import Anthropic from openai import OpenAI import chromadb
class HybridMemorySystem: """ Combines: - Rolling summarization (short-term compression) - RAG retrieval (long-term memory) - Prompt caching (cost optimization) - Progressive compression (adaptive behavior) """
def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
self.anthropic = anthropic_client
self.openai = openai_client
# Recent messages (verbatim)
self.recent_messages = []
self.recent_window = 10
# Rolling summary
self.rolling_summary = None
# Vector store (long-term)
self.chroma = chromadb.Client()
self.collection = self.chroma.create_collection(name="memory")
self.message_counter = 0
# Compression thresholds
self.thresholds = {
'light': 0.70, # Start basic compression
'medium': 0.85, # Aggressive summarization
'heavy': 0.95 # Emergency measures
}
def add_message(self, message: dict):
"""Add message with intelligent compression."""
self.recent_messages.append(message)
# Check compression needs
usage_ratio = self._estimate_usage()
if usage_ratio >= self.thresholds['heavy']:
self._emergency_compress()
elif usage_ratio >= self.thresholds['medium']:
self._medium_compress()
elif usage_ratio >= self.thresholds['light']:
self._light_compress()
def _light_compress(self):
"""Remove redundancy, archive to vector store."""
if len(self.recent_messages) > self.recent_window * 1.5:
# Archive oldest to vector store
to_archive = self.recent_messages[:5]
for msg in to_archive:
self._archive_to_vectorstore(msg)
self.recent_messages = self.recent_messages[5:]
def _medium_compress(self):
"""Generate rolling summary, aggressive archival."""
if len(self.recent_messages) > self.recent_window:
# Summarize older messages
to_summarize = self.recent_messages[:-self.recent_window]
summary_text = "\n\n".join([
f"{msg['role']}: {msg['content']}"
for msg in to_summarize
])
if self.rolling_summary:
summary_text = f"Existing: {self.rolling_summary}\n\nNew: {summary_text}"
response = self.anthropic.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=400,
messages=[{
'role': 'user',
'content': f"Update summary:\n{summary_text}"
}]
)
self.rolling_summary = response.content[0].text
# Archive all summarized messages
for msg in to_summarize:
self._archive_to_vectorstore(msg)
self.recent_messages = self.recent_messages[-self.recent_window:]
    def _emergency_compress(self):
        """Extreme compression for near-limit situations."""
        # Keep only the 5 most recent messages
        to_archive = self.recent_messages[:-5]
        for msg in to_archive:
            self._archive_to_vectorstore(msg)
        self.recent_messages = self.recent_messages[-5:]
        # Compress the summary further if it is longer than 1,000 characters
        if self.rolling_summary and len(self.rolling_summary) > 1000:
            response = self.anthropic.messages.create(
                model="claude-3-5-haiku-20241022",
                max_tokens=200,
                messages=[{
                    'role': 'user',
                    'content': f"Create ultra-concise summary:\n{self.rolling_summary}"
                }]
            )
            self.rolling_summary = response.content[0].text
    def _archive_to_vectorstore(self, message: dict):
        """Store in vector database for retrieval."""
        embedding_response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=message['content']
        )
        self.collection.add(
            embeddings=[embedding_response.data[0].embedding],
            documents=[message['content']],
            metadatas=[{'role': message['role']}],
            ids=[f"msg_{self.message_counter}"]
        )
        self.message_counter += 1
    def get_context(self, current_query: str, max_tokens: int = 8000):
        """Build optimal context for current query."""
        context = []
        token_count = 0
        # 1. Add rolling summary (if exists)
        if self.rolling_summary:
            summary_msg = {
                # The Messages API accepts only 'user' and 'assistant' roles in
                # `messages`, so the summary is sent as a user turn
                'role': 'user',
                'content': [
                    {
                        'type': 'text',
                        'text': f"[Conversation Summary]\n{self.rolling_summary}",
                        'cache_control': {'type': 'ephemeral'}  # Cache it
                    }
                ]
            }
            context.append(summary_msg)
            token_count += len(self.rolling_summary) // 4
        # 2. Retrieve relevant historical context (RAG), capped at 30% of the budget
        if token_count < max_tokens * 0.3:
            query_embedding = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=current_query
            )
            results = self.collection.query(
                query_embeddings=[query_embedding.data[0].embedding],
                n_results=5
            )
            for i, doc in enumerate(results['documents'][0]):
                if token_count + len(doc) // 4 > max_tokens * 0.3:
                    break
                metadata = results['metadatas'][0][i]
                context.append({
                    'role': metadata['role'],
                    'content': f"[Retrieved] {doc}"
                })
                token_count += len(doc) // 4
        # 3. Add recent messages verbatim, keeping the newest if the budget runs out
        recent = []
        for msg in reversed(self.recent_messages):
            msg_tokens = len(msg['content']) // 4
            if token_count + msg_tokens > max_tokens * 0.8:
                break
            recent.append(msg)
            token_count += msg_tokens
        context.extend(reversed(recent))  # restore chronological order
        return context
    def _estimate_usage(self):
        """Estimate current context window usage."""
        total_tokens = 0
        # Rough heuristic: about 4 characters per token
        if self.rolling_summary:
            total_tokens += len(self.rolling_summary) // 4
        for msg in self.recent_messages:
            total_tokens += len(msg['content']) // 4
        return total_tokens / 200000  # Claude Sonnet context window
Usage
anthropic_client = Anthropic(api_key="your-anthropic-key")
openai_client = OpenAI(api_key="your-openai-key")
memory = HybridMemorySystem(anthropic_client, openai_client)
# Add messages over time
for i in range(1000):
    memory.add_message({
        'role': 'user' if i % 2 == 0 else 'assistant',
        'content': f"Message {i} with some content..."
    })
# Retrieve optimized context
current_query = "What did we discuss about pricing?"
context = memory.get_context(current_query)
# Use with Claude
response = anthropic_client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=context + [{
        'role': 'user',
        'content': current_query
    }]
)
Performance Benchmarks
Compression Efficiency
| Technique | Compression Ratio | Quality Loss | Latency | Cost Impact |
| --- | --- | --- | --- | --- |
| Extractive | 2-3x | <1% | <10ms | None |
| Abstractive | 5-10x | 2-5% | 1-2s | +$0.001/turn |
| Hierarchical | 20x+ | 5-8% | 2-5s | +$0.003/turn |
| LLMLingua | 20x | 1.5% | 500ms | None |
| RAG | Variable | <1% | 100-300ms | +$0.0005/turn |
| Prompt Caching | N/A | 0% | 0ms | -90% |
Token Savings by Use Case
Customer Support (50-turn conversation):
-
No compression: ~8,000 tokens/request
-
Rolling summary: ~2,000 tokens/request (75% reduction)
-
Hybrid (RAG + summary): ~1,500 tokens/request (81% reduction)
Code Assistant (100-turn session):
-
No compression: ~25,000 tokens/request
-
Hierarchical: ~5,000 tokens/request (80% reduction)
-
Hybrid + caching: ~1,000 tokens/request effective (96% cost reduction)
Educational Tutor (multi-session):
-
No compression: Would exceed context window
-
RAG + summarization: ~3,000 tokens/request
-
Effectively unbounded session length, because only the summary and relevant history are sent with each request
Cost Analysis
Example: Claude Sonnet pricing ($3 input, $15 output per 1M tokens). The figures below count input tokens only.
1,000 conversations, 50 turns each:
No compression:
-
Avg 8K tokens/request × 50K requests = 400M tokens
-
Cost: $1,200
With rolling summarization:
-
Avg 2K tokens/request × 50K requests = 100M tokens
-
Summarization overhead: +10M tokens
-
Cost: $330 (72% savings)
With hybrid system + caching:
-
First turn: 2K tokens (no cache)
-
Subsequent: ~200 tokens effective (assumes the 2K context is read from cache at roughly a 90% discount)
-
Total: ~15M tokens effective
-
Cost: $45 (96% savings)
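The arithmetic above can be reproduced in a few lines of Python. This is a sketch that counts input tokens only at the $3 per 1M rate quoted above. The function name `input_cost` is illustrative, and the 15M effective-token figure for the hybrid case is taken from the estimate above rather than derived.

```python
INPUT_PRICE_PER_MTOK = 3.00  # USD per 1M input tokens (rate quoted above)

def input_cost(total_tokens: int, price_per_mtok: float = INPUT_PRICE_PER_MTOK) -> float:
    """Return the input-token cost in USD for a given token volume."""
    return total_tokens / 1_000_000 * price_per_mtok

requests = 1_000 * 50  # 1,000 conversations x 50 turns each

baseline_tokens = 8_000 * requests              # 400M tokens
summary_tokens = 2_000 * requests + 10_000_000  # 100M plus 10M summarization overhead
hybrid_tokens = 15_000_000                      # ~15M effective tokens (estimate from above)

baseline = input_cost(baseline_tokens)
scenarios = [
    ("no compression", baseline_tokens),
    ("rolling summary", summary_tokens),
    ("hybrid + caching", hybrid_tokens),
]
for name, tokens in scenarios:
    cost = input_cost(tokens)
    savings = 1 - cost / baseline
    print(f"{name:>18}: ${cost:,.0f} ({savings:.1%} savings)")
```

Output-token costs and cache-write surcharges are left out here, so real bills will be somewhat higher than these input-only figures.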
Tool Recommendations
Memory Management Tools
Mem0 (Recommended for Production)
Best for: Hybrid memory systems with minimal code
from mem0 import MemoryClient
client = MemoryClient(api_key="your-mem0-key")
# Mem0 handles compression, summarization, and retrieval on its side
client.add(
    [
        {"role": "user", "content": "I'm working on a Python project"},
        {"role": "assistant", "content": "Great! What kind of project?"}
    ],
    user_id="user123"
)
# Retrieve relevant context
context = client.search(
    "What programming language am I using?",
    user_id="user123"
)
Features:
-
Automatic hierarchical summarization
-
Built-in RAG retrieval
-
Multi-user session management
-
Analytics dashboard
Pricing: $0.40/1K memory operations
Zep
Best for: Low-latency production deployments
from zep_python import ZepClient
# Method names and message types differ between zep-python releases;
# check the SDK documentation for the version you install
client = ZepClient(api_key="your-zep-key")
# Add to session
client.memory.add_memory(
    session_id="session123",
    messages=[
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "Hi there!"}
    ]
)
# Auto-summarized retrieval
memory = client.memory.get_memory(session_id="session123")
Features:
-
<100ms retrieval latency
-
Automatic fact extraction
-
Entity recognition
-
Session management
Pricing: Open-source (self-hosted) or $0.50/1K operations (cloud)
ChromaDB
Best for: Self-hosted vector storage
import chromadb
client = chromadb.Client()
collection = client.create_collection("conversations")
# Store embeddings
collection.add(
    documents=["Message content"],
    embeddings=[[0.1, 0.2, ...]],
    ids=["msg1"]
)
# Retrieve
results = collection.query(
    query_embeddings=[[0.1, 0.2, ...]],
    n_results=5
)
Features:
-
Fully open-source
-
Embedded or client-server
-
Fast local development
Pricing: Free (self-hosted)
LangChain
Best for: Rapid prototyping and experimentation
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationSummaryBufferMemory(llm=llm, max_token_limit=2000)
Features:
-
Multiple memory types
-
Framework integration
-
Extensive documentation
Pricing: Free (uses your LLM API costs)
Compression Libraries
LLMLingua
Best for: Extreme compression with minimal quality loss
from llmlingua import PromptCompressor
compressor = PromptCompressor()
result = compressor.compress_prompt(
    context=["Long conversation history..."],
    question="Current user query",
    target_token=500
)
compressed = result["compressed_prompt"]  # compress_prompt returns a dict
# Reported: up to 20x compression with about 1.5% accuracy loss
Features:
-
20x compression ratios
-
<2% quality degradation
-
Fast inference (<500ms)
Pricing: Free (open-source)
Use Cases and Patterns
Chatbot (Customer Support)
Requirements:
-
Multi-turn conversations (50-100 turns)
-
Preserve customer context
-
Fast response times
-
Cost-efficient
Recommended approach:
-
ConversationSummaryBufferMemory (LangChain)
-
70% threshold: Semantic deduplication
-
85% threshold: Rolling summarization
-
Prompt caching for frequent patterns
Implementation:
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-3-5-haiku-20241022")
memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,
    return_messages=True
)
# Add customer conversation
for turn in customer_conversation:
    memory.save_context(
        {"input": turn['customer_message']},
        {"output": turn['agent_response']}
    )
# Retrieve compressed context
context = memory.load_memory_variables({})
Code Assistant
Requirements:
-
Long development sessions (100+ turns)
-
Preserve technical details
-
Handle large code blocks
-
Track incremental changes
Recommended approach:
-
Hierarchical summarization for overall context
-
RAG retrieval for specific code references
-
Delta compression for iterative edits
-
Prompt caching for system prompts
Implementation:
from anthropic import Anthropic
from openai import OpenAI
client = Anthropic(api_key="your-api-key")
openai_client = OpenAI(api_key="your-openai-key")  # used by RAGMemory for embeddings
class CodeAssistantMemory:
    def __init__(self):
        # HierarchicalMemory, RAGMemory, and DeltaCompressor are not defined in this snippet
        self.hierarchy = HierarchicalMemory(client, chunk_size=15)
        self.rag = RAGMemory(anthropic_client=client, openai_client=openai_client)
        self.deltas = DeltaCompressor()
    def add_interaction(self, code_change: dict):
        # Store in hierarchy
        self.hierarchy.add_message({
            'role': 'user',
            'content': code_change['description']
        })
        # Store in RAG for retrieval
        self.rag.add_message(code_change)
        # Store as delta if incremental
        if code_change.get('is_incremental'):
            self.deltas.add_message(code_change)
    def get_context(self, current_query: str):
        # Combine hierarchical summary + RAG retrieval
        summary_context = self.hierarchy.get_context(max_tokens=2000)
        rag_context = self.rag.retrieve_context(current_query, max_tokens=2000)
        return summary_context + rag_context
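`DeltaCompressor` is not shown in this section. As a rough illustration of the delta idea for iterative edits, the sketch below keeps the first version of a file in full and stores each later edit as a unified diff, using Python's standard `difflib`. The class name `SimpleDeltaStore` and its methods are hypothetical and are not the `DeltaCompressor` interface used above.

```python
import difflib

class SimpleDeltaStore:
    """Keep the first version in full and later versions as unified diffs."""

    def __init__(self):
        self.base = None    # full text of the first version
        self.latest = None  # full text of the most recent version
        self.deltas = []    # one unified diff per subsequent edit

    def add_version(self, code: str) -> None:
        if self.base is None:
            self.base = self.latest = code
            return
        diff = difflib.unified_diff(
            self.latest.splitlines(keepends=True),
            code.splitlines(keepends=True),
            fromfile="previous",
            tofile="current",
        )
        self.deltas.append("".join(diff))
        self.latest = code

    def context_text(self) -> str:
        """Base version followed by the diffs, suitable for a prompt."""
        parts = [f"[Base version]\n{self.base}"]
        parts += [f"[Edit {i}]\n{d}" for i, d in enumerate(self.deltas, 1)]
        return "\n".join(parts)

store = SimpleDeltaStore()
store.add_version("def add(a, b):\n    return a + b\n")
store.add_version("def add(a, b):\n    # Sum two numbers\n    return a + b\n")
print(store.context_text())
```

For small files the diffs can be longer than the file itself. Deltas pay off when each edit is small relative to the file being edited.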
Educational Tutor
Requirements:
-
Multi-session tracking
-
Student progress persistence
-
Personalized context retrieval
-
Long-term knowledge retention
Recommended approach:
-
VectorStoreRetrieverMemory for multi-session
-
Fact extraction for student knowledge
-
Progressive compression across sessions
-
Resumable conversations
Implementation:
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
class TutorMemory:
    def __init__(self, student_id: str):
        self.student_id = student_id
        # Vector store for all sessions
        # (pass persist_directory=... to Chroma to keep data across process restarts)
        embeddings = OpenAIEmbeddings()
        vectorstore = Chroma(
            collection_name=f"student_{student_id}",
            embedding_function=embeddings
        )
        self.memory = VectorStoreRetrieverMemory(
            retriever=vectorstore.as_retriever(search_kwargs={"k": 10})
        )
    def add_lesson_content(self, lesson: dict):
        """Add lesson interaction to student memory."""
        self.memory.save_context(
            {"input": lesson['topic']},
            {"output": lesson['explanation']}
        )
    def get_student_context(self, current_topic: str):
        """Retrieve relevant past lessons for current topic."""
        return self.memory.load_memory_variables({
            "prompt": current_topic
        })
Best Practices
- Choose the Right Technique for Your Use Case
-
Short conversations (<20 turns): No compression needed
-
Medium conversations (20-50 turns): ConversationSummaryBufferMemory
-
Long conversations (50-100 turns): Hierarchical or rolling summarization
-
Very long (100+ turns): Hybrid (RAG + summarization + caching)
-
Multi-session: VectorStoreRetrieverMemory or Mem0
- Implement Progressive Compression
Don't compress aggressively from the start. Use thresholds:
-
0-70%: Store verbatim
-
70-85%: Light compression (deduplication)
-
85-95%: Medium compression (summarization)
-
95-100%: Aggressive compression (hierarchical)
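The thresholds above map onto a small dispatch function. A minimal sketch, assuming a 200K-token context window as in the earlier `_estimate_usage` example. The function name `compression_tier` and the tier labels are illustrative.

```python
def compression_tier(used_tokens: int, context_window: int = 200_000) -> str:
    """Map context usage to a compression tier using the thresholds above."""
    if context_window <= 0:
        raise ValueError("context_window must be positive")
    ratio = used_tokens / context_window
    if ratio >= 0.95:
        return "aggressive"  # hierarchical compression
    if ratio >= 0.85:
        return "medium"      # summarization
    if ratio >= 0.70:
        return "light"       # deduplication
    return "verbatim"        # store messages unchanged

for used in (50_000, 150_000, 175_000, 195_000):
    print(used, compression_tier(used))
```

Checking the most severe tier first keeps the branches mutually exclusive without needing upper bounds on each range.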
- Combine Techniques
Single-technique approaches are usually suboptimal. Production systems typically combine:
-
Rolling summarization (short-term)
-
RAG retrieval (long-term)
-
Prompt caching (cost optimization)
-
Semantic deduplication (redundancy removal)
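Of the four techniques above, deduplication is the simplest to sketch. The example below uses word-set Jaccard similarity as a stand-in for embedding similarity, which is an assumption made to keep the example dependency-free; a production system would more likely compare embedding vectors. The names `jaccard` and `deduplicate` and the 0.8 threshold are illustrative.

```python
import re

def _tokens(text: str) -> set:
    """Lowercased alphanumeric word set."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))

def jaccard(a: str, b: str) -> float:
    """Word-set overlap between two strings, from 0.0 to 1.0."""
    ta, tb = _tokens(a), _tokens(b)
    if not ta and not tb:
        return 1.0
    return len(ta & tb) / len(ta | tb)

def deduplicate(messages: list, threshold: float = 0.8) -> list:
    """Drop messages that nearly duplicate an earlier kept message with the same role."""
    kept = []
    for msg in messages:
        is_dup = any(
            k["role"] == msg["role"]
            and jaccard(k["content"], msg["content"]) >= threshold
            for k in kept
        )
        if not is_dup:
            kept.append(msg)
    return kept

history = [
    {"role": "user", "content": "What is the price of the Pro plan?"},
    {"role": "assistant", "content": "The Pro plan costs $20 per month."},
    {"role": "user", "content": "what is the price of the pro plan"},
]
print(len(deduplicate(history)))
```

The repeated user question is dropped while the assistant reply is kept, because duplicates are only compared within the same role.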
- Monitor Quality Metrics
Track compression impact:
-
Response relevance score
-
Information retention rate
-
User satisfaction metrics
-
Token usage reduction
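Two of the metrics above are easy to compute offline. The sketch below measures token reduction and a crude information retention rate, defined here as the share of hand-labeled key facts that still appear verbatim in the compressed context. That definition, and the names `token_reduction` and `retention_rate`, are assumptions for illustration; substring matching will miss paraphrased facts.

```python
def token_reduction(original_tokens: int, compressed_tokens: int) -> float:
    """Fraction of tokens removed by compression (0.75 means 75% fewer tokens)."""
    if original_tokens <= 0:
        raise ValueError("original_tokens must be positive")
    return 1 - compressed_tokens / original_tokens

def retention_rate(key_facts: list, compressed_context: str) -> float:
    """Share of key facts that still appear in the compressed context."""
    if not key_facts:
        return 1.0
    text = compressed_context.lower()
    found = sum(1 for fact in key_facts if fact.lower() in text)
    return found / len(key_facts)

summary = "Customer is on the Pro plan and reported a billing error on March 3."
facts = ["Pro plan", "billing error", "refund requested"]

print(f"reduction: {token_reduction(8000, 2000):.0%}")
print(f"retention: {retention_rate(facts, summary):.0%}")
```

Tracking both numbers together shows when a higher compression ratio starts to cost information.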
- Use Prompt Caching Strategically
Cache stable content:
-
Conversation summaries
-
System prompts
-
Knowledge base context
-
User profiles
Don't cache frequently changing content:
-
Current user query
-
Real-time data
-
Session-specific state
- Implement Checkpointing
Save compression state for:
-
Recovery from failures
-
Multi-session continuity
-
Analytics and debugging
-
A/B testing different strategies
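A checkpoint can be as simple as a JSON file holding the in-memory compression state. The sketch below assumes the state consists of the `rolling_summary`, `recent_messages`, and `message_counter` fields used by the hybrid memory example earlier; the function names and file layout are hypothetical, and the vector store is assumed to persist separately.

```python
import json
import os
import tempfile

def save_checkpoint(path: str, rolling_summary: str,
                    recent_messages: list, message_counter: int) -> None:
    """Write compression state atomically: temp file first, then rename."""
    state = {
        "version": 1,
        "rolling_summary": rolling_summary,
        "recent_messages": recent_messages,
        "message_counter": message_counter,
    }
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump(state, f)
    os.replace(tmp_path, path)  # atomic when both paths are on the same filesystem

def load_checkpoint(path: str) -> dict:
    """Read a previously saved checkpoint."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)

with tempfile.TemporaryDirectory() as d:
    ckpt = os.path.join(d, "session123.json")
    save_checkpoint(ckpt, "User asked about pricing.",
                    [{"role": "user", "content": "Hi"}], 42)
    restored = load_checkpoint(ckpt)
    print(restored["message_counter"])
```

Writing to a temporary file and renaming it means a crash mid-write leaves the previous checkpoint intact, which matters for the failure-recovery case.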
- Tune Compression Parameters
Test and optimize:
-
Summary token limits
-
Compression thresholds
-
Retrieval result counts
-
Cache TTLs
-
Chunk sizes for hierarchical
- Handle Edge Cases
Plan for:
-
Very long messages (split or compress individually)
-
Code blocks (preserve formatting)
-
Multi-language content
-
Rapidly changing context
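For the very-long-message case, one option is to split on paragraph boundaries so that each chunk can be archived or summarized on its own. A minimal sketch, using the same rough estimate of 4 characters per token as the earlier examples; the function name `split_long_message` and the 500-token default are illustrative.

```python
def split_long_message(content: str, max_tokens: int = 500) -> list:
    """Split on blank lines so each chunk stays under an estimated token budget."""
    if max_tokens <= 0:
        raise ValueError("max_tokens must be positive")
    max_chars = max_tokens * 4  # ~4 characters per token heuristic
    chunks, current = [], ""
    for para in content.split("\n\n"):
        candidate = f"{current}\n\n{para}" if current else para
        if len(candidate) <= max_chars:
            current = candidate
            continue
        if current:
            chunks.append(current)
        # A single oversized paragraph is hard-split as a last resort
        while len(para) > max_chars:
            chunks.append(para[:max_chars])
            para = para[max_chars:]
        current = para
    if current:
        chunks.append(current)
    return chunks

long_message = "a" * 100 + "\n\n" + "b" * 100 + "\n\n" + "c" * 250
for chunk in split_long_message(long_message, max_tokens=30):
    print(len(chunk))
```

Splitting on blank lines tends to keep paragraphs together, but it does not protect a code block that itself contains blank lines; that would need block-aware parsing.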
Troubleshooting
Problem: Summary loses critical information
Solutions:
-
Lower compression ratio (less aggressive)
-
Implement importance scoring to preserve key messages
-
Use extractive summarization for critical sections
-
Increase summary token budget
Problem: Retrieval returns irrelevant context
Solutions:
-
Improve embedding model quality
-
Add metadata filtering (timestamps, topics)
-
Adjust similarity threshold
-
Use hybrid search (semantic + keyword)
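The last two fixes, a similarity threshold and hybrid search, can be combined in one scoring function. The sketch below blends cosine similarity over embeddings with a simple keyword-overlap score and drops results below a minimum score. The weighting `alpha=0.7`, the `min_score=0.3` cutoff, the function names, and the toy 2-dimensional vectors are all assumptions for illustration.

```python
import math
import re

def cosine(a: list, b: list) -> float:
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def keyword_overlap(query: str, doc: str) -> float:
    """Fraction of query words that also occur in the document."""
    q = set(re.findall(r"\w+", query.lower()))
    d = set(re.findall(r"\w+", doc.lower()))
    return len(q & d) / len(q) if q else 0.0

def hybrid_rank(query, query_vec, docs, alpha=0.7, min_score=0.3):
    """docs: list of (text, embedding). Returns [(score, text)], best first."""
    scored = []
    for text, vec in docs:
        score = alpha * cosine(query_vec, vec) + (1 - alpha) * keyword_overlap(query, text)
        if score >= min_score:  # similarity threshold filters weak matches
            scored.append((score, text))
    return sorted(scored, reverse=True)

docs = [
    ("Pricing for the Pro plan is $20/month", [1.0, 0.0]),
    ("The weather is nice today", [0.0, 1.0]),
]
for score, text in hybrid_rank("pro plan pricing", [0.9, 0.1], docs):
    print(f"{score:.2f} {text}")
```

Raising `alpha` favors semantic matches and lowering it favors exact terms such as product names or error codes, so the value is worth tuning per domain.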
Problem: High latency from compression
Solutions:
-
Compress asynchronously (background tasks)
-
Use faster models for summarization (Haiku instead of Sonnet)
-
Cache summaries more aggressively
-
Reduce compression frequency
Problem: Conversations still exceeding context window
Solutions:
-
Implement hierarchical compression
-
Archive to vector database more aggressively
-
Use more aggressive compression ratios
-
Consider switching to a model with a larger context window
Problem: High costs despite compression
Solutions:
-
Implement prompt caching
-
Use cheaper models for summarization (Haiku)
-
Batch summarization operations
-
Reduce summarization frequency
Problem: Lost conversation continuity
Solutions:
-
Increase recent message window
-
Include summary in every request
-
Use more descriptive summaries
-
Implement session resumption with context injection
Advanced Topics
Streaming Compression
Compress in real-time as conversation progresses:
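import asyncio
async def streaming_compress(conversation_stream):
    """Compress while streaming responses."""
    compressor = ProgressiveCompressor()
    background_tasks = set()
    async for message in conversation_stream:
        compressor.add_message(message)
        # Compression happens asynchronously
        if compressor.should_compress():
            task = asyncio.create_task(compressor.compress_async())
            background_tasks.add(task)  # keep a reference so the task is not garbage-collected
            task.add_done_callback(background_tasks.discard)
    # Wait for in-flight compression before reading the context
    await asyncio.gather(*background_tasks)
    return compressor.get_context()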
Multi-User Session Management
Handle concurrent conversations with shared context:
import time
class MultiUserMemory:
    def __init__(self):
        self.user_sessions = {}
    def get_or_create_session(self, user_id: str):
        if user_id not in self.user_sessions:
            self.user_sessions[user_id] = HybridMemorySystem(...)
        return self.user_sessions[user_id]
    def cleanup_inactive_sessions(self, timeout: int = 3600):
        """Remove sessions inactive for > timeout seconds."""
        current_time = time.time()
        # Assumes each session tracks a last_activity timestamp
        inactive = [
            user_id for user_id, session in self.user_sessions.items()
            if current_time - session.last_activity > timeout
        ]
        for user_id in inactive:
            self._archive_session(user_id)  # persistence hook, not shown here
            del self.user_sessions[user_id]
Custom Importance Scoring
Train ML models to score message importance:
from transformers import pipeline
class MLImportanceScorer:
    def __init__(self):
        # Use pre-trained classifier or fine-tune on your data
        self.classifier = pipeline(
            "text-classification",
            model="your-importance-model"
        )
    def score(self, message: dict) -> float:
        """Score message importance (0-1)."""
        result = self.classifier(message['content'])
        # The pipeline returns the confidence of the predicted label
        return result[0]['score']
Context Window Utilization Optimization
Maximize information density within token budget:
def optimize_context_allocation(
    summary_tokens: int,
    recent_tokens: int,
    retrieval_tokens: int,
    max_tokens: int
):
    """
    Optimal allocation (empirically tested):
    - 20% summary
    - 50% recent messages
    - 30% retrieved context
    """
    return {
        'summary': int(max_tokens * 0.20),
        'recent': int(max_tokens * 0.50),
        'retrieval': int(max_tokens * 0.30)
    }
Future Directions
Emerging Techniques (2025+)
- Infinite Attention Mechanisms
-
Models with context windows of 1M tokens or more (Gemini 1.5, possibly future Claude models)
-
Reduces need for compression but doesn't eliminate cost concerns
- Learned Compression Models
-
Neural networks trained to compress conversation optimally
-
Maintain semantic meaning while minimizing tokens
-
Examples: LLMLingua, LLMLingua-2 (both exposed through the PromptCompressor class)
- Multimodal Session Compression
-
Compress conversations with images, audio, video
-
Maintain cross-modal context relationships
- Federated Memory Systems
-
Distributed compression across multiple memory stores
-
Privacy-preserving compression for sensitive conversations
- Adaptive Compression Strategies
-
RL-based systems that learn optimal compression per user/domain
-
Dynamic threshold adjustment based on conversation importance
References
Academic Papers
-
"Recursively Summarizing Enables Long-Term Dialogue Memory" (arXiv:2308.15022)
-
"LLMLingua: Compressing Prompts for Accelerated Inference" (arXiv:2310.05736)
-
"Lost in the Middle: How Language Models Use Long Contexts" (arXiv:2307.03172)
Documentation
-
Anthropic Prompt Caching
-
LangChain Memory
-
OpenAI Function Calling
Tools
-
Mem0 - Managed memory service
-
Zep - Fast memory layer
-
LLMLingua - Prompt compression
-
ChromaDB - Vector database
Last Updated: 2025-11-30 Version: 1.0.0 License: MIT