Python asyncio - Async/Await Concurrency
Overview
Python's asyncio library enables writing concurrent code using async/await syntax. It's ideal for I/O-bound operations like HTTP requests, database queries, and WebSocket connections. Regular file I/O blocks, so file operations need a thread pool or a helper library such as aiofiles. asyncio runs many such operations concurrently on a single thread, without the complexity of threading or multiprocessing.
Key Features:
- async/await syntax for readable concurrent code
- Event loop for managing concurrent operations
- Tasks for running multiple coroutines concurrently
- Primitives: locks, semaphores, events, queues
- HTTP client/server with aiohttp
- Database async support (asyncpg, aiomysql, motor)
- FastAPI async endpoints
- WebSocket support
- Background task management
Installation:
# asyncio is built-in (asyncio.run() requires Python 3.7+)
# Async HTTP client
pip install aiohttp
# Async HTTP requests (alternative)
pip install httpx
# Async database drivers
pip install asyncpg aiomysql motor  # PostgreSQL, MySQL, MongoDB
# FastAPI with async support (quotes keep shells like zsh from expanding the brackets)
pip install fastapi "uvicorn[standard]"
# Async testing
pip install pytest-asyncio
Basic Async/Await Patterns
- Simple Async Function
import asyncio

async def hello():
    """Basic async function (coroutine)."""
    print("Hello")
    await asyncio.sleep(1)  # Async sleep (non-blocking)
    print("World")
    return "Done"

# Run async function
result = asyncio.run(hello())
print(result)  # "Done"
Key Points:
- async def defines a coroutine function
- await suspends execution until awaitable completes
- asyncio.run() is the entry point for async programs
- Coroutines must be awaited or scheduled
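The last key point is easy to miss in practice. The sketch below is illustrative only: `greet` is a hypothetical coroutine, not part of any library. It shows that calling a coroutine function only builds a coroutine object, and that the body runs once the object is awaited or wrapped in a task.

```python
import asyncio
import inspect

async def greet(name):
    """Hypothetical coroutine used only for illustration."""
    await asyncio.sleep(0)
    return f"Hello, {name}"

async def main():
    # Calling the coroutine function builds a coroutine object; nothing runs yet.
    coro = greet("Ada")
    is_coro = inspect.iscoroutine(coro)

    # Option 1: await the coroutine directly.
    awaited = await coro

    # Option 2: schedule it as a Task, then await the Task.
    task = asyncio.create_task(greet("Grace"))
    scheduled = await task

    return is_coro, awaited, scheduled

print(asyncio.run(main()))  # (True, 'Hello, Ada', 'Hello, Grace')
```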
- Multiple Concurrent Tasks
import asyncio
import time

async def task(name, duration):
    """Simulate async task."""
    print(f"{name}: Starting (duration: {duration}s)")
    await asyncio.sleep(duration)
    print(f"{name}: Complete")
    return f"{name} result"

async def run_concurrent():
    """Run multiple tasks concurrently."""
    start = time.time()
    # Sequential (slow) - 6 seconds total
    # result1 = await task("Task 1", 3)
    # result2 = await task("Task 2", 2)
    # result3 = await task("Task 3", 1)
    # Concurrent (fast) - 3 seconds total
    results = await asyncio.gather(
        task("Task 1", 3),
        task("Task 2", 2),
        task("Task 3", 1)
    )
    elapsed = time.time() - start
    print(f"Total time: {elapsed:.2f}s")
    print(f"Results: {results}")

asyncio.run(run_concurrent())
Output: Total time: 3.00s (tasks ran concurrently)
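To see the difference rather than take it on trust, the following sketch times both approaches with shortened delays. The helper names (`work`, `run_sequential`, `run_concurrent`, `compare`) are made up for this illustration, and exact timings will vary by machine.

```python
import asyncio
import time

async def work(delay):
    """Stand-in for an I/O-bound operation."""
    await asyncio.sleep(delay)
    return delay

async def run_sequential(delays):
    # Each await finishes before the next one starts.
    start = time.perf_counter()
    results = [await work(d) for d in delays]
    return results, time.perf_counter() - start

async def run_concurrent(delays):
    # gather() runs all coroutines at once and keeps argument order.
    start = time.perf_counter()
    results = await asyncio.gather(*(work(d) for d in delays))
    return results, time.perf_counter() - start

async def compare():
    delays = [0.3, 0.2, 0.1]
    _, sequential_time = await run_sequential(delays)
    results, concurrent_time = await run_concurrent(delays)
    return results, sequential_time, concurrent_time

results, sequential_time, concurrent_time = asyncio.run(compare())
print(f"sequential: {sequential_time:.2f}s, concurrent: {concurrent_time:.2f}s")
```

The sequential total is roughly the sum of the delays, while the concurrent total is roughly the longest single delay.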
- Task Creation and Management
import asyncio

async def background_task(name):
    """Long-running background task."""
    for i in range(5):
        print(f"{name}: iteration {i}")
        await asyncio.sleep(1)
    return f"{name} complete"

async def main():
    # Create tasks (scheduled right away; they start running at the next await)
    task1 = asyncio.create_task(background_task("Task-1"))
    task2 = asyncio.create_task(background_task("Task-2"))
    # Do other work while tasks run
    print("Main: doing other work")
    await asyncio.sleep(2)
    # Wait for tasks to complete
    result1 = await task1
    result2 = await task2
    print(f"Results: {result1}, {result2}")

asyncio.run(main())
- Error Handling in Async Code
import asyncio

async def risky_operation(fail=False):
    """Operation that might fail."""
    await asyncio.sleep(1)
    if fail:
        raise ValueError("Operation failed")
    return "Success"

async def handle_errors():
    # Individual try/except
    try:
        result = await risky_operation(fail=True)
    except ValueError as e:
        print(f"Caught error: {e}")
        result = "Fallback value"
    # Gather with error handling
    results = await asyncio.gather(
        risky_operation(fail=False),
        risky_operation(fail=True),
        risky_operation(fail=False),
        return_exceptions=True  # Return exceptions instead of raising
    )
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

asyncio.run(handle_errors())
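The two gather() modes behave quite differently, which a side-by-side run makes concrete. This is a minimal sketch; `op`, `default_mode`, and `collect_mode` are hypothetical names chosen for the example.

```python
import asyncio

async def op(value, fail=False):
    """Hypothetical operation that optionally fails."""
    await asyncio.sleep(0.01)
    if fail:
        raise ValueError(f"op {value} failed")
    return value

async def default_mode():
    # Default: the first exception propagates out of gather().
    try:
        await asyncio.gather(op(1), op(2, fail=True), op(3))
    except ValueError as exc:
        return f"raised: {exc}"
    return "no error"

async def collect_mode():
    # return_exceptions=True: exceptions come back as ordinary results.
    results = await asyncio.gather(
        op(1), op(2, fail=True), op(3), return_exceptions=True
    )
    return [type(r).__name__ if isinstance(r, Exception) else r for r in results]

print(asyncio.run(default_mode()))  # raised: op 2 failed
print(asyncio.run(collect_mode()))  # [1, 'ValueError', 3]
```

With the default mode the successful results are lost to the caller, so return_exceptions=True is usually the better fit when partial results matter.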
Event Loop Fundamentals
- Event Loop Lifecycle
import asyncio

# Modern approach (Python 3.7+)
async def main():
    print("Main coroutine")
    await asyncio.sleep(1)

asyncio.run(main())  # Creates loop, runs main, closes loop

# Manual scheduling on the running loop (advanced use cases)
# some_coroutine and callback_function are placeholders for your own code
async def manual_example():
    # Inside a coroutine, prefer get_running_loop() over get_event_loop()
    loop = asyncio.get_running_loop()
    # Schedule coroutine
    task = loop.create_task(some_coroutine())
    # Schedule callback
    loop.call_later(5, callback_function)
    # Run until complete
    result = await task
    return result

# Get current event loop
async def get_current_loop():
    loop = asyncio.get_running_loop()
    print(f"Loop: {loop}")
    # Schedule callback in event loop
    loop.call_soon(lambda: print("Callback executed"))
    await asyncio.sleep(0)  # Let callback execute
- Loop Scheduling and Callbacks
import asyncio
from datetime import datetime

def callback(name, loop):
    """Callback function (not async)."""
    print(f"{datetime.now()}: {name} callback executed")
    # Stop loop after callback
    # loop.stop()

async def schedule_callbacks():
    loop = asyncio.get_running_loop()
    # Schedule immediate callback
    loop.call_soon(callback, "Immediate", loop)
    # Schedule callback after delay
    loop.call_later(2, callback, "Delayed 2s", loop)
    # Schedule callback at specific time
    loop.call_at(loop.time() + 3, callback, "Delayed 3s", loop)
    # Wait for callbacks to execute
    await asyncio.sleep(5)

asyncio.run(schedule_callbacks())
- Running Blocking Code
import asyncio
import time

def blocking_io():
    """Blocking I/O operation (CPU-bound work is better served by a process pool)."""
    print("Blocking operation started")
    time.sleep(2)  # Blocks thread
    print("Blocking operation complete")
    return "Blocking result"

async def run_in_executor():
    """Run blocking code in thread pool."""
    loop = asyncio.get_running_loop()
    # Run in default executor (thread pool)
    result = await loop.run_in_executor(
        None,  # Use default executor
        blocking_io
    )
    print(f"Result: {result}")

# Run blocking operations concurrently
async def concurrent_blocking():
    loop = asyncio.get_running_loop()
    # These run in thread pool, don't block event loop
    results = await asyncio.gather(
        loop.run_in_executor(None, blocking_io),
        loop.run_in_executor(None, blocking_io),
        loop.run_in_executor(None, blocking_io)
    )
    print(f"All results: {results}")

asyncio.run(concurrent_blocking())
Asyncio Primitives
- Locks for Mutual Exclusion
import asyncio

# Shared resource
counter = 0

async def increment_with_lock(name, lock):
    """Increment counter with lock protection."""
    global counter
    async with lock:
        # Critical section - only one task at a time
        print(f"{name}: acquired lock")
        current = counter
        await asyncio.sleep(0.1)  # Simulate processing
        counter = current + 1
        print(f"{name}: releasing lock, counter={counter}")

async def increment_without_lock(name):
    """Increment without lock - race condition!"""
    global counter
    current = counter
    await asyncio.sleep(0.1)  # Race condition window
    counter = current + 1
    print(f"{name}: counter={counter}")

async def test_locks():
    global counter
    # Create the lock inside the running loop; before Python 3.10 a
    # module-level Lock binds to a different loop than asyncio.run() uses
    lock = asyncio.Lock()
    # Without lock (race condition)
    counter = 0
    await asyncio.gather(
        increment_without_lock("Task-1"),
        increment_without_lock("Task-2"),
        increment_without_lock("Task-3")
    )
    print(f"Without lock: {counter}")  # 1: every task read 0 before any wrote
    # With lock (correct)
    counter = 0
    await asyncio.gather(
        increment_with_lock("Task-1", lock),
        increment_with_lock("Task-2", lock),
        increment_with_lock("Task-3", lock)
    )
    print(f"With lock: {counter}")  # Always 3

asyncio.run(test_locks())
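The lost-update effect can be checked with a compact, print-free variant. This is a sketch under the same assumptions as the example above; the `state` dict and the function names are illustrative stand-ins for the global counter.

```python
import asyncio

async def unsafe_increment(state):
    # Read, yield to the loop, then write: other tasks read the same stale value.
    current = state["counter"]
    await asyncio.sleep(0.01)
    state["counter"] = current + 1

async def safe_increment(state, lock):
    # The lock makes the read-modify-write sequence atomic across tasks.
    async with lock:
        current = state["counter"]
        await asyncio.sleep(0.01)
        state["counter"] = current + 1

async def compare(n=3):
    unsafe = {"counter": 0}
    await asyncio.gather(*(unsafe_increment(unsafe) for _ in range(n)))

    safe = {"counter": 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(safe_increment(safe, lock) for _ in range(n)))
    return unsafe["counter"], safe["counter"]

print(asyncio.run(compare()))  # (1, 3)
```

Because asyncio only switches tasks at an await, the race here is deterministic: all unlocked tasks read 0 before any of them writes.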
- Semaphores for Resource Limiting
import asyncio

async def limited_operation(name, semaphore):
    """Operation limited by semaphore."""
    print(f"{name}: waiting for semaphore")
    async with semaphore:
        print(f"{name}: acquired semaphore")
        await asyncio.sleep(2)  # Simulate work
        print(f"{name}: releasing semaphore")

async def test_semaphore():
    # Limit concurrent operations (created inside the running loop)
    semaphore = asyncio.Semaphore(2)  # Max 2 concurrent
    # Create 5 tasks, but only 2 run concurrently
    await asyncio.gather(
        limited_operation("Task-1", semaphore),
        limited_operation("Task-2", semaphore),
        limited_operation("Task-3", semaphore),
        limited_operation("Task-4", semaphore),
        limited_operation("Task-5", semaphore)
    )

asyncio.run(test_semaphore())
Only 2 tasks hold semaphore at any time
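One way to confirm that claim is to track how many tasks are inside the semaphore at once. The sketch below is illustrative; `worker`, `measure_peak`, and the `state` dict are hypothetical helpers, and the sleep is shortened to keep the run quick.

```python
import asyncio

async def worker(semaphore, state):
    async with semaphore:
        # Count tasks currently holding the semaphore and remember the peak.
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.05)  # Simulate work
        state["active"] -= 1

async def measure_peak(limit, n_tasks):
    semaphore = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker(semaphore, state) for _ in range(n_tasks)))
    return state["peak"]

print(asyncio.run(measure_peak(limit=2, n_tasks=5)))  # 2
```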
- Events for Signaling
import asyncio

async def waiter(name, event):
    """Wait for event to be set."""
    print(f"{name}: waiting for event")
    await event.wait()  # Suspend until event is set
    print(f"{name}: event received!")

async def setter(event):
    """Set event after delay."""
    await asyncio.sleep(2)
    print("Setter: setting event")
    event.set()  # Wake up all waiters

async def test_event():
    event = asyncio.Event()  # Created inside the running loop
    # Create waiters
    await asyncio.gather(
        waiter("Waiter-1", event),
        waiter("Waiter-2", event),
        waiter("Waiter-3", event),
        setter(event)
    )

asyncio.run(test_event())
- Queues for Task Distribution
import asyncio
import random

async def producer(queue, name):
    """Produce items and add to queue."""
    for i in range(5):
        item = f"{name}-item-{i}"
        await queue.put(item)
        print(f"{name}: produced {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

async def consumer(queue, name):
    """Consume items from queue."""
    while True:
        item = await queue.get()  # Wait until item available
        if item is None:  # Shutdown signal
            queue.task_done()
            break
        print(f"{name}: consumed {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def test_queue():
    queue = asyncio.Queue(maxsize=10)
    # Start consumers in the background
    consumers = [
        asyncio.create_task(consumer(queue, f"Consumer-{i}"))
        for i in range(1, 4)
    ]
    # Run producers to completion
    await asyncio.gather(
        producer(queue, "Producer-1"),
        producer(queue, "Producer-2")
    )
    # Signal completion with one sentinel per consumer; with fewer
    # sentinels than consumers, one consumer would wait on get() forever
    for _ in consumers:
        await queue.put(None)
    # Wait for all items to be processed
    await queue.join()
    await asyncio.gather(*consumers)
    print("All tasks complete")

asyncio.run(test_queue())
- Condition Variables
import asyncio

items = []

async def consumer(name, condition):
    """Wait for items to be available."""
    async with condition:
        # Wait until items are available
        await condition.wait_for(lambda: len(items) > 0)
        item = items.pop(0)
        print(f"{name}: consumed {item}")

async def producer(name, condition):
    """Add items and notify consumers."""
    async with condition:
        item = f"{name}-item"
        items.append(item)
        print(f"{name}: produced {item}")
        # Notify one waiting consumer
        condition.notify(n=1)
        # Or notify all: condition.notify_all()

async def test_condition():
    condition = asyncio.Condition()  # Created inside the running loop
    await asyncio.gather(
        consumer("Consumer-1", condition),
        consumer("Consumer-2", condition),
        producer("Producer-1", condition),
        producer("Producer-2", condition)
    )

asyncio.run(test_condition())
Async HTTP with aiohttp
- Basic HTTP Client
import asyncio
import aiohttp

async def fetch_url(session, url):
    """Fetch single URL."""
    async with session.get(url) as response:
        status = response.status
        text = await response.text()
        return {"url": url, "status": status, "length": len(text)}

async def fetch_multiple_urls():
    """Fetch multiple URLs concurrently."""
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/json",
    ]
    async with aiohttp.ClientSession() as session:
        # Concurrent requests
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    for result in results:
        # len(text) counts characters, which can differ from the byte count
        print(f"{result['url']}: {result['status']} ({result['length']} chars)")

asyncio.run(fetch_multiple_urls())
- HTTP Client with Error Handling
import asyncio
import aiohttp
from typing import Dict, Any

async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3
) -> Dict[str, Any]:
    """Fetch URL with retry logic."""
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                response.raise_for_status()  # Raise for 4xx/5xx
                data = await response.json()
                return {"success": True, "data": data}
        except aiohttp.ClientError as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                return {"success": False, "error": str(e)}
            # Exponential backoff
            await asyncio.sleep(2 ** attempt)
        except asyncio.TimeoutError:
            print(f"Attempt {attempt + 1} timed out")
            if attempt == max_retries - 1:
                return {"success": False, "error": "Timeout"}
            await asyncio.sleep(2 ** attempt)

async def parallel_api_calls():
    """Make parallel API calls with error handling."""
    urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/status/500",  # Will fail
        "https://httpbin.org/delay/1",
    ]
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[fetch_with_retry(session, url) for url in urls],
            return_exceptions=True  # Don't stop on errors
        )
    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            print(f"{url}: Exception - {result}")
        elif result["success"]:
            print(f"{url}: Success")
        else:
            print(f"{url}: Failed - {result['error']}")

asyncio.run(parallel_api_calls())
- HTTP Server with aiohttp
from aiohttp import web
import asyncio

async def handle_hello(request):
    """Simple GET handler."""
    name = request.query.get("name", "World")
    return web.json_response({"message": f"Hello, {name}!"})

async def handle_post(request):
    """POST handler with JSON body."""
    data = await request.json()
    # Simulate async processing
    await asyncio.sleep(1)
    return web.json_response({
        "received": data,
        "status": "processed"
    })

async def handle_stream(request):
    """Streaming response."""
    response = web.StreamResponse()
    await response.prepare(request)
    for i in range(10):
        await response.write(f"Chunk {i}\n".encode())
        await asyncio.sleep(0.5)
    await response.write_eof()
    return response

# Create application
app = web.Application()
app.router.add_get("/hello", handle_hello)
app.router.add_post("/process", handle_post)
app.router.add_get("/stream", handle_stream)

# Run server
if __name__ == "__main__":
    web.run_app(app, host="0.0.0.0", port=8080)
- WebSocket Client
import asyncio
import aiohttp

async def websocket_client():
    """Connect to WebSocket server."""
    url = "wss://echo.websocket.org"
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(url) as ws:
            # Send messages
            await ws.send_str("Hello WebSocket")
            await ws.send_json({"type": "greeting", "data": "test"})
            # Receive messages
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print(f"Received: {msg.data}")
                    if msg.data == "close":
                        await ws.close()
                        break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    print(f"Error: {ws.exception()}")
                    break

asyncio.run(websocket_client())
Async Database Operations
- PostgreSQL with asyncpg
import asyncio
import asyncpg

async def database_operations():
    """Async PostgreSQL operations."""
    # Create connection pool
    pool = await asyncpg.create_pool(
        host="localhost",
        database="mydb",
        user="user",
        password="password",
        min_size=5,
        max_size=20
    )
    try:
        # Acquire connection from pool
        async with pool.acquire() as conn:
            # Execute query
            rows = await conn.fetch(
                "SELECT id, name, email FROM users WHERE active = $1",
                True
            )
            for row in rows:
                print(f"User: {row['name']} ({row['email']})")
            # Insert data
            await conn.execute(
                "INSERT INTO users (name, email) VALUES ($1, $2)",
                "Alice", "alice@example.com"
            )
            # Transaction
            async with conn.transaction():
                await conn.execute("UPDATE users SET active = $1 WHERE id = $2", False, 1)
                await conn.execute("INSERT INTO audit_log (action) VALUES ($1)", "deactivate")
    finally:
        await pool.close()

asyncio.run(database_operations())
- MongoDB with motor
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def mongodb_operations():
    """Async MongoDB operations."""
    # Create client
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client.mydb
    collection = db.users
    try:
        # Insert document
        result = await collection.insert_one({
            "name": "Alice",
            "email": "alice@example.com",
            "age": 30
        })
        print(f"Inserted ID: {result.inserted_id}")
        # Find documents
        cursor = collection.find({"age": {"$gte": 25}})
        async for document in cursor:
            print(f"User: {document['name']}")
        # Update document
        await collection.update_one(
            {"name": "Alice"},
            {"$set": {"age": 31}}
        )
        # Aggregation pipeline
        pipeline = [
            {"$match": {"age": {"$gte": 25}}},
            {"$group": {"_id": None, "avg_age": {"$avg": "$age"}}}
        ]
        async for result in collection.aggregate(pipeline):
            print(f"Average age: {result['avg_age']}")
    finally:
        client.close()

asyncio.run(mongodb_operations())
- Connection Pool Pattern
import asyncio
import asyncpg
from typing import Optional

class DatabasePool:
    """Async database connection pool manager."""

    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool: Optional[asyncpg.Pool] = None

    async def connect(self):
        """Create connection pool."""
        self.pool = await asyncpg.create_pool(self.dsn, min_size=5, max_size=20)

    async def close(self):
        """Close connection pool."""
        if self.pool:
            await self.pool.close()

    async def execute(self, query: str, *args):
        """Execute query."""
        async with self.pool.acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query: str, *args):
        """Fetch multiple rows."""
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args)

    async def fetchrow(self, query: str, *args):
        """Fetch single row."""
        async with self.pool.acquire() as conn:
            return await conn.fetchrow(query, *args)

# Usage
async def use_pool():
    db = DatabasePool("postgresql://user:pass@localhost/mydb")
    await db.connect()
    try:
        # Execute operations
        rows = await db.fetch("SELECT * FROM users")
        for row in rows:
            print(row)
    finally:
        await db.close()

asyncio.run(use_pool())
FastAPI Async Patterns
- Async Endpoints
from fastapi import FastAPI, HTTPException
import asyncio
import httpx

app = FastAPI()

@app.get("/")
async def root():
    """Simple async endpoint."""
    return {"message": "Hello World"}

@app.get("/delay/{seconds}")
async def delayed_response(seconds: int):
    """Endpoint with async delay."""
    await asyncio.sleep(seconds)
    return {"message": f"Waited {seconds} seconds"}

@app.get("/fetch")
async def fetch_external():
    """Fetch data from external API."""
    async with httpx.AsyncClient() as client:
        response = await client.get("https://httpbin.org/json")
        return response.json()

@app.get("/parallel")
async def parallel_requests():
    """Make parallel API calls."""
    async with httpx.AsyncClient() as client:
        responses = await asyncio.gather(
            client.get("https://httpbin.org/delay/1"),
            client.get("https://httpbin.org/delay/2"),
            client.get("https://httpbin.org/json")
        )
    return {
        "results": [r.json() for r in responses]
    }
- Background Tasks
from fastapi import FastAPI, BackgroundTasks
import asyncio

app = FastAPI()

async def send_email(email: str, message: str):
    """Simulate sending email."""
    print(f"Sending email to {email}")
    await asyncio.sleep(5)  # Simulate slow email service
    print(f"Email sent to {email}: {message}")

@app.post("/send-notification")
async def send_notification(
    email: str,
    message: str,
    background_tasks: BackgroundTasks
):
    """Send notification in background."""
    # Add task to background
    background_tasks.add_task(send_email, email, message)
    # Return immediately
    return {"status": "notification queued"}

# Alternative: manual task creation
# The event loop holds only weak references to tasks, so keep a strong
# reference until the task finishes or it may be garbage-collected mid-run
pending_tasks = set()

@app.post("/send-notification-manual")
async def send_notification_manual(email: str, message: str):
    """Create background task manually."""
    task = asyncio.create_task(send_email(email, message))
    pending_tasks.add(task)
    task.add_done_callback(pending_tasks.discard)
    return {"status": "notification queued"}
- Async Dependencies
from fastapi import FastAPI, Depends, HTTPException
import asyncpg

app = FastAPI()

# Database pool (global)
db_pool = None

async def get_db():
    """Dependency: database connection."""
    async with db_pool.acquire() as conn:
        yield conn

# Note: recent FastAPI releases deprecate on_event in favor of lifespan handlers
@app.on_event("startup")
async def startup():
    """Initialize database pool on startup."""
    global db_pool
    db_pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/mydb"
    )

@app.on_event("shutdown")
async def shutdown():
    """Close database pool on shutdown."""
    await db_pool.close()

@app.get("/users/{user_id}")
async def get_user(user_id: int, conn=Depends(get_db)):
    """Get user with async database dependency."""
    user = await conn.fetchrow(
        "SELECT * FROM users WHERE id = $1",
        user_id
    )
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return dict(user)
- WebSocket Endpoints
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import asyncio

app = FastAPI()

# Active connections
active_connections: List[WebSocket] = []

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint."""
    await websocket.accept()
    active_connections.append(websocket)
    try:
        while True:
            # Receive message
            data = await websocket.receive_text()
            # Broadcast to all connections (copy: the list can change during awaits)
            for connection in list(active_connections):
                await connection.send_text(f"Broadcast: {data}")
    except WebSocketDisconnect:
        active_connections.remove(websocket)
        print("Client disconnected")

# Background task to send periodic updates
async def broadcast_updates():
    """Send periodic updates to all clients."""
    while True:
        await asyncio.sleep(5)
        # Iterate over a copy so removals don't skip connections
        for connection in list(active_connections):
            try:
                await connection.send_text("Periodic update")
            except Exception:
                # Drop connections that can no longer be written to
                if connection in active_connections:
                    active_connections.remove(connection)

@app.on_event("startup")
async def startup():
    """Start background update task."""
    # Keep a reference so the task is not garbage-collected
    app.state.broadcast_task = asyncio.create_task(broadcast_updates())
Common Patterns and Best Practices
- Timeout Handling
import asyncio

async def slow_operation():
    """Slow operation."""
    await asyncio.sleep(10)
    return "Result"

async def with_timeout():
    """Run operation with timeout."""
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
        print(f"Result: {result}")
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(with_timeout())
- Cancellation Handling
import asyncio

async def cancellable_task():
    """Task that can be cancelled."""
    try:
        for i in range(10):
            print(f"Working: {i}")
            await asyncio.sleep(1)
        return "Complete"
    except asyncio.CancelledError:
        print("Task was cancelled")
        # Cleanup
        raise  # Re-raise to propagate cancellation

async def cancel_example():
    """Example of task cancellation."""
    task = asyncio.create_task(cancellable_task())
    # Let it run for a bit
    await asyncio.sleep(3)
    # Cancel the task
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Confirmed: task was cancelled")

asyncio.run(cancel_example())
- Resource Cleanup with Context Managers
import asyncio

class AsyncResource:
    """Async context manager for resource management."""

    async def __aenter__(self):
        """Async setup."""
        print("Acquiring resource")
        await asyncio.sleep(1)  # Simulate async setup
        self.connection = "connected"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async cleanup."""
        print("Releasing resource")
        await asyncio.sleep(1)  # Simulate async cleanup
        self.connection = None

async def use_resource():
    """Use async resource."""
    async with AsyncResource() as resource:
        print(f"Using resource: {resource.connection}")
        await asyncio.sleep(1)
    # Resource automatically cleaned up

asyncio.run(use_resource())
- Debouncing and Throttling
import asyncio
from datetime import datetime

class Debouncer:
    """Debounce async function calls."""

    def __init__(self, delay: float):
        self.delay = delay
        self.task = None

    async def call(self, func, *args, **kwargs):
        """Debounced function call."""
        # Cancel previous task
        if self.task:
            self.task.cancel()

        # Create new task
        async def delayed_call():
            await asyncio.sleep(self.delay)
            await func(*args, **kwargs)

        self.task = asyncio.create_task(delayed_call())

async def api_call(query: str):
    """Simulated API call."""
    print(f"{datetime.now()}: API call with query: {query}")

async def debounce_example():
    """Example of debouncing."""
    debouncer = Debouncer(delay=1.0)
    # Rapid calls - only last one executes
    await debouncer.call(api_call, "query1")
    await asyncio.sleep(0.1)
    await debouncer.call(api_call, "query2")
    await asyncio.sleep(0.1)
    await debouncer.call(api_call, "query3")
    # Wait for debounced call
    await asyncio.sleep(2)

asyncio.run(debounce_example())
Output: Only "query3" API call executes
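That behavior can be verified by recording calls instead of printing them. The sketch below is a trimmed, hypothetical variant of the debouncer with a synchronous `call` method and shortened delays; it is meant as a check of the idea, not a replacement for the class above.

```python
import asyncio

class Debouncer:
    """Trimmed debouncer: each new call cancels the pending one."""

    def __init__(self, delay):
        self.delay = delay
        self.task = None

    def call(self, func, *args):
        if self.task is not None:
            self.task.cancel()  # No-op if the task already finished

        async def delayed():
            await asyncio.sleep(self.delay)
            await func(*args)

        self.task = asyncio.create_task(delayed())

async def demo():
    executed = []

    async def api_call(query):
        executed.append(query)

    debouncer = Debouncer(delay=0.1)
    for query in ("query1", "query2", "query3"):
        debouncer.call(api_call, query)
        await asyncio.sleep(0.01)  # Calls arrive faster than the delay
    await asyncio.sleep(0.3)  # Give the last call time to fire
    return executed

print(asyncio.run(demo()))  # ['query3']
```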
- Rate Limiting
import asyncio
from datetime import datetime

class RateLimiter:
    """Limit rate of async operations."""

    def __init__(self, max_calls: int, period: float):
        self.max_calls = max_calls
        self.period = period
        self.semaphore = asyncio.Semaphore(max_calls)
        self.calls = []

    async def __aenter__(self):
        """Acquire rate limit slot."""
        await self.semaphore.acquire()
        loop = asyncio.get_running_loop()
        now = loop.time()
        # Remove old calls outside period
        self.calls = [t for t in self.calls if now - t < self.period]
        if len(self.calls) >= self.max_calls:
            # Wait until oldest call expires
            sleep_time = self.period - (now - self.calls[0])
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)
        # Record when the call actually starts, after any wait
        self.calls.append(loop.time())
        return self

    async def __aexit__(self, *args):
        """Release semaphore."""
        self.semaphore.release()

async def rate_limited_operation(limiter, name):
    """Operation with rate limiting."""
    async with limiter:
        print(f"{datetime.now()}: {name}")
        await asyncio.sleep(0.1)

async def rate_limit_example():
    """Example of rate limiting."""
    # Max 3 calls per 2 seconds
    limiter = RateLimiter(max_calls=3, period=2.0)
    # Try to make 6 calls
    await asyncio.gather(*[
        rate_limited_operation(limiter, f"Call-{i}")
        for i in range(6)
    ])

asyncio.run(rate_limit_example())
Debugging Async Code
- Enable Debug Mode
import asyncio
import logging

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

async def debug_example():
    logger.debug("Starting operation")
    await asyncio.sleep(1)
    logger.debug("Operation complete")

# Enable asyncio debug mode
asyncio.run(debug_example(), debug=True)

# Or set environment variable:
# PYTHONASYNCIODEBUG=1 python script.py
- Detect Blocking Code
import asyncio
import time

async def problematic_code():
    """Code with blocking operation."""
    print("Starting")
    # BAD: Blocking sleep
    time.sleep(2)  # This blocks the event loop!
    print("Complete")

# Run with debug mode to detect blocking
asyncio.run(problematic_code(), debug=True)
Warning: Executing <Task> took 2.001 seconds
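The warning is emitted through the standard "asyncio" logger, so it can also be captured programmatically, for instance in a test. The following is a rough sketch: `ListHandler`, `blocks_briefly`, and `run_and_capture` are hypothetical helpers, and it assumes the default slow-callback threshold of 0.1 seconds.

```python
import asyncio
import logging
import time

class ListHandler(logging.Handler):
    """Collects formatted log messages in a list."""

    def __init__(self):
        super().__init__(level=logging.WARNING)
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())

async def blocks_briefly():
    time.sleep(0.3)  # Deliberately blocks the loop beyond the 0.1s threshold

def run_and_capture():
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        asyncio.run(blocks_briefly(), debug=True)
    finally:
        logger.removeHandler(handler)
    # Keep only the slow-callback warnings
    return [m for m in handler.messages if "took" in m]

for message in run_and_capture():
    print(message)
```

The threshold itself is adjustable through the loop's slow_callback_duration attribute if 0.1 seconds is too noisy or too lax for a given project.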
- Track Pending Tasks
import asyncio

async def track_tasks():
    """Track all pending tasks."""
    # Get all tasks
    tasks = asyncio.all_tasks()
    print(f"Total tasks: {len(tasks)}")
    for task in tasks:
        print(f"  - {task.get_name()}: {task}")
        # Check if task is done
        if task.done():
            try:
                result = task.result()
                print(f"    Result: {result}")
            except Exception as e:
                print(f"    Exception: {e}")

# Create some tasks
async def main():
    task1 = asyncio.create_task(asyncio.sleep(5), name="sleep-task")
    task2 = asyncio.create_task(track_tasks(), name="tracking")
    await task2
    task1.cancel()

asyncio.run(main())
Testing Async Code
- pytest-asyncio Setup
# test_async.py
import aiohttp
import pytest
import pytest_asyncio

# some_async_function and create_async_client are placeholders for the code under test

# Mark test as async
@pytest.mark.asyncio
async def test_async_function():
    """Test async function."""
    result = await some_async_function()
    assert result == "expected"

@pytest.mark.asyncio
async def test_async_http():
    """Test async HTTP client."""
    async with aiohttp.ClientSession() as session:
        async with session.get("https://httpbin.org/json") as response:
            assert response.status == 200
            data = await response.json()
            assert "slideshow" in data

# Async fixture (pytest_asyncio.fixture works in both strict and auto mode;
# a plain pytest.fixture on an async function only works in auto mode)
@pytest_asyncio.fixture
async def async_client():
    """Async test fixture."""
    client = await create_async_client()
    yield client
    await client.close()

@pytest.mark.asyncio
async def test_with_fixture(async_client):
    """Test using async fixture."""
    result = await async_client.fetch_data()
    assert result is not None
- Mocking Async Functions
import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_with_mock():
    """Test with async mock."""
    # Create async mock
    mock_func = AsyncMock(return_value="mocked result")
    result = await mock_func()
    assert result == "mocked result"
    mock_func.assert_called_once()

@pytest.mark.asyncio
@patch("module.async_function", new_callable=AsyncMock)
async def test_with_patch(mock_async):
    """Test with patched async function."""
    mock_async.return_value = {"status": "success"}
    result = await some_function_that_calls_async()
    assert result["status"] == "success"
    mock_async.assert_called_once()
Performance Optimization
- Use asyncio.gather() for Parallelism
import asyncio
import time

async def slow_task(n):
    await asyncio.sleep(1)
    return n * 2

async def optimized():
    """Parallel execution."""
    start = time.time()
    # Sequential (slow) - 5 seconds
    # results = []
    # for i in range(5):
    #     result = await slow_task(i)
    #     results.append(result)
    # Parallel (fast) - 1 second
    results = await asyncio.gather(*[slow_task(i) for i in range(5)])
    elapsed = time.time() - start
    print(f"Time: {elapsed:.2f}s, Results: {results}")

asyncio.run(optimized())
- Connection Pooling
import asyncio
import aiohttp

# BAD: Create new session for each request
async def bad_pattern():
    for i in range(10):
        async with aiohttp.ClientSession() as session:
            async with session.get("https://httpbin.org/json") as response:
                await response.json()

# GOOD: Reuse session with connection pool
async def good_pattern():
    async with aiohttp.ClientSession() as session:
        tasks = [
            session.get("https://httpbin.org/json")
            for i in range(10)
        ]
        responses = await asyncio.gather(*tasks)
        for response in responses:
            await response.json()
- Avoid Blocking Operations
import asyncio

# BAD: Blocking I/O in async function
async def bad_file_read():
    with open("large_file.txt") as f:  # Blocks event loop!
        data = f.read()
    return data

# GOOD: Use async file I/O or run in executor
def read_file(path):
    with open(path) as f:  # Closes the file even if read() fails
        return f.read()

async def good_file_read():
    loop = asyncio.get_running_loop()
    # Run blocking operation in thread pool
    data = await loop.run_in_executor(None, read_file, "large_file.txt")
    return data

# BETTER: Use aiofiles for async file I/O (pip install aiofiles)
import aiofiles

async def better_file_read():
    async with aiofiles.open("large_file.txt") as f:
        data = await f.read()
    return data
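On Python 3.9 and later, asyncio.to_thread() is a shorter way to push a blocking call onto the default thread pool. The sketch below assumes that version; `read_text` and `read_file_async` are illustrative names, and a temporary file stands in for large_file.txt so the example runs anywhere.

```python
import asyncio
import tempfile
from pathlib import Path

def read_text(path):
    """Ordinary blocking read; runs in a worker thread."""
    with open(path, encoding="utf-8") as f:
        return f.read()

async def read_file_async(path):
    # to_thread() runs the callable in the default executor and awaits the result.
    return await asyncio.to_thread(read_text, path)

async def main():
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "example.txt"
        path.write_text("hello from a worker thread", encoding="utf-8")
        return await read_file_async(path)

print(asyncio.run(main()))  # hello from a worker thread
```

This avoids an extra dependency, at the cost of occupying one worker thread per in-flight read.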
Common Pitfalls
❌ Anti-Pattern 1: Not Awaiting Coroutines
# WRONG
async def bad():
    result = async_function()  # Returns coroutine, doesn't execute!
    print(result)  # Prints: <coroutine object>

# CORRECT
async def good():
    result = await async_function()  # Actually executes
    print(result)
❌ Anti-Pattern 2: Blocking the Event Loop
# WRONG
import asyncio
import time

async def bad():
    time.sleep(5)  # Blocks entire event loop!

# CORRECT
async def good():
    await asyncio.sleep(5)  # Non-blocking
❌ Anti-Pattern 3: Not Handling Cancellation
# WRONG
async def bad():
    await asyncio.sleep(10)
    # No cleanup if cancelled

# CORRECT
async def good():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Cleanup resources
        await cleanup()
        raise  # Re-raise to propagate
❌ Anti-Pattern 4: Creating Event Loop Incorrectly
# WRONG (Python 3.7+)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# CORRECT (Python 3.7+)
asyncio.run(main())
❌ Anti-Pattern 5: Not Closing Resources
# WRONG
async def bad():
    session = aiohttp.ClientSession()
    response = await session.get(url)
    # Session never closed - resource leak!

# CORRECT
async def good():
    async with aiohttp.ClientSession() as session:
        response = await session.get(url)
    # Session automatically closed
Best Practices
- Use asyncio.run() for entry point (Python 3.7+)
- Always await coroutines - don't forget await
- Use async context managers for resource cleanup
- Connection pooling for HTTP and database clients
- Handle CancelledError for graceful shutdown
- Use asyncio.gather() for parallel operations
- Avoid blocking operations in async functions
- Use timeouts to prevent hanging operations
- Debug mode during development to catch issues
- Test async code with pytest-asyncio
Quick Reference
Common Commands
# Run async script
python script.py
# Run with debug mode
PYTHONASYNCIODEBUG=1 python script.py
# Run tests
pytest -v --asyncio-mode=auto
# Install async dependencies
pip install aiohttp asyncpg motor pytest-asyncio
Essential Imports
import asyncio
import aiohttp
import asyncpg
from typing import List, Dict, Any
Resources
- Official Documentation: https://docs.python.org/3/library/asyncio.html
- aiohttp: https://docs.aiohttp.org/
- FastAPI Async: https://fastapi.tiangolo.com/async/
- pytest-asyncio: https://pytest-asyncio.readthedocs.io/
Related Skills
When using asyncio, consider these complementary skills:
- fastapi-local-dev: FastAPI async server patterns and production deployment
- pytest: Testing async code with pytest-asyncio and fixtures
- systematic-debugging: Debugging async race conditions and deadlocks
Quick FastAPI Async Patterns (Inlined for Standalone Use)
# FastAPI async endpoint pattern
import asyncio

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

# User, EmailSchema, and send_email_async are placeholders defined elsewhere in your app

app = FastAPI()

# Async database setup
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # Async database query
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

# Background tasks with asyncio
email_tasks = set()  # Strong references keep tasks from being garbage-collected

@app.post("/send-email")
async def send_email_endpoint(email: EmailSchema):
    # Non-blocking background task
    task = asyncio.create_task(send_email_async(email))
    email_tasks.add(task)
    task.add_done_callback(email_tasks.discard)
    return {"status": "email queued"}
Quick pytest-asyncio Patterns (Inlined for Standalone Use)
# Testing async functions with pytest
import asyncio
from unittest.mock import AsyncMock

import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient

# app is the FastAPI application under test (import it from your project)

# Async test fixture
@pytest_asyncio.fixture
async def async_client():
    # httpx 0.28 removed AsyncClient(app=...); route requests through ASGITransport
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        yield client

# Async test function
@pytest.mark.asyncio
async def test_get_user(async_client):
    response = await async_client.get("/users/1")
    assert response.status_code == 200
    assert response.json()["id"] == 1

# Testing concurrent operations
@pytest.mark.asyncio
async def test_concurrent_requests():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        # Run 10 requests concurrently
        responses = await asyncio.gather(
            *[client.get(f"/users/{i}") for i in range(1, 11)]
        )
        assert all(r.status_code == 200 for r in responses)

# Mock async dependencies
@pytest_asyncio.fixture
async def mock_db():
    # Setup mock database
    db = AsyncMock()
    yield db
    # Cleanup
Quick Async Debugging Reference (Inlined for Standalone Use)
Common Async Pitfalls:
Blocking the Event Loop
# ❌ WRONG: Blocking call in async function
async def bad_function():
    time.sleep(5)  # Blocks entire event loop!
    return "done"

# ✅ CORRECT: Use asyncio.sleep
async def good_function():
    await asyncio.sleep(5)  # Releases event loop
    return "done"
Debugging Race Conditions
# Add logging to track execution order
import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)

async def debug_task(name):
    logging.debug(f"{name}: Starting")
    await asyncio.sleep(1)
    logging.debug(f"{name}: Finished")
    return name

# Run with detailed tracing
# asyncio.run() expects a coroutine, so wrap gather() in one
async def main():
    return await asyncio.gather(debug_task("A"), debug_task("B"))

asyncio.run(main(), debug=True)
Deadlock Detection
# Use timeout to detect deadlocks (run this inside a coroutine)
try:
    result = await asyncio.wait_for(some_async_function(), timeout=5.0)
except asyncio.TimeoutError:
    # A timeout only suggests a deadlock; the operation may simply be slow
    logging.error("Possible deadlock: operation timed out")
    # Investigate what's blocking
Inspecting Running Tasks
# Check all pending tasks (all_tasks() needs a running loop, so call it inside a coroutine)
tasks = asyncio.all_tasks()
for task in tasks:
    print(f"Task: {task.get_name()}, Done: {task.done()}")
    if not task.done():
        print(f"  Current coro: {task.get_coro()}")
[Full FastAPI, pytest, and debugging patterns available in respective skills if deployed together]
Python Version Compatibility: This skill covers asyncio in Python 3.7+ and reflects current best practices for async programming in 2025.