Python Async Patterns
Asyncio patterns for concurrent Python programming.
Core Concepts
import asyncio
Coroutine (must be awaited)
async def fetch(url: str) -> str: async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.text()
Entry point
async def main(): result = await fetch("https://example.com") return result
asyncio.run(main())
Pattern 1: Concurrent with gather
async def fetch_all(urls: list[str]) -> list[str]: """Fetch multiple URLs concurrently.""" async with aiohttp.ClientSession() as session: tasks = [fetch_one(session, url) for url in urls] return await asyncio.gather(*tasks, return_exceptions=True)
Pattern 2: Bounded Concurrency
async def fetch_with_limit(urls: list[str], limit: int = 10): """Limit concurrent requests.""" semaphore = asyncio.Semaphore(limit)
async def bounded_fetch(url):
async with semaphore:
return await fetch_one(url)
return await asyncio.gather(*[bounded_fetch(url) for url in urls])
Pattern 3: TaskGroup (Python 3.11+)
async def process_items(items): """Structured concurrency with automatic cleanup.""" async with asyncio.TaskGroup() as tg: for item in items: tg.create_task(process_one(item)) # All tasks complete here, or exception raised
Pattern 4: Timeout
async def with_timeout(): try: async with asyncio.timeout(5.0): # Python 3.11+ result = await slow_operation() except asyncio.TimeoutError: result = None return result
Critical Warnings
WRONG - blocks event loop
async def bad(): time.sleep(5) # Never use time.sleep! requests.get(url) # Blocking I/O!
CORRECT
async def good(): await asyncio.sleep(5) async with aiohttp.ClientSession() as s: await s.get(url)
WRONG - orphaned task
async def bad(): asyncio.create_task(work()) # May be garbage collected!
CORRECT - keep reference
async def good(): task = asyncio.create_task(work()) await task
Quick Reference
Pattern Use Case
gather(*tasks)
Multiple independent operations
Semaphore(n)
Rate limiting, resource constraints
TaskGroup()
Structured concurrency (3.11+)
Queue()
Producer-consumer
timeout(s)
Timeout wrapper (3.11+)
Lock()
Shared mutable state
Async Context Manager
from contextlib import asynccontextmanager
@asynccontextmanager async def managed_connection(): conn = await create_connection() try: yield conn finally: await conn.close()
Additional Resources
For detailed patterns, load:
-
./references/concurrency-patterns.md
-
Queue, Lock, producer-consumer
-
./references/aiohttp-patterns.md
-
HTTP client/server patterns
-
./references/mixing-sync-async.md
-
run_in_executor, thread pools
-
./references/debugging-async.md
-
Debug mode, profiling, finding issues
-
./references/production-patterns.md
-
Graceful shutdown, health checks, signal handling
-
./references/error-handling.md
-
Retry with backoff, circuit breakers, partial failures
-
./references/performance.md
-
uvloop, connection pooling, buffer sizing
Scripts
- ./scripts/find-blocking-calls.sh
- Scan code for blocking calls in async functions
Assets
- ./assets/async-project-template.py
- Production-ready async app skeleton
See Also
Prerequisites:
- python-typing-patterns
- Type hints for async functions
Related Skills:
-
python-fastapi-patterns
-
Async web APIs
-
python-observability-patterns
-
Async logging and tracing
-
python-database-patterns
-
Async database access