# LangChain Orchestration Skill
Complete guide for building production-grade LLM applications with LangChain, covering chains, agents, memory, RAG patterns, and advanced orchestration techniques.
## Table of Contents

- Core Concepts
- Chains
- Agents
- Memory Systems
- RAG Patterns
- LLM Integrations
- Callbacks & Monitoring
- Retrieval Strategies
- Streaming
- Error Handling
- Production Best Practices
## Core Concepts

### LangChain Expression Language (LCEL)

LCEL is the declarative way to compose chains in LangChain, enabling streaming, async, and parallel execution.

```python
from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

# Basic LCEL chain
prompt = ChatPromptTemplate.from_template("Tell me about {topic}")
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
output_parser = StrOutputParser()

chain = prompt | llm | output_parser
result = chain.invoke({"topic": "quantum computing"})
```
### Runnable Interface

Every component in LangChain implements the Runnable interface with standard methods:

```python
# Key methods: invoke, stream, batch, ainvoke, astream, abatch
chain = prompt | llm | output_parser

# Synchronous invoke
result = chain.invoke({"topic": "AI"})

# Streaming
for chunk in chain.stream({"topic": "AI"}):
    print(chunk, end="", flush=True)

# Batch processing
results = chain.batch([{"topic": "AI"}, {"topic": "ML"}])

# Async variants (inside an async function)
result = await chain.ainvoke({"topic": "AI"})
```
### RunnablePassthrough

Pass inputs directly through or apply transformations:

```python
from langchain_core.runnables import RunnablePassthrough

# Pass through unchanged
chain = RunnablePassthrough() | llm | output_parser

# With transformation
def add_context(x):
    return {"text": x["input"], "context": "important"}

chain = RunnablePassthrough.assign(processed=add_context) | llm
```
## Chains

### Sequential Chains

Process data through multiple steps in sequence.

```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(temperature=0)

# Step 1: Generate ideas
idea_prompt = ChatPromptTemplate.from_template(
    "Generate 3 creative ideas for: {topic}"
)
idea_chain = idea_prompt | llm | StrOutputParser()

# Step 2: Evaluate ideas
eval_prompt = ChatPromptTemplate.from_template(
    "Evaluate these ideas and pick the best one:\n{ideas}"
)
eval_chain = eval_prompt | llm | StrOutputParser()

# Combine into a sequential chain
sequential_chain = (
    {"ideas": idea_chain}
    | RunnablePassthrough.assign(evaluation=eval_chain)
)

result = sequential_chain.invoke({"topic": "mobile app"})
```
### Map-Reduce Chains

Run several prompts over the same input in parallel, then combine the results.

```python
from langchain_core.runnables import RunnableParallel
from langchain_core.prompts import ChatPromptTemplate

# Define the parallel analyses
summary_prompt = ChatPromptTemplate.from_template(
    "Summarize this text in one sentence: {text}"
)
keywords_prompt = ChatPromptTemplate.from_template(
    "Extract 3 keywords from: {text}"
)
sentiment_prompt = ChatPromptTemplate.from_template(
    "Analyze sentiment (positive/negative/neutral): {text}"
)

# Map: process in parallel
map_chain = RunnableParallel(
    summary=summary_prompt | llm | StrOutputParser(),
    keywords=keywords_prompt | llm | StrOutputParser(),
    sentiment=sentiment_prompt | llm | StrOutputParser()
)

# Reduce: combine results
reduce_prompt = ChatPromptTemplate.from_template(
    """Combine the analysis:
Summary: {summary}
Keywords: {keywords}
Sentiment: {sentiment}

Provide a comprehensive report:"""
)

map_reduce_chain = map_chain | reduce_prompt | llm | StrOutputParser()

result = map_reduce_chain.invoke({
    "text": "LangChain is an amazing framework for building LLM applications."
})
```
### Router Chains

Route inputs to different chains based on conditions.

```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

# Define specialized chains
technical_prompt = ChatPromptTemplate.from_template(
    "Provide a technical explanation of: {query}"
)
simple_prompt = ChatPromptTemplate.from_template(
    "Explain in simple terms: {query}"
)

technical_chain = technical_prompt | llm | StrOutputParser()
simple_chain = simple_prompt | llm | StrOutputParser()

# Router function: returning a runnable causes it to be invoked
def route_query(input_dict):
    complexity = input_dict.get("complexity", "simple")

    if complexity == "technical":
        return technical_chain
    return simple_chain

# Create the router chain
router_chain = RunnableLambda(route_query)

# Use the router
result = router_chain.invoke({
    "query": "quantum entanglement",
    "complexity": "technical"
})
```
### Conditional Chains

Execute chains based on conditions.

```python
from langchain_core.runnables import RunnableBranch

# Classification step
classification_prompt = ChatPromptTemplate.from_template(
    "Classify this as 'question', 'statement', or 'command': {text}"
)

question_handler = ChatPromptTemplate.from_template(
    "Answer this question: {text}"
) | llm | StrOutputParser()

statement_handler = ChatPromptTemplate.from_template(
    "Acknowledge this statement: {text}"
) | llm | StrOutputParser()

command_handler = ChatPromptTemplate.from_template(
    "Execute this command: {text}"
) | llm | StrOutputParser()

# Create the conditional branch
branch = RunnableBranch(
    (lambda x: "question" in x["type"].lower(), question_handler),
    (lambda x: "statement" in x["type"].lower(), statement_handler),
    command_handler  # default
)

# Full chain with classification
full_chain = (
    {"text": RunnablePassthrough(),
     "type": classification_prompt | llm | StrOutputParser()}
    | branch
)
```
### LLMChain (Legacy)

The traditional chain format is still supported, though deprecated in favor of LCEL:

```python
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate

prompt = PromptTemplate(
    input_variables=["product"],
    template="What is a good name for a company that makes {product}?"
)

chain = LLMChain(llm=llm, prompt=prompt)
result = chain.run(product="eco-friendly water bottles")
```
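For new code, the same behavior is typically expressed in LCEL; a minimal equivalent of the legacy example above:

```python
# LCEL equivalent of the legacy LLMChain
lcel_chain = prompt | llm | StrOutputParser()
result = lcel_chain.invoke({"product": "eco-friendly water bottles"})
```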
### Stuff Documents Chain

Combine ("stuff") documents into a single context:

```python
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.documents import Document

prompt = ChatPromptTemplate.from_template(
    """Answer based on the following context:

<context>
{context}
</context>

Question: {input}"""
)

document_chain = create_stuff_documents_chain(llm, prompt)

docs = [
    Document(page_content="LangChain supports multiple LLM providers."),
    Document(page_content="Chains can be composed using LCEL.")
]

result = document_chain.invoke({
    "input": "What does LangChain support?",
    "context": docs
})
```
## Agents

### ReAct Agents

Reasoning-and-acting agents that use tools iteratively.

```python
from langchain.agents import create_react_agent, AgentExecutor
from langchain_core.tools import Tool
from langchain import hub

# Define tools
def search_tool(query: str) -> str:
    """Search for information"""
    return f"Search results for: {query}"

def calculator_tool(expression: str) -> str:
    """Calculate mathematical expressions"""
    try:
        # Demo only: eval is unsafe on untrusted input
        return str(eval(expression))
    except Exception:
        return "Invalid expression"

tools = [
    Tool(
        name="Search",
        func=search_tool,
        description="Useful for searching information"
    ),
    Tool(
        name="Calculator",
        func=calculator_tool,
        description="Useful for math calculations"
    )
]

# Create the ReAct agent
prompt = hub.pull("hwchase17/react")
agent = create_react_agent(llm, tools, prompt)
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=5
)

result = agent_executor.invoke({
    "input": "What is 25 * 4, and then search for that number's significance"
})
```
### LangGraph ReAct Agent

The modern approach uses LangGraph for better control:

```python
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver

@tool
def retrieve(query: str) -> str:
    """Retrieve relevant information from the knowledge base"""
    # Your retrieval logic here
    return f"Retrieved information for: {query}"

@tool
def analyze(text: str) -> str:
    """Analyze text and provide insights"""
    return f"Analysis of: {text}"

# Create an agent with memory
memory = MemorySaver()
agent_executor = create_react_agent(
    llm,
    [retrieve, analyze],
    checkpointer=memory
)

# Use with a thread configuration
config = {"configurable": {"thread_id": "abc123"}}
for chunk in agent_executor.stream(
    {"messages": [("user", "Find information about LangChain")]},
    config=config
):
    print(chunk)
```
### Conversational ReAct Agent

An agent with built-in conversation memory:

```python
from langchain.agents import create_conversational_retrieval_agent
from langchain_core.tools import Tool

tools = [
    Tool(
        name="Knowledge Base",
        func=lambda q: f"KB result: {q}",
        description="Search the knowledge base"
    )
]

conversational_agent = create_conversational_retrieval_agent(
    llm,
    tools,
    verbose=True
)

# Maintains conversation context across calls
result1 = conversational_agent.invoke({
    "input": "What is LangChain?"
})
result2 = conversational_agent.invoke({
    "input": "Tell me more about its features"
})
```
### Zero-Shot ReAct Agent

An agent that works without few-shot examples:

```python
from langchain.agents import AgentType, initialize_agent, load_tools

# Load pre-built tools
tools = load_tools(["serpapi", "llm-math"], llm=llm)

agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True,
    max_iterations=3
)

result = agent.run(
    "What is the population of Tokyo and what is that number divided by 2?"
)
```
### Structured Chat Agent

An agent that works with structured, multi-input tools:

```python
from langchain.agents import create_structured_chat_agent, AgentExecutor
from langchain_core.tools import tool
from langchain import hub
from pydantic import BaseModel, Field

# Define tools with structured schemas
class SearchInput(BaseModel):
    query: str = Field(description="The search query")
    max_results: int = Field(default=5, description="Maximum results")

@tool(args_schema=SearchInput)
def structured_search(query: str, max_results: int = 5) -> str:
    """Search with structured parameters"""
    return f"Found {max_results} results for: {query}"

tools = [structured_search]

prompt = hub.pull("hwchase17/structured-chat-agent")
agent = create_structured_chat_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
```
### Tool Calling Agent

Modern agents use the model's native tool calling:

```python
from langchain_core.tools import tool

@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers"""
    return a * b

@tool
def search_database(query: str, limit: int = 10) -> str:
    """Search the database"""
    return f"Found {limit} results for {query}"

# Bind tools to the LLM
llm_with_tools = llm.bind_tools([multiply, search_database])

# Simple tool chain: the model picks the tool call,
# we extract its arguments and run the tool
tool_chain = llm_with_tools | (lambda x: x.tool_calls[0]["args"]) | multiply
result = tool_chain.invoke("What's four times 23")
```
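The chain above always routes to `multiply`. For a full agent loop that lets the model choose and re-invoke tools, a sketch using `create_tool_calling_agent` (assuming a langchain version that provides it; the prompt wording is illustrative):

```python
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate

agent_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

agent = create_tool_calling_agent(llm, [multiply, search_database], agent_prompt)
agent_executor = AgentExecutor(
    agent=agent, tools=[multiply, search_database], verbose=True
)
result = agent_executor.invoke({"input": "What's four times 23?"})
```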
## Memory Systems

### ConversationBufferMemory

Store the complete conversation history:

```python
from langchain.memory import ConversationBufferMemory
from langchain.chains import LLMChain

memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True
)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("placeholder", "{chat_history}"),
    ("human", "{input}")
])

chain = LLMChain(llm=llm, prompt=prompt, memory=memory)

# Conversation is stored automatically
response1 = chain.run(input="Hi, I'm Alice")
response2 = chain.run(input="What's my name?")  # Will remember Alice
```
### ConversationBufferWindowMemory

Keep only the most recent k interactions:

```python
from langchain.memory import ConversationBufferWindowMemory

memory = ConversationBufferWindowMemory(
    k=5,  # Keep the last 5 interactions
    memory_key="chat_history",
    return_messages=True
)

chain = LLMChain(llm=llm, prompt=prompt, memory=memory)
```
### ConversationSummaryMemory

Summarize the conversation history instead of storing it verbatim:

```python
from langchain.memory import ConversationSummaryMemory

memory = ConversationSummaryMemory(
    llm=llm,
    memory_key="chat_history",
    return_messages=True
)

chain = LLMChain(llm=llm, prompt=prompt, memory=memory)

# Long conversations are summarized automatically
for i in range(20):
    chain.run(input=f"Tell me fact {i} about AI")
```
### ConversationSummaryBufferMemory

A hybrid approach: keep recent messages verbatim plus a summary of older ones.

```python
from langchain.memory import ConversationSummaryBufferMemory

memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=100,  # Threshold that triggers summarization
    memory_key="chat_history",
    return_messages=True
)
```
### Vector Store Memory

Semantic search over conversation history:

```python
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
# Seed with a placeholder text; FAISS cannot be built from an empty list
vectorstore = FAISS.from_texts(["placeholder"], embeddings)

memory = VectorStoreRetrieverMemory(
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)

# Save context
memory.save_context(
    {"input": "My favorite color is blue"},
    {"output": "That's great!"}
)

# Retrieve relevant context
relevant = memory.load_memory_variables({"input": "What's my favorite color?"})
```
### Recall Memories (LangGraph)

Structured long-term memory with explicit save and search tools:

```python
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import OpenAIEmbeddings
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent

recall_vector_store = InMemoryVectorStore(OpenAIEmbeddings())

@tool
def save_recall_memory(memory: str) -> str:
    """Save important information to long-term memory"""
    recall_vector_store.add_texts([memory])
    return f"Saved memory: {memory}"

@tool
def search_recall_memories(query: str) -> str:
    """Search long-term memories"""
    docs = recall_vector_store.similarity_search(query, k=3)
    return "\n".join([doc.page_content for doc in docs])

# Use with an agent
agent = create_react_agent(
    llm,
    [save_recall_memory, search_recall_memories]
)
```
### Custom Memory with LangGraph State

Define a custom state that carries recalled memories:

```python
from typing import List
from langgraph.graph import MessagesState, StateGraph, START, END

class State(MessagesState):
    recall_memories: List[str]

def load_memories(state: State):
    """Load relevant memories before the agent processes the input"""
    messages = state["messages"]
    last_message = messages[-1].content if messages else ""

    # Search for relevant memories
    docs = recall_vector_store.similarity_search(last_message, k=3)
    memories = [doc.page_content for doc in docs]
    return {"recall_memories": memories}

# Add to a graph
builder = StateGraph(State)
builder.add_node(load_memories)
builder.add_edge(START, "load_memories")
```
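To make this graph runnable, you would add a node that calls the model with the recalled memories and then compile the graph. A minimal sketch; the node name and system-prompt wording are illustrative, not from the original:

```python
def agent_node(state: State):
    # Inject recalled memories into the system prompt
    memories = "\n".join(state["recall_memories"])
    system = f"You are a helpful assistant. Relevant memories:\n{memories}"
    response = llm.invoke([("system", system)] + state["messages"])
    return {"messages": [response]}

builder.add_node("agent", agent_node)
builder.add_edge("load_memories", "agent")
builder.add_edge("agent", END)

graph = builder.compile()
result = graph.invoke({"messages": [("user", "What do you remember about me?")]})
```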
## RAG Patterns

### Basic RAG Chain

Fundamental retrieval-augmented generation:

```python
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

# Set up the vector store
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_texts(
    [
        "LangChain supports multiple LLM providers including OpenAI, Anthropic, and more.",
        "Chains can be composed using LangChain Expression Language (LCEL).",
        "Agents can use tools to interact with external systems."
    ],
    embedding=embeddings
)

retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

# RAG prompt
template = """Answer the question based only on the following context:

{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# Build the RAG chain
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

result = rag_chain.invoke("What does LangChain support?")
```
### RAG with Retrieval Chain

Using the built-in retrieval chain constructor:

```python
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain

prompt = ChatPromptTemplate.from_template(
    """Answer based on the context:

<context>
{context}
</context>

Question: {input}"""
)

document_chain = create_stuff_documents_chain(llm, prompt)
retrieval_chain = create_retrieval_chain(retriever, document_chain)

response = retrieval_chain.invoke({
    "input": "What is LCEL?"
})
# Returns: {"input": "...", "context": [...], "answer": "..."}
```
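The returned dict can be unpacked to show both the answer and the retrieved source documents:

```python
print(response["answer"])

# "context" is the list of retrieved Documents
for doc in response["context"]:
    print("-", doc.page_content)
```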
### RAG with Chat History

Conversational RAG that rewrites follow-up questions using the chat history:

```python
from langchain.chains import create_history_aware_retriever
from langchain_core.prompts import MessagesPlaceholder

contextualize_prompt = ChatPromptTemplate.from_messages([
    ("system", "Given a chat history and the latest user question, "
               "formulate a standalone question which can be understood "
               "without the chat history."),
    MessagesPlaceholder("chat_history"),
    ("human", "{input}")
])

history_aware_retriever = create_history_aware_retriever(
    llm, retriever, contextualize_prompt
)

# Use in a RAG chain
qa_chain = create_retrieval_chain(
    history_aware_retriever,
    document_chain
)

# First question
result1 = qa_chain.invoke({
    "input": "What is LangChain?",
    "chat_history": []
})

# Follow-up that relies on context
result2 = qa_chain.invoke({
    "input": "What are its main features?",
    "chat_history": [
        ("human", "What is LangChain?"),
        ("ai", result1["answer"])
    ]
})
```
### Multi-Query RAG

Generate multiple search queries for better retrieval coverage:

```python
from langchain.retrievers.multi_query import MultiQueryRetriever

multi_query_retriever = MultiQueryRetriever.from_llm(
    retriever=vectorstore.as_retriever(),
    llm=llm
)

# Automatically generates multiple query variations before retrieving
rag_chain = (
    {"context": multi_query_retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
```
### RAG with Reranking

Improve relevance by reranking retrieved documents:

```python
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import FlashrankRerank

# Set up the reranker
compressor = FlashrankRerank()
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=retriever
)

# Use in a RAG chain
rag_chain = (
    {"context": compression_retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
```
### Parent Document Retrieval

Index small child chunks but retrieve the larger parent documents for fuller context:

```python
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Storage for parent documents
store = InMemoryStore()

# Splitters
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400)
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000)

parent_retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,
    docstore=store,
    child_splitter=child_splitter,
    parent_splitter=parent_splitter,
)

# Add documents (a list of Document objects)
parent_retriever.add_documents(documents)
```
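`add_documents` expects a list of `Document` objects. If you are running this section standalone, something like the following (the content and metadata are illustrative) would be defined beforehand:

```python
from langchain_core.documents import Document

documents = [
    Document(
        page_content="LangChain is a framework for building LLM applications. "
                     "It ships chains, agents, memory, and retrieval components.",
        metadata={"source": "intro.md"},
    ),
]
```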
### Self-Query Retrieval

Translate natural-language questions into structured queries over document metadata:

```python
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo

metadata_field_info = [
    AttributeInfo(
        name="source",
        description="The document source",
        type="string",
    ),
    AttributeInfo(
        name="page",
        description="The page number",
        type="integer",
    ),
]

document_content_description = "Technical documentation"

self_query_retriever = SelfQueryRetriever.from_llm(
    llm,
    vectorstore,
    document_content_description,
    metadata_field_info,
)
```
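Once constructed, the retriever takes a plain-language question and lets the LLM infer any metadata filter. A hypothetical query (the page filter is inferred from the wording):

```python
docs = self_query_retriever.invoke(
    "Find documentation about retrievers on page 3"
)
for doc in docs:
    print(doc.metadata, doc.page_content[:80])
```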
## LLM Integrations

### OpenAI Integration

```python
from langchain_openai import ChatOpenAI, OpenAI

# Chat model
chat_model = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.7,
    max_tokens=500,
    api_key="your-api-key"
)

# Completion model
completion_model = OpenAI(
    model="gpt-3.5-turbo-instruct",
    temperature=0.9
)
```
### Anthropic Claude Integration

```python
from langchain_anthropic import ChatAnthropic

claude = ChatAnthropic(
    model="claude-3-5-sonnet-20241022",
    temperature=0,
    max_tokens=1024,
    api_key="your-api-key"
)
```
### HuggingFace Integration

```python
from langchain_huggingface import HuggingFaceEndpoint

llm = HuggingFaceEndpoint(
    repo_id="meta-llama/Llama-2-7b-chat-hf",
    huggingfacehub_api_token="your-token",
    task="text-generation",
    temperature=0.7
)
```
### Google Vertex AI Integration

```python
from langchain_google_vertexai import ChatVertexAI, VertexAI

# Chat model
chat_model = ChatVertexAI(
    model_name="chat-bison",
    temperature=0
)

# Completion model
completion_model = VertexAI(
    model_name="gemini-1.0-pro-002"
)
```
### Ollama Local Models

```python
from langchain_community.llms import Ollama

llm = Ollama(
    model="llama2",
    temperature=0.8
)
```
### Binding Tools to LLMs

```python
from langchain_core.tools import tool

@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers together"""
    return a * b

# Bind tools to the model
llm_with_tools = llm.bind_tools([multiply])

# The model returns tool calls instead of plain text
response = llm_with_tools.invoke("What is 3 times 4?")
print(response.tool_calls)
```
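To complete the loop, each requested tool call can be executed and its result passed back to the model as a `ToolMessage`; a sketch of that round trip:

```python
from langchain_core.messages import HumanMessage, ToolMessage

messages = [HumanMessage("What is 3 times 4?")]
ai_msg = llm_with_tools.invoke(messages)
messages.append(ai_msg)

# Run each requested tool and report its result back to the model
for tool_call in ai_msg.tool_calls:
    tool_output = multiply.invoke(tool_call["args"])
    messages.append(ToolMessage(str(tool_output), tool_call_id=tool_call["id"]))

final = llm_with_tools.invoke(messages)
print(final.content)
```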
## Callbacks & Monitoring

### Standard Callbacks

Track chain execution:

```python
from langchain_core.callbacks import StdOutCallbackHandler
from langchain.callbacks import get_openai_callback

# Standard output callback
callbacks = [StdOutCallbackHandler()]

chain = prompt | llm | StrOutputParser()
result = chain.invoke(
    {"topic": "AI"},
    config={"callbacks": callbacks}
)

# OpenAI token and cost tracking
with get_openai_callback() as cb:
    result = chain.invoke({"topic": "AI"})
    print(f"Total Tokens: {cb.total_tokens}")
    print(f"Total Cost: ${cb.total_cost}")
```
### Custom Callbacks

Create custom callback handlers:

```python
from langchain_core.callbacks import BaseCallbackHandler
from typing import Any, Dict

class MyCustomCallback(BaseCallbackHandler):
    def on_llm_start(self, serialized: Dict[str, Any], prompts: list[str], **kwargs):
        print(f"LLM started with prompts: {prompts}")

    def on_llm_end(self, response, **kwargs):
        print(f"LLM finished with response: {response}")

    def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs):
        print(f"Chain started with inputs: {inputs}")

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs):
        print(f"Chain ended with outputs: {outputs}")

    def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs):
        print(f"Tool started with input: {input_str}")

    def on_tool_end(self, output: str, **kwargs):
        print(f"Tool ended with output: {output}")

# Use the custom callback
custom_callback = MyCustomCallback()
result = chain.invoke(
    {"topic": "AI"},
    config={"callbacks": [custom_callback]}
)
```
### Argilla Callback

Track runs and log them to Argilla:

```python
from langchain_community.callbacks import ArgillaCallbackHandler

argilla_callback = ArgillaCallbackHandler(
    dataset_name="langchain-dataset",
    api_url="http://localhost:6900",
    api_key="your-api-key"
)

callbacks = [argilla_callback]

agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    callbacks=callbacks
)

agent.run("Who was the first president of the United States?")
```
### UpTrain Callback

RAG evaluation and monitoring:

```python
from langchain_community.callbacks import UpTrainCallbackHandler

uptrain_callback = UpTrainCallbackHandler(
    key_type="uptrain",
    api_key="your-api-key"
)

config = {"callbacks": [uptrain_callback]}

# Automatically evaluates context relevance, factual accuracy, and completeness
result = rag_chain.invoke("What is LangChain?", config=config)
```
### LangSmith Integration

Production monitoring and debugging:

```python
import os

# Set environment variables
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-key"
os.environ["LANGCHAIN_PROJECT"] = "my-project"

# All chains are traced automatically
result = chain.invoke({"topic": "AI"})

# View traces at smith.langchain.com
```
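Individual runs can also be named and tagged through the standard `RunnableConfig` fields, which makes them easier to filter in LangSmith. The tag and metadata values here are illustrative:

```python
result = chain.invoke(
    {"topic": "AI"},
    config={
        "run_name": "topic-explainer",
        "tags": ["production", "v2"],
        "metadata": {"user_id": "user-123"},
    },
)
```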
## Retrieval Strategies

### Vector Store Retrievers

Basic similarity search and its variants:

```python
from langchain_community.vectorstores import FAISS, Chroma, Pinecone

# Plain similarity search (FAISS)
faiss_retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5}
)

# Maximum Marginal Relevance (MMR)
mmr_retriever = vectorstore.as_retriever(
    search_type="mmr",
    search_kwargs={"k": 5, "fetch_k": 20, "lambda_mult": 0.5}
)

# Similarity with a score threshold
threshold_retriever = vectorstore.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={"score_threshold": 0.8, "k": 5}
)
```
### Ensemble Retriever

Combine multiple retrievers (e.g., keyword plus vector search):

```python
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever

# BM25 for keyword search (texts is a list of strings to index)
bm25_retriever = BM25Retriever.from_texts(texts)
bm25_retriever.k = 5

# Combine with vector search
ensemble_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, faiss_retriever],
    weights=[0.5, 0.5]
)

docs = ensemble_retriever.get_relevant_documents("LangChain features")
```
### Time-Weighted Retriever

Prioritize recently used documents:

```python
from langchain.retrievers import TimeWeightedVectorStoreRetriever

retriever = TimeWeightedVectorStoreRetriever(
    vectorstore=vectorstore,
    decay_rate=0.01,  # Decay factor applied to older documents
    k=5
)
```
### Multi-Vector Retriever

Store multiple vectors per document:

```python
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.storage import InMemoryByteStore

store = InMemoryByteStore()

retriever = MultiVectorRetriever(
    vectorstore=vectorstore,
    byte_store=store,
    id_key="doc_id"
)

# Add documents with multiple representations
retriever.add_documents(documents)
```
## Streaming

### Stream Chain Output

Stream tokens as they are generated:

```python
from langchain_core.output_parsers import StrOutputParser

chain = prompt | llm | StrOutputParser()

# Stream method
for chunk in chain.stream({"topic": "AI"}):
    print(chunk, end="", flush=True)
```
### Stream with Callbacks

Handle streaming events with a callback handler:

```python
from langchain_core.callbacks import StreamingStdOutCallbackHandler

streaming_llm = ChatOpenAI(
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]
)

chain = prompt | streaming_llm | StrOutputParser()
result = chain.invoke({"topic": "AI"})  # Streams to stdout as it generates
```
### Async Streaming

Stream asynchronously:

```python
import asyncio

async def stream_async():
    async for chunk in chain.astream({"topic": "AI"}):
        print(chunk, end="", flush=True)

# Run the async function
asyncio.run(stream_async())
```
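For finer-grained events (per-token chunks, tool starts, retriever results), recent LangChain versions also expose `astream_events`; a sketch assuming a version that supports the v2 event schema:

```python
async def stream_events():
    async for event in chain.astream_events({"topic": "AI"}, version="v2"):
        # Print only the chat model's token chunks
        if event["event"] == "on_chat_model_stream":
            print(event["data"]["chunk"].content, end="", flush=True)

asyncio.run(stream_events())
```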
### Stream Agent Responses

Stream agent execution step by step:

```python
from langgraph.prebuilt import create_react_agent

agent = create_react_agent(llm, tools)

for chunk in agent.stream(
    {"messages": [("user", "Search for LangChain information")]},
    stream_mode="values"
):
    chunk["messages"][-1].pretty_print()
```
### Streaming RAG

Stream RAG responses:

```python
retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | llm
    | StrOutputParser()
)

# Stream the response
for chunk in retrieval_chain.stream("What is LangChain?"):
    print(chunk, end="", flush=True)
```
## Error Handling

### Retry Logic

Retry automatically on failure:

```python
# Add retries to a chain with the built-in with_retry helper
chain_with_retry = (prompt | llm | StrOutputParser()).with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True
)

result = chain_with_retry.invoke({"topic": "AI"})
```
### Fallback Chains

Fall back to an alternative chain on errors:

```python
primary_llm = ChatOpenAI(model="gpt-4")
fallback_llm = ChatOpenAI(model="gpt-3.5-turbo")

chain_with_fallback = (prompt | primary_llm).with_fallbacks(
    [prompt | fallback_llm]
)

result = chain_with_fallback.invoke({"topic": "AI"})
```
### Try-Except Patterns

Manual error handling:

```python
from langchain_core.exceptions import OutputParserException

try:
    result = chain.invoke({"topic": "AI"})
except OutputParserException as e:
    print(f"Parsing failed: {e}")
    result = chain.invoke({"topic": "AI"})  # Retry once
except Exception as e:
    print(f"Chain execution failed: {e}")
    result = None
```
### Timeout Handling

Set request timeouts on the model client and handle the resulting errors:

```python
# ChatOpenAI accepts a request timeout (in seconds)
llm_with_timeout = ChatOpenAI(model="gpt-4o-mini", timeout=10, max_retries=0)
timed_chain = prompt | llm_with_timeout | StrOutputParser()

try:
    result = timed_chain.invoke({"topic": "AI"})
except Exception as e:
    print(f"Chain execution timed out or failed: {e}")
```
### Validation

Validate inputs and outputs:

```python
from pydantic import BaseModel, Field, validator

class QueryInput(BaseModel):
    topic: str = Field(..., min_length=1, max_length=100)

    @validator("topic")
    def topic_must_be_valid(cls, v):
        if not v.strip():
            raise ValueError("Topic cannot be empty")
        return v.strip()

# Use with a chain
def validate_and_invoke(topic: str):
    try:
        validated = QueryInput(topic=topic)
        return chain.invoke({"topic": validated.topic})
    except ValueError as e:
        return f"Validation error: {e}"
```
## Production Best Practices

### Environment Configuration

Manage secrets securely:

```python
import os
from dotenv import load_dotenv

load_dotenv()

# Use environment variables
llm = ChatOpenAI(
    api_key=os.getenv("OPENAI_API_KEY"),
    model=os.getenv("MODEL_NAME", "gpt-4o-mini")
)

# Vector store configuration
VECTOR_STORE_TYPE = os.getenv("VECTOR_STORE", "faiss")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text-embedding-3-small")
```
### Caching

Cache LLM responses:

```python
from langchain.cache import InMemoryCache, SQLiteCache
from langchain.globals import set_llm_cache

# In-memory cache
set_llm_cache(InMemoryCache())

# Or a persistent cache
set_llm_cache(SQLiteCache(database_path=".langchain.db"))

# Responses are cached automatically
result1 = llm.invoke("What is AI?")  # Calls the API
result2 = llm.invoke("What is AI?")  # Served from the cache
```
### Rate Limiting

Control API usage:

```python
from langchain_core.rate_limiters import InMemoryRateLimiter

rate_limiter = InMemoryRateLimiter(
    requests_per_second=1,
    check_every_n_seconds=0.1,
    max_bucket_size=10
)

llm = ChatOpenAI(rate_limiter=rate_limiter)
```
### Batch Processing

Process multiple inputs efficiently:

```python
# Batch invoke with bounded concurrency
inputs = [{"topic": f"Topic {i}"} for i in range(10)]
results = chain.batch(inputs, config={"max_concurrency": 5})

# Async batch
async def batch_process():
    results = await chain.abatch(inputs)
    return results
```
### Monitoring and Logging

Production monitoring:

```python
import logging
from langchain_core.callbacks import BaseCallbackHandler

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ProductionCallback(BaseCallbackHandler):
    def on_chain_start(self, serialized, inputs, **kwargs):
        logger.info(f"Chain started: {serialized.get('name', 'unknown')}")

    def on_chain_end(self, outputs, **kwargs):
        logger.info("Chain completed successfully")

    def on_chain_error(self, error, **kwargs):
        logger.error(f"Chain error: {error}")

# Use in production
production_callback = ProductionCallback()
config = {"callbacks": [production_callback]}
```
### Testing Chains

Unit test your chains:

```python
import pytest

def test_basic_chain():
    chain = prompt | llm | StrOutputParser()
    result = chain.invoke({"topic": "testing"})
    assert isinstance(result, str)
    assert len(result) > 0

def test_rag_chain():
    result = rag_chain.invoke("What is LangChain?")
    assert "LangChain" in result
    assert len(result) > 50

@pytest.mark.asyncio
async def test_async_chain():
    result = await chain.ainvoke({"topic": "async"})
    assert isinstance(result, str)
```
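For deterministic tests that avoid live API calls, a fake chat model can stand in for the real one; a sketch assuming `FakeListChatModel` is available from `langchain_core.language_models` in your version:

```python
from langchain_core.language_models import FakeListChatModel

def test_chain_without_api_calls():
    # The fake model replays canned responses in order
    fake_llm = FakeListChatModel(responses=["LangChain is a framework for LLM apps."])
    test_chain = prompt | fake_llm | StrOutputParser()
    result = test_chain.invoke({"topic": "LangChain"})
    assert result == "LangChain is a framework for LLM apps."
```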
### Performance Optimization

Optimize chain execution:

```python
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Use appropriate chunk sizes for text splitting
splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    length_function=len
)

# Limit the number of retrieved documents
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

# Use smaller, faster models where appropriate
fast_llm = ChatOpenAI(model="gpt-4o-mini")

# Stream responses for better perceived latency
streaming_chain = prompt | fast_llm | StrOutputParser()
for chunk in streaming_chain.stream({"topic": "AI"}):
    print(chunk, end="", flush=True)
```
### Documentation

Document your chains:

```python
from langchain_core.runnables import RunnableConfig

class DocumentedChain:
    """
    Production RAG chain for technical documentation.

    Features:
    - Multi-query retrieval for better coverage
    - Reranking for improved relevance
    - Streaming support
    - Error handling with fallbacks

    Usage:
        chain = DocumentedChain()
        result = chain.invoke("Your question here")
    """

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o-mini")
        self.retriever = self._setup_retriever()
        self.chain = self._build_chain()

    def _setup_retriever(self):
        # Setup logic
        pass

    def _build_chain(self):
        # Chain construction
        pass

    def invoke(self, query: str, config: RunnableConfig = None):
        """Execute the chain with error handling"""
        try:
            return self.chain.invoke(query, config=config)
        except Exception as e:
            logger.error(f"Chain execution failed: {e}")
            raise
```
## Summary

This skill covers comprehensive LangChain orchestration patterns:

- Chains: sequential, map-reduce, router, and conditional chains
- Agents: ReAct, conversational, zero-shot, and structured agents
- Memory: buffer, window, summary, and vector store memory
- RAG: basic, multi-query, reranking, and parent document retrieval
- LLM Integrations: OpenAI, Anthropic, HuggingFace, Vertex AI, Ollama
- Callbacks: standard, custom, Argilla, UpTrain, LangSmith
- Retrieval: vector store, ensemble, time-weighted, multi-vector
- Streaming: chain, agent, and async streaming
- Error Handling: retry, fallback, timeout, validation
- Production: configuration, caching, rate limiting, monitoring, testing
For more examples and patterns, see EXAMPLES.md.