ChatKit Python Backend Skill
Build custom chat API backends that work with OpenAI ChatKit frontend.
Architecture
┌─────────────────────────────────────────────────────────────────────────┐ │ Frontend (ChatKit-JS) │ │ useChatKit({ api: { url, domainKey } }) │ └─────────────────────────────────────────────────────────────────────────┘ │ │ HTTP POST /api/chat │ (SSE streaming response) ▼ ┌─────────────────────────────────────────────────────────────────────────┐ │ FastAPI Backend │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ /api/chat Endpoint │ │ │ │ • Parse ChatKit request │ │ │ │ • Run Agent with MCP tools │ │ │ │ • Stream response via SSE │ │ │ │ • Persist conversation to DB │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────┐ │ │ │ OpenAI Agents │ │ MCP Server │ │ Neon Database │ │ │ │ SDK + Gemini │ │ (Todo Tools) │ │ (Conversations) │ │ │ └─────────────────┘ └─────────────────┘ └─────────────────────┘ │ └─────────────────────────────────────────────────────────────────────────┘
Quick Start
Installation
pip install fastapi uvicorn sse-starlette "openai-agents[litellm]"
Or with uv
uv add fastapi uvicorn sse-starlette "openai-agents[litellm]"
Environment Variables
Database
DATABASE_URL=postgresql://user:password@host/database
Gemini API
GOOGLE_API_KEY=your-gemini-api-key
CORS (frontend URL)
CORS_ORIGINS=http://localhost:3000,https://your-app.vercel.app
Reference
Pattern Guide
Chat Endpoint reference/chat-endpoint.md
SSE Streaming reference/sse-streaming.md
Conversation Persistence reference/conversation-persistence.md
Examples
Example Description
examples/chat-server.md Complete chat server implementation
Templates
Template Purpose
templates/chat_router.py FastAPI chat endpoint router
templates/models.py Database models for conversations
Basic Chat Endpoint
from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from sse_starlette.sse import EventSourceResponse from pydantic import BaseModel from typing import Optional, List, AsyncGenerator import json
app = FastAPI()
CORS for ChatKit frontend
app.add_middleware( CORSMiddleware, allow_origins=["http://localhost:3000"], allow_credentials=True, allow_methods=[""], allow_headers=[""], )
class ChatMessage(BaseModel): role: str # "user" or "assistant" content: str
class ChatRequest(BaseModel): messages: List[ChatMessage] thread_id: Optional[str] = None user_id: Optional[str] = None
async def generate_response(messages: List[ChatMessage]) -> AsyncGenerator[str, None]: """Generate streaming response from agent.""" # Your agent logic here response = "Hello! How can I help you?"
# Stream response word by word
for word in response.split():
yield f"data: {json.dumps({'content': word + ' '})}\n\n"
yield f"data: {json.dumps({'done': True})}\n\n"
@app.post("/api/chat") async def chat_endpoint(request: ChatRequest): """Chat endpoint with SSE streaming.""" return EventSourceResponse( generate_response(request.messages), media_type="text/event-stream", )
Chat Endpoint with Agent
import os from fastapi import FastAPI, Request, HTTPException from sse_starlette.sse import EventSourceResponse from pydantic import BaseModel from typing import Optional, List, AsyncGenerator import json
from agents import Agent, Runner, set_tracing_disabled from agents.mcp import MCPServerStreamableHttp from agents.extensions.models.litellm_model import LitellmModel
set_tracing_disabled(disabled=True)
app = FastAPI()
class ChatMessage(BaseModel): role: str content: str
class ChatRequest(BaseModel): messages: List[ChatMessage] thread_id: Optional[str] = None user_id: str
Global MCP server connection (managed via lifespan)
mcp_server = None
@app.on_event("startup") async def startup(): global mcp_server mcp_server = MCPServerStreamableHttp( name="Todo MCP", params={"url": "http://localhost:8000/api/mcp", "timeout": 30}, cache_tools_list=True, ) await mcp_server.aenter()
@app.on_event("shutdown") async def shutdown(): global mcp_server if mcp_server: await mcp_server.aexit(None, None, None)
def create_agent(user_id: str): """Create agent with user context.""" return Agent( name="Todo Assistant", instructions=f"""You are a task management assistant for user {user_id}. Use the MCP tools to help manage tasks:
- add_task: Create new tasks
- list_tasks: View tasks
- complete_task: Mark done
- delete_task: Remove tasks Always pass user_id="{user_id}" to tools.""", model=LitellmModel( model="gemini/gemini-2.0-flash", api_key=os.getenv("GOOGLE_API_KEY"), ), mcp_servers=[mcp_server] if mcp_server else [], )
async def stream_agent_response( agent: Agent, user_message: str, ) -> AsyncGenerator[str, None]: """Stream agent response as SSE events.""" try: result = Runner.run_streamed(agent, user_message)
async for event in result.stream_events():
if hasattr(event, 'item') and event.item:
yield f"data: {json.dumps({'content': str(event.item)})}\n\n"
# Final response
final = result.final_output
yield f"data: {json.dumps({'content': final, 'done': True})}\n\n"
except Exception as e:
yield f"data: {json.dumps({'error': str(e)})}\n\n"
@app.post("/api/chat") async def chat(request: ChatRequest): """Chat endpoint with streaming response.""" if not request.messages: raise HTTPException(400, "No messages provided")
# Get last user message
user_message = request.messages[-1].content
# Create agent for this user
agent = create_agent(request.user_id)
return EventSourceResponse(
stream_agent_response(agent, user_message),
media_type="text/event-stream",
)
SSE Event Format
ChatKit expects Server-Sent Events in this format:
Content chunk
yield f"data: {json.dumps({'content': 'partial response'})}\n\n"
Done signal
yield f"data: {json.dumps({'done': True})}\n\n"
Error
yield f"data: {json.dumps({'error': 'error message'})}\n\n"
Tool call (optional)
yield f"data: {json.dumps({'tool_call': {'name': 'add_task', 'result': {...}}})}\n\n"
Conversation Persistence
from sqlmodel import SQLModel, Field, Session from datetime import datetime from typing import Optional
class Conversation(SQLModel, table=True): id: Optional[int] = Field(default=None, primary_key=True) thread_id: str = Field(index=True, unique=True) user_id: str = Field(index=True) created_at: datetime = Field(default_factory=datetime.utcnow) updated_at: datetime = Field(default_factory=datetime.utcnow)
class Message(SQLModel, table=True): id: Optional[int] = Field(default=None, primary_key=True) thread_id: str = Field(index=True) role: str # "user" or "assistant" content: str created_at: datetime = Field(default_factory=datetime.utcnow)
async def save_message(thread_id: str, role: str, content: str): """Save message to database.""" with Session(engine) as session: message = Message(thread_id=thread_id, role=role, content=content) session.add(message) session.commit()
async def load_conversation(thread_id: str) -> List[dict]: """Load conversation history.""" with Session(engine) as session: messages = session.exec( select(Message) .where(Message.thread_id == thread_id) .order_by(Message.created_at) ).all() return [{"role": m.role, "content": m.content} for m in messages]
Authentication Integration
from fastapi import Depends, HTTPException, Header
async def get_current_user(authorization: str = Header(...)) -> dict: """Verify JWT and extract user.""" if not authorization.startswith("Bearer "): raise HTTPException(401, "Invalid authorization header")
token = authorization[7:]
user = await verify_jwt_token(token) # Your JWT verification
if not user:
raise HTTPException(401, "Invalid token")
return user
@app.post("/api/chat") async def chat( request: ChatRequest, user: dict = Depends(get_current_user), ): """Authenticated chat endpoint.""" # Use user["id"] as user_id agent = create_agent(user["id"]) # ...
CORS Configuration
from fastapi.middleware.cors import CORSMiddleware
Get from environment
CORS_ORIGINS = os.getenv("CORS_ORIGINS", "http://localhost:3000").split(",")
app.add_middleware( CORSMiddleware, allow_origins=CORS_ORIGINS, allow_credentials=True, allow_methods=[""], allow_headers=[""], )
Error Handling
@app.exception_handler(Exception) async def global_exception_handler(request: Request, exc: Exception): return JSONResponse( status_code=500, content={"error": str(exc)}, )
@app.post("/api/chat") async def chat(request: ChatRequest): try: # ... chat logic pass except ConnectionError: raise HTTPException(503, "MCP server unavailable") except TimeoutError: raise HTTPException(504, "Request timed out") except Exception as e: raise HTTPException(500, f"Internal error: {e}")
Testing
import pytest from httpx import AsyncClient
@pytest.mark.asyncio async def test_chat_endpoint(): async with AsyncClient(app=app, base_url="http://test") as client: response = await client.post( "/api/chat", json={ "messages": [{"role": "user", "content": "Hello"}], "user_id": "test-user", }, ) assert response.status_code == 200 assert "text/event-stream" in response.headers["content-type"]
Best Practices
-
Stream responses - Better UX with SSE streaming
-
Persist conversations - Enable conversation continuity
-
Authenticate requests - Verify user before processing
-
Handle errors gracefully - Return structured error responses
-
Configure CORS - Allow frontend domain
-
Use connection pooling - For database and MCP connections
-
Log conversations - For debugging and analytics