Asyncio Concurrency Patterns
A comprehensive skill for mastering Python's asyncio library and concurrent programming patterns. This skill covers event loops, coroutines, tasks, futures, synchronization primitives, async context managers, and production-ready patterns for building high-performance asynchronous applications.
When to Use This Skill
Use this skill when:
- Building I/O-bound applications that need to handle many concurrent operations
- Creating web servers, API clients, or websocket applications
- Implementing real-time systems with event-driven architecture
- Optimizing application performance with concurrent request handling
- Managing multiple async operations with proper coordination and error handling
- Building background task processors or job queues
- Implementing async database operations and connection pooling
- Creating chat applications, real-time dashboards, or notification systems
- Handling parallel HTTP requests efficiently
- Managing websocket connections with multiple event sources
- Building microservices with async communication patterns
- Optimizing resource utilization in network applications
Core Concepts
What is Asyncio?
Asyncio is Python's built-in library for writing concurrent code using the async/await syntax. It provides:
- Event Loop: The core of asyncio that schedules and runs asynchronous tasks
- Coroutines: Functions defined with async def that can be paused and resumed
- Tasks: Scheduled coroutines that run concurrently
- Futures: Low-level objects representing results of async operations
- Synchronization Primitives: Locks, semaphores, events for coordination
Event Loop Fundamentals
The event loop is the central execution mechanism in asyncio:
import asyncio

# Get or create an event loop (legacy API; prefer asyncio.run() in new code)
loop = asyncio.get_event_loop()

# Run a coroutine until complete
loop.run_until_complete(my_coroutine())

# Modern approach (Python 3.7+)
asyncio.run(my_coroutine())
Key Event Loop Concepts:
- Single-threaded concurrency: One thread, many tasks
- Cooperative multitasking: Tasks yield control voluntarily
- I/O multiplexing: Efficient handling of many I/O operations
- Non-blocking operations: Don't wait for I/O, do other work
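The cooperative model above can be illustrated with a minimal sketch. Two coroutines append to a shared list and hand control back to the loop with `await asyncio.sleep(0)`; the names `worker`, `interleave`, and `log` are made up for this illustration.

```python
import asyncio

async def worker(name, log):
    for step in range(3):
        log.append(f'{name}{step}')
        # Yield control voluntarily so the other task gets a turn
        await asyncio.sleep(0)

async def interleave():
    log = []
    # Both workers share one thread and take turns at each await
    await asyncio.gather(worker('a', log), worker('b', log))
    return log

log = asyncio.run(interleave())
print(log)
```

The entries from the two workers alternate rather than running back to back, because each worker only runs until its next `await`.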
Coroutines vs Functions
Regular Function:
import requests

def fetch_data():
    # Blocks until complete
    return requests.get('http://api.example.com')
Coroutine:
import aiohttp

async def fetch_data():
    # Yields control while waiting
    async with aiohttp.ClientSession() as session:
        async with session.get('http://api.example.com') as resp:
            return await resp.text()
Tasks and Futures
Tasks wrap coroutines and schedule them on the event loop:
# Create a task
task = asyncio.create_task(my_coroutine())

# Task runs in background
# ... do other work ...

# Wait for result
result = await task
Futures represent eventual results:
# Low-level future (rarely used directly)
future = asyncio.Future()

# Set result
future.set_result(42)

# Get result
result = await future
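Futures mostly show up when bridging callback-style code into coroutines. The sketch below assumes a callback that fires a little later (simulated here with `loop.call_later`) and uses `loop.create_future()` to create the future; `wait_for_callback` is an illustrative name.

```python
import asyncio

async def wait_for_callback():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # Simulate callback-based code delivering a value 10 ms from now
    loop.call_later(0.01, future.set_result, 42)
    # The coroutine is suspended here until set_result() is called
    return await future

value = asyncio.run(wait_for_callback())
print(value)
```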
Async Context Managers
Manage resources with async setup/teardown:
class AsyncResource:
    async def __aenter__(self):
        # Async setup
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Async cleanup
        await self.disconnect()

# Usage
async with AsyncResource() as resource:
    await resource.do_work()
Concurrency Patterns
Pattern 1: Gather - Concurrent Execution
Run multiple coroutines concurrently and wait for all to complete:
import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        # Run all fetches concurrently
        results = await asyncio.gather(
            fetch(session, 'http://python.org'),
            fetch(session, 'http://docs.python.org'),
            fetch(session, 'http://pypi.org')
        )
        return results

# Results is a list in the same order as inputs
results = asyncio.run(main())
When to use:
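- You need all results
- Result order must match input order
- You want the first exception to propagate immediately (the default); the other awaitables are not cancelled and keep running
- You can handle partial results with return_exceptions=True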
Pattern 2: Wait - Flexible Waiting
More control over how to wait for multiple tasks:
import asyncio

async def task_a():
    await asyncio.sleep(2)
    return 'A'

async def task_b():
    await asyncio.sleep(1)
    return 'B'

async def main():
    tasks = [
        asyncio.create_task(task_a()),
        asyncio.create_task(task_b())
    ]

    # Wait for first to complete
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    # Get first result
    first_result = done.pop().result()

    # Cancel remaining
    for task in pending:
        task.cancel()

    return first_result

result = asyncio.run(main())  # Returns 'B' after 1 second
Wait strategies:
- FIRST_COMPLETED: Return when first task finishes
- FIRST_EXCEPTION: Return when first task raises exception
- ALL_COMPLETED: Wait for all tasks (default)
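As a rough sketch of the FIRST_EXCEPTION strategy, the example below stops waiting as soon as one task fails and cancels whatever is still pending. The helper names `ok`, `boom`, and `stop_on_first_error` are invented for this illustration.

```python
import asyncio

async def ok(delay):
    await asyncio.sleep(delay)
    return delay

async def boom():
    await asyncio.sleep(0.01)
    raise ValueError('boom')

async def stop_on_first_error():
    tasks = [asyncio.create_task(ok(0.5)), asyncio.create_task(boom())]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_EXCEPTION
    )
    # Cancel the tasks that are still running
    for task in pending:
        task.cancel()
    # Let the cancelled tasks finish unwinding
    await asyncio.gather(*pending, return_exceptions=True)
    found = [t.exception() for t in done if t.exception() is not None]
    return len(done), len(pending), found

n_done, n_pending, found = asyncio.run(stop_on_first_error())
print(n_done, n_pending, found)
```

If no task raises, FIRST_EXCEPTION behaves like ALL_COMPLETED.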
Pattern 3: Semaphore - Limit Concurrency
Control maximum number of concurrent operations:
import asyncio
import aiohttp

async def fetch_with_limit(session, url, semaphore):
    async with semaphore:
        # Only N requests run concurrently
        async with session.get(url) as resp:
            return await resp.text()

async def main():
    # Limit to 5 concurrent requests
    semaphore = asyncio.Semaphore(5)

    urls = [f'http://api.example.com/item/{i}' for i in range(100)]

    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_limit(session, url, semaphore)
            for url in urls
        ]
        results = await asyncio.gather(*tasks)
        return results

asyncio.run(main())
When to use:
- Rate limiting API requests
- Controlling database connection usage
- Preventing resource exhaustion
- Respecting external service limits
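To check that a semaphore really caps concurrency, one option is to record the peak number of jobs inside the guarded section. This is a network-free sketch in which `asyncio.sleep` stands in for real I/O; `limited_job`, `measure_peak`, and the `stats` dictionary are illustrative names.

```python
import asyncio

async def limited_job(semaphore, stats):
    async with semaphore:
        stats['active'] += 1
        stats['peak'] = max(stats['peak'], stats['active'])
        await asyncio.sleep(0.01)  # stand-in for a real request
        stats['active'] -= 1

async def measure_peak(limit, jobs):
    semaphore = asyncio.Semaphore(limit)
    stats = {'active': 0, 'peak': 0}
    await asyncio.gather(
        *[limited_job(semaphore, stats) for _ in range(jobs)]
    )
    return stats['peak']

peak = asyncio.run(measure_peak(limit=3, jobs=12))
print(f'Peak concurrency: {peak}')
```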
Pattern 4: Lock - Mutual Exclusion
Ensure only one coroutine accesses a resource at a time:
import asyncio

class SharedCounter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self):
        async with self.lock:
            # Critical section - only one coroutine at a time
            current = self.value
            await asyncio.sleep(0)  # Simulate async work
            self.value = current + 1

async def worker(counter):
    for _ in range(100):
        await counter.increment()

async def main():
    counter = SharedCounter()

    # Run 10 workers concurrently
    await asyncio.gather(*[worker(counter) for _ in range(10)])

    print(f"Final count: {counter.value}")  # Always 1000

asyncio.run(main())
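For contrast, here is a sketch of the same counter without the lock. Because the read and the write are separated by an `await`, other coroutines can run in between and overwrite each other's updates. `UnsafeCounter` and `count_without_lock` are hypothetical names used only for this comparison.

```python
import asyncio

class UnsafeCounter:
    def __init__(self):
        self.value = 0

    async def increment(self):
        current = self.value
        await asyncio.sleep(0)  # another coroutine may run here
        self.value = current + 1  # may overwrite a newer value

async def count_without_lock():
    counter = UnsafeCounter()

    async def worker():
        for _ in range(100):
            await counter.increment()

    await asyncio.gather(*[worker() for _ in range(10)])
    return counter.value

unsafe_total = asyncio.run(count_without_lock())
print(f'Without lock: {unsafe_total} (expected 1000)')
```

A lock is only needed when the critical section contains an `await`; code between two awaits already runs without interruption from other coroutines.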
Pattern 5: Event - Signaling
Coordinate multiple coroutines with events:
import asyncio

async def waiter(event, name):
    print(f'{name} waiting for event')
    await event.wait()
    print(f'{name} received event')

async def setter(event):
    await asyncio.sleep(2)
    print('Setting event')
    event.set()

async def main():
    event = asyncio.Event()

    # Multiple waiters
    await asyncio.gather(
        waiter(event, 'Waiter 1'),
        waiter(event, 'Waiter 2'),
        waiter(event, 'Waiter 3'),
        setter(event)
    )

asyncio.run(main())
Pattern 6: Queue - Producer/Consumer
Coordinate work between producers and consumers:
import asyncio

async def producer(queue, n):
    for i in range(n):
        await asyncio.sleep(0.1)
        await queue.put(f'item-{i}')
        print(f'Produced item-{i}')

    # Signal completion
    await queue.put(None)

async def consumer(queue, name):
    while True:
        item = await queue.get()

        if item is None:
            # Propagate sentinel to other consumers
            await queue.put(None)
            break

        print(f'{name} processing {item}')
        await asyncio.sleep(0.2)
        queue.task_done()

async def main():
    queue = asyncio.Queue()

    # Start producer and consumers
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue, 'Consumer-1'),
        consumer(queue, 'Consumer-2'),
        consumer(queue, 'Consumer-3')
    )

asyncio.run(main())
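An alternative to sentinel values is to wait on `queue.join()` and then cancel the workers. The sketch below shows one way this might look; `worker`, `process`, and the doubling step are placeholders for real work.

```python
import asyncio

async def worker(queue, results):
    while True:
        item = await queue.get()
        try:
            results.append(item * 2)  # placeholder for real processing
        finally:
            # Every get() must be matched by task_done() for join() to return
            queue.task_done()

async def process(items, num_workers=3):
    queue = asyncio.Queue()
    results = []
    workers = [
        asyncio.create_task(worker(queue, results))
        for _ in range(num_workers)
    ]
    for item in items:
        await queue.put(item)
    # Blocks until every queued item has been marked done
    await queue.join()
    # Workers are idle in queue.get() now, so cancel them
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results

results = asyncio.run(process(range(10)))
print(sorted(results))
```

This avoids leaving a stray sentinel in the queue and works with any number of producers.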
Task Management
Creating Tasks
Basic Task Creation:
import asyncio

async def background_task():
    await asyncio.sleep(10)
    return 'Done'

async def main():
    # Create task - scheduled to start at the next await
    task = asyncio.create_task(background_task())

    # Do other work while task runs
    await asyncio.sleep(1)

    # Wait for result
    result = await task
    return result

asyncio.run(main())
Named Tasks (Python 3.8+):
task = asyncio.create_task(
    background_task(),
    name='my-background-task'
)

print(task.get_name())  # 'my-background-task'
Task Cancellation
Graceful Cancellation:
import asyncio

async def long_running_task():
    try:
        while True:
            await asyncio.sleep(1)
            print('Working...')
    except asyncio.CancelledError:
        print('Task cancelled, cleaning up...')
        # Cleanup logic
        raise  # Re-raise to mark as cancelled

async def main():
    task = asyncio.create_task(long_running_task())

    # Let it run for 3 seconds
    await asyncio.sleep(3)

    # Request cancellation
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print('Task was cancelled')

asyncio.run(main())
Cancellation with Context Manager:
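import asyncio
from contextlib import suppress

async def run_with_timeout():
    # long_running_task is defined in the previous example
    task = asyncio.create_task(long_running_task())

    try:
        # Wait with timeout
        await asyncio.wait_for(task, timeout=5.0)
    except asyncio.TimeoutError:
        # wait_for() already cancels the task on timeout;
        # the explicit cancel() is a harmless safeguard
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task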
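A small sketch can confirm what happens to the wrapped task when `asyncio.wait_for` times out: the task is cancelled for you before `TimeoutError` is raised. The names `slow` and `check_timeout` are illustrative.

```python
import asyncio

async def slow():
    await asyncio.sleep(10)

async def check_timeout():
    task = asyncio.create_task(slow())
    try:
        await asyncio.wait_for(task, timeout=0.05)
    except asyncio.TimeoutError:
        pass
    # By the time TimeoutError is raised, the task has been cancelled
    return task.cancelled()

was_cancelled = asyncio.run(check_timeout())
print(f'Task cancelled by wait_for: {was_cancelled}')
```

If the task must survive the timeout, wrapping it in `asyncio.shield()` before passing it to `wait_for` is the usual approach.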
Exception Handling in Tasks
Gather with Exception Handling:
import asyncio

async def failing_task(n):
    await asyncio.sleep(n)
    raise ValueError(f'Task {n} failed')

async def successful_task(n):
    await asyncio.sleep(n)
    return f'Task {n} succeeded'

async def main():
    # return_exceptions=True: Returns exceptions instead of raising
    results = await asyncio.gather(
        successful_task(1),
        failing_task(2),
        successful_task(3),
        return_exceptions=True
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f'Task {i} failed: {result}')
        else:
            print(f'Task {i} result: {result}')

asyncio.run(main())
Task Exception Retrieval:
import asyncio

async def main():
    # failing_task is defined in the previous example
    task = asyncio.create_task(failing_task(1))

    # Wait for task
    await asyncio.sleep(2)

    # Check if task failed
    if task.done() and task.exception():
        print(f'Task failed with: {task.exception()}')

asyncio.run(main())
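Instead of polling `task.done()`, a done-callback can collect failures from fire-and-forget tasks. The following is a minimal sketch; `failing`, `run_in_background`, and the `collected` list are names assumed for the example, and real code would probably log rather than append to a list.

```python
import asyncio

async def failing():
    raise ValueError('boom')

async def run_in_background():
    collected = []

    def on_done(task):
        # exception() raises CancelledError on cancelled tasks, so check first
        if not task.cancelled() and task.exception() is not None:
            collected.append(task.exception())

    task = asyncio.create_task(failing())
    task.add_done_callback(on_done)
    await asyncio.sleep(0.01)  # give the task and its callback time to run
    return collected

background_errors = asyncio.run(run_in_background())
print(background_errors)
```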
Event Loop Management
Event Loop Policies
Default Event Loop:
import asyncio

async def main():
    # Get running loop
    loop = asyncio.get_running_loop()
    print(f'Loop: {loop}')

asyncio.run(main())
Custom Event Loop:
import asyncio

async def main():
    pass

# Create new event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
    loop.run_until_complete(main())
finally:
    loop.close()
Event Loop Best Practices:
- Use asyncio.run() for simple programs (Python 3.7+)
- Avoid creating ClientSession outside event loop
- Always close loops when done
- Don't call blocking functions in event loop
Running Blocking Code
Using ThreadPoolExecutor:
import asyncio
import time

def blocking_io():
    # Blocking operation
    time.sleep(2)
    return 'Done'

async def main():
    loop = asyncio.get_running_loop()

    # Run blocking code in thread pool
    result = await loop.run_in_executor(
        None,  # Use default executor (a ThreadPoolExecutor)
        blocking_io
    )
    return result

asyncio.run(main())
Custom Executor:
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def main():
    loop = asyncio.get_running_loop()

    # Custom executor with 4 threads
    # (blocking_io is defined in the previous example)
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = await asyncio.gather(*[
            loop.run_in_executor(executor, blocking_io)
            for _ in range(10)
        ])

    return results

asyncio.run(main())
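On Python 3.9 and later, `asyncio.to_thread` offers a shorter way to push a blocking call onto the default thread pool. Note that this is newer than the Python 3.7 baseline the rest of this document assumes. A small sketch, with `blocking_io` taking an argument purely for illustration:

```python
import asyncio
import time

def blocking_io(n):
    time.sleep(0.05)  # stand-in for a blocking call
    return n * 2

async def run_blocking():
    # Each call runs in a worker thread; the event loop stays responsive
    return await asyncio.gather(
        *[asyncio.to_thread(blocking_io, n) for n in range(3)]
    )

doubled = asyncio.run(run_blocking())
print(doubled)
```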
Loop Callbacks
Schedule Callback:
import asyncio

def callback(arg):
    print(f'Callback called with {arg}')

async def main():
    loop = asyncio.get_running_loop()

    # Schedule callback
    loop.call_soon(callback, 'immediate')

    # Schedule with delay
    loop.call_later(2, callback, 'delayed')

    # Schedule at specific time
    loop.call_at(loop.time() + 3, callback, 'scheduled')

    await asyncio.sleep(4)

asyncio.run(main())
Async Context Managers
Creating Async Context Managers
Class-Based:
import asyncio

class AsyncDatabaseConnection:
    def __init__(self, host):
        self.host = host
        self.connection = None

    async def __aenter__(self):
        print(f'Connecting to {self.host}')
        await asyncio.sleep(0.1)  # Simulate connection
        self.connection = f'Connection to {self.host}'
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f'Closing connection to {self.host}')
        await asyncio.sleep(0.1)  # Simulate cleanup
        self.connection = None

    async def query(self, sql):
        if not self.connection:
            raise RuntimeError('Not connected')
        await asyncio.sleep(0.05)
        return f'Results for: {sql}'

async def main():
    async with AsyncDatabaseConnection('localhost') as db:
        result = await db.query('SELECT * FROM users')
        print(result)

asyncio.run(main())
Decorator-Based:
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_resource(name):
    # Setup
    print(f'Acquiring {name}')
    await asyncio.sleep(0.1)

    try:
        yield name
    finally:
        # Cleanup
        print(f'Releasing {name}')
        await asyncio.sleep(0.1)

async def main():
    async with async_resource('database') as db:
        print(f'Using {db}')

asyncio.run(main())
Real-World Example: aiohttp ClientSession
import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    # ClientSession as async context manager
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://python.org')
        print(f'Body: {html[:100]}...')

asyncio.run(main())
Why use async context manager for ClientSession?
- Ensures proper cleanup of connections
- Prevents resource leaks
- Manages SSL connections correctly
- Handles graceful shutdown
Performance Optimization
Profiling Async Code
Basic Timing:
import asyncio
import time

async def slow_operation():
    await asyncio.sleep(1)

async def main():
    start = time.perf_counter()

    await slow_operation()

    elapsed = time.perf_counter() - start
    print(f'Took {elapsed:.2f} seconds')

asyncio.run(main())
Profiling Multiple Operations:
import asyncio
import time

async def timed_task(name, duration):
    start = time.perf_counter()
    await asyncio.sleep(duration)
    elapsed = time.perf_counter() - start
    print(f'{name} took {elapsed:.2f}s')
    return name

async def main():
    await asyncio.gather(
        timed_task('Task 1', 1),
        timed_task('Task 2', 2),
        timed_task('Task 3', 0.5)
    )

asyncio.run(main())
Optimizing Concurrency
Bad - Sequential Execution:
async def slow_approach():
    results = []
    for i in range(10):
        result = await fetch_data(i)
        results.append(result)
    return results

# Takes 10 * fetch_time
Good - Concurrent Execution:
async def fast_approach():
    tasks = [fetch_data(i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    return results

# Takes ~fetch_time
Better - Controlled Concurrency:
async def controlled_approach():
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent

    async def fetch_with_limit(i):
        async with semaphore:
            return await fetch_data(i)

    tasks = [fetch_with_limit(i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    return results

# Takes ~2 * fetch_time (10 requests in 2 batches of 5), but respects limits
Avoiding Common Performance Pitfalls
- Don't create sessions per request:
# BAD - Creates new session each time
async def bad_fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()

# GOOD - Reuse one session; read each response inside its own context
async def fetch(session, url):
    async with session.get(url) as resp:
        return await resp.text()

async def good_fetch():
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            fetch(session, 'http://example.com/1'),
            fetch(session, 'http://example.com/2'),
            fetch(session, 'http://example.com/3')
        )
        return results
- Don't use blocking operations:
import asyncio
import aiohttp
import requests  # Blocking library

# BAD - Blocks event loop
async def bad_request():
    response = requests.get('http://example.com')  # BLOCKS!
    return response.text

# GOOD - Use async library
async def good_request():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://example.com') as resp:
            return await resp.text()

# ACCEPTABLE - If must use blocking, use executor
async def acceptable_request():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,
        lambda: requests.get('http://example.com').text
    )
    return result
- Proper cleanup with zero-sleep:
async def proper_cleanup():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://example.org/') as resp:
            await resp.read()

    # Zero-sleep to allow underlying connections to close
    await asyncio.sleep(0)
Common Pitfalls
Pitfall 1: Creating ClientSession Outside Event Loop
Problem:
import aiohttp

# BAD - Session created outside event loop
session = aiohttp.ClientSession()

async def fetch(url):
    async with session.get(url) as resp:
        return await resp.text()
Why it's bad:
- Session binds to event loop at creation time
- If loop changes (e.g., uvloop), session becomes invalid
- Can cause program to hang
Solution:
import aiohttp
import asyncio

async def main():
    # Create session inside async function
    async with aiohttp.ClientSession() as session:
        async with session.get('http://python.org') as resp:
            print(await resp.text())

asyncio.run(main())
Pitfall 2: Session as Class Variable
Problem:
class API:
    session = aiohttp.ClientSession()  # BAD - created at import time, shared by all instances

    async def fetch(self, url):
        async with self.session.get(url) as resp:
            return await resp.text()
Solution:
class API:
    def __init__(self):
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, *args):
        await self.session.close()

    async def fetch(self, url):
        async with self.session.get(url) as resp:
            return await resp.text()

# Usage
async def main():
    async with API() as api:
        result = await api.fetch('http://example.com')
Pitfall 3: Forgetting await
Problem:
async def process_data():
    # Forgot await - returns coroutine, doesn't execute!
    result = fetch_data()  # Missing await
    return result
Solution:
async def process_data():
    result = await fetch_data()  # Proper await
    return result
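The difference is easy to see in a short sketch: calling a coroutine function without `await` only creates a coroutine object, and the body never runs. Here `fetch_data` is a stub and `compare` is an illustrative name; the coroutine is closed explicitly only to keep this demo free of "never awaited" warnings.

```python
import asyncio

async def fetch_data():
    return {'id': 1}

async def compare():
    missing = fetch_data()  # coroutine object; the body has not run
    is_coro = asyncio.iscoroutine(missing)
    missing.close()  # suppress the "never awaited" warning in this demo
    awaited = await fetch_data()  # actually runs the coroutine
    return is_coro, awaited

is_coro, awaited = asyncio.run(compare())
print(is_coro, awaited)
```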
Pitfall 4: Blocking the Event Loop
Problem:
import asyncio
import time

async def bad_sleep():
    time.sleep(5)  # BAD - Blocks entire event loop!

async def main():
    await asyncio.gather(
        bad_sleep(),
        another_task()  # Blocked for 5 seconds
    )
Solution:
import asyncio

async def good_sleep():
    await asyncio.sleep(5)  # GOOD - Yields control

async def main():
    await asyncio.gather(
        good_sleep(),
        another_task()  # Runs concurrently
    )
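One rough way to observe the effect is to count how often a background "ticker" task gets to run while the main coroutine waits. In this sketch (all names are illustrative), a blocking `time.sleep` starves the ticker, while `asyncio.sleep` lets it keep ticking.

```python
import asyncio
import time

async def ticker(counter):
    while True:
        counter['ticks'] += 1
        await asyncio.sleep(0.01)

async def count_ticks(blocking):
    counter = {'ticks': 0}
    task = asyncio.create_task(ticker(counter))
    await asyncio.sleep(0)  # let the ticker start
    if blocking:
        time.sleep(0.2)  # blocks the loop; the ticker cannot run
    else:
        await asyncio.sleep(0.2)  # yields; the ticker keeps running
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return counter['ticks']

blocked_ticks = asyncio.run(count_ticks(blocking=True))
free_ticks = asyncio.run(count_ticks(blocking=False))
print(f'blocked: {blocked_ticks} ticks, non-blocking: {free_ticks} ticks')
```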
Pitfall 5: Not Handling Task Cancellation
Problem:
async def bad_task():
    while True:
        await asyncio.sleep(1)
        process_data()
        # No cleanup on cancellation!
Solution:
async def good_task():
    try:
        while True:
            await asyncio.sleep(1)
            process_data()
    except asyncio.CancelledError:
        # Cleanup resources
        cleanup()
        raise  # Re-raise to mark as cancelled
Pitfall 6: Deadlocks with Locks
Problem:
import asyncio

lock1 = asyncio.Lock()
lock2 = asyncio.Lock()

async def task_a():
    async with lock1:
        await asyncio.sleep(0.1)
        async with lock2:  # Deadlock potential
            pass

async def task_b():
    async with lock2:
        await asyncio.sleep(0.1)
        async with lock1:  # Deadlock potential
            pass
Solution:
# Always acquire locks in same order
async def safe_task_a():
    async with lock1:
        async with lock2:
            pass

async def safe_task_b():
    async with lock1:  # Same order
        async with lock2:
            pass
Production Patterns
Pattern 1: Graceful Shutdown
Complete Shutdown Example:
import asyncio
import signal

class Application:
    def __init__(self):
        self.should_exit = False
        self.tasks = []

    async def worker(self, name):
        try:
            while not self.should_exit:
                print(f'{name} working...')
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            print(f'{name} cancelled, cleaning up...')
            raise

    def handle_signal(self, sig):
        print(f'Received signal {sig}, shutting down...')
        self.should_exit = True

    async def run(self):
        # Setup signal handlers (add_signal_handler is Unix-only)
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(
                sig,
                lambda s=sig: self.handle_signal(s)
            )

        # Start workers
        self.tasks = [
            asyncio.create_task(self.worker(f'Worker-{i}'))
            for i in range(3)
        ]

        # Wait for shutdown signal
        while not self.should_exit:
            await asyncio.sleep(0.1)

        # Cancel all tasks
        for task in self.tasks:
            task.cancel()

        # Wait for cancellation to complete
        await asyncio.gather(*self.tasks, return_exceptions=True)
        print('Shutdown complete')

# Run application
app = Application()
asyncio.run(app.run())
Pattern 2: Background Tasks with Application Lifecycle
aiohttp Application with Background Tasks:
import asyncio
from contextlib import suppress
from aiohttp import web

async def listen_to_redis(app):
    """Background task that listens to Redis"""
    # Simulated Redis listening
    try:
        while True:
            # Process messages
            await asyncio.sleep(1)
            print('Processing Redis message...')
    except asyncio.CancelledError:
        print('Redis listener stopped')
        raise

async def background_tasks(app):
    """Cleanup context for managing background tasks"""
    # Startup: Create background task
    app['redis_listener'] = asyncio.create_task(listen_to_redis(app))

    yield  # App is running

    # Cleanup: Cancel background task
    app['redis_listener'].cancel()
    with suppress(asyncio.CancelledError):
        await app['redis_listener']

# Setup application
app = web.Application()
app.cleanup_ctx.append(background_tasks)
Pattern 3: Retry Logic with Exponential Backoff
import asyncio
import aiohttp
from typing import Any, Callable

async def retry_with_backoff(
    coro_func: Callable,
    *args,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    **kwargs
) -> Any:
    """
    Retry async function with exponential backoff

    Args:
        coro_func: Async function to retry
        *args, **kwargs: Passed through to coro_func
        max_retries: Maximum number of attempts (keyword-only)
        base_delay: Initial delay between retries, in seconds
        max_delay: Maximum delay between retries, in seconds
    """
    for attempt in range(max_retries):
        try:
            return await coro_func(*args, **kwargs)
        except Exception as e:
            if attempt == max_retries - 1:
                # Last attempt failed
                raise

            # Calculate delay with exponential backoff
            delay = min(base_delay * (2 ** attempt), max_delay)
            print(f'Attempt {attempt + 1} failed: {e}')
            print(f'Retrying in {delay:.1f} seconds...')
            await asyncio.sleep(delay)

# Usage
async def unstable_api_call():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://unstable-api.com') as resp:
            return await resp.json()

async def main():
    result = await retry_with_backoff(
        unstable_api_call,
        max_retries=5,
        base_delay=1.0
    )
    return result
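When many clients retry on the same schedule, their retries can arrive in synchronized bursts. A common refinement is to add random jitter to the delay. The sketch below uses "full jitter" (a uniform delay between zero and the capped exponential); `backoff_delay`, `retry_with_jitter`, and the `flaky` stub are hypothetical names, and the tiny delays are chosen only so the example finishes quickly.

```python
import asyncio
import random

def backoff_delay(attempt, base_delay=1.0, max_delay=60.0):
    """Full-jitter delay: uniform between 0 and the capped exponential."""
    cap = min(base_delay * (2 ** attempt), max_delay)
    return random.uniform(0, cap)

async def retry_with_jitter(coro_func, max_retries=3, base_delay=0.01):
    for attempt in range(max_retries):
        try:
            return await coro_func()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts
            await asyncio.sleep(backoff_delay(attempt, base_delay))

calls = {'count': 0}

async def flaky():
    # Stub that fails twice, then succeeds
    calls['count'] += 1
    if calls['count'] < 3:
        raise ConnectionError('temporary failure')
    return 'ok'

outcome = asyncio.run(retry_with_jitter(flaky))
print(outcome, calls['count'])
```

In production code it is usually worth catching only the exception types that are known to be transient rather than bare `Exception`.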
Pattern 4: Circuit Breaker
import asyncio
import random
from datetime import datetime, timedelta
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold

        self.failure_count = 0
        self.success_count = 0
        self.state = CircuitState.CLOSED
        self.opened_at = None

    async def call(self, coro_func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            # Check if should try recovery
            if datetime.now() - self.opened_at > timedelta(seconds=self.recovery_timeout):
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
            else:
                raise Exception('Circuit breaker is OPEN')

        try:
            result = await coro_func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        self.failure_count = 0

        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.success_count = 0

    def _on_failure(self):
        self.failure_count += 1

        # A failure while HALF_OPEN reopens the circuit immediately
        if (self.state == CircuitState.HALF_OPEN
                or self.failure_count >= self.failure_threshold):
            self.state = CircuitState.OPEN
            self.opened_at = datetime.now()

# Usage
async def flaky_service():
    # Simulated flaky service
    await asyncio.sleep(0.1)
    if random.random() < 0.5:
        raise Exception('Service error')
    return 'Success'

async def main():
    breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=5.0)

    for i in range(20):
        try:
            result = await breaker.call(flaky_service)
            print(f'Request {i}: {result} - State: {breaker.state.value}')
        except Exception:
            print(f'Request {i}: Failed - State: {breaker.state.value}')

        await asyncio.sleep(0.5)
Pattern 5: WebSocket with Multiple Event Sources
Handling Parallel WebSocket and Background Events:
import asyncio
from aiohttp import web

async def read_subscription(ws, redis):
    """Background task reading from Redis and sending to WebSocket"""
    # Simulated Redis subscription
    channel = await redis.subscribe('channel:1')

    try:
        # Simulate receiving messages
        for i in range(10):
            await asyncio.sleep(1)
            message = f'Redis message {i}'
            await ws.send_str(message)
    finally:
        await redis.unsubscribe('channel:1')

async def websocket_handler(request):
    """WebSocket handler with parallel event sources"""
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    # Create background task for Redis subscription
    redis = request.app['redis']
    task = asyncio.create_task(read_subscription(ws, redis))

    try:
        # Handle incoming WebSocket messages
        async for msg in ws:
            if msg.type == web.WSMsgType.TEXT:
                # Process incoming message
                await ws.send_str(f'Echo: {msg.data}')
            elif msg.type == web.WSMsgType.ERROR:
                print(f'WebSocket error: {ws.exception()}')
    finally:
        # Cleanup: Cancel background task
        task.cancel()

    return ws
Best Practices
Testing Async Code
Using pytest-asyncio:
import pytest
import asyncio

@pytest.mark.asyncio
async def test_async_function():
    result = await async_operation()
    assert result == 'expected'

@pytest.mark.asyncio
async def test_with_fixture(aiohttp_client):
    client = await aiohttp_client(create_app())
    resp = await client.get('/')
    assert resp.status == 200
Manual Event Loop Setup:
import asyncio
import unittest

class TestAsyncCode(unittest.TestCase):
    def setUp(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

    def tearDown(self):
        self.loop.close()

    def test_coroutine(self):
        async def test_impl():
            result = await async_function()
            self.assertEqual(result, 'expected')

        self.loop.run_until_complete(test_impl())
Debugging Async Code
Enable Debug Mode:
import asyncio

# Enable asyncio debug mode
asyncio.run(main(), debug=True)

# Or manually
loop = asyncio.new_event_loop()
loop.set_debug(True)
loop.run_until_complete(main())
What debug mode detects:
- Coroutines that were never awaited
- Callbacks taking too long
- Tasks destroyed while pending
Logging Slow Callbacks:
import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)

loop = asyncio.new_event_loop()
loop.slow_callback_duration = 0.1  # 100ms threshold
loop.set_debug(True)
Documentation
Documenting Async Functions:
async def fetch_user_data(user_id: int) -> dict:
    """
    Fetch user data from the database.

    Args:
        user_id: The unique identifier of the user

    Returns:
        Dictionary containing user data

    Raises:
        UserNotFoundError: If user doesn't exist
        DatabaseError: If database connection fails

    Example:
        >>> async def main():
        ...     user = await fetch_user_data(123)
        ...     print(user['name'])

    Note:
        This function must be called within an async context.
        Connection pooling is handled automatically.
    """
    async with get_db_connection() as conn:
        return await conn.fetch_one(
            'SELECT * FROM users WHERE id = $1',
            user_id
        )
Complete Examples
Example 1: Parallel HTTP Requests
import asyncio
import aiohttp
import time

async def fetch(session, url):
    """Fetch a single URL"""
    async with session.get(url) as response:
        return {
            'url': url,
            'status': response.status,
            'length': len(await response.text())
        }

async def fetch_all(urls):
    """Fetch multiple URLs concurrently"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def main():
    urls = [
        'http://python.org',
        'http://docs.python.org',
        'http://pypi.org',
        'http://github.com/python',
        'http://www.python.org/dev/peps/'
    ]

    start = time.perf_counter()
    results = await fetch_all(urls)
    elapsed = time.perf_counter() - start

    for result in results:
        print(f"{result['url']}: {result['status']} ({result['length']} characters)")

    print(f"\nFetched {len(urls)} URLs in {elapsed:.2f} seconds")

asyncio.run(main())
Example 2: Rate-Limited API Client
import asyncio
import aiohttp
from typing import List, Dict, Any

class RateLimitedClient:
    def __init__(self, rate_limit: int = 10):
        """
        Args:
            rate_limit: Maximum concurrent requests
        """
        self.semaphore = asyncio.Semaphore(rate_limit)
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, *args):
        await self.session.close()
        # Allow connections to close
        await asyncio.sleep(0)

    async def fetch(self, url: str) -> Dict[str, Any]:
        """Fetch URL with rate limiting"""
        async with self.semaphore:
            print(f'Fetching {url}')
            async with self.session.get(url) as resp:
                return {
                    'url': url,
                    'status': resp.status,
                    'data': await resp.json()
                }

    async def fetch_all(self, urls: List[str]) -> List[Dict[str, Any]]:
        """Fetch all URLs with rate limiting"""
        tasks = [self.fetch(url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

async def main():
    urls = [f'https://api.github.com/users/{user}'
            for user in ['python', 'django', 'flask', 'requests', 'aiohttp']]

    async with RateLimitedClient(rate_limit=2) as client:
        results = await client.fetch_all(urls)

        for result in results:
            if isinstance(result, Exception):
                print(f'Error: {result}')
            else:
                print(f"User: {result['data'].get('login', 'unknown')}")

asyncio.run(main())
Example 3: Database Connection Pool
import asyncio
from typing import Any

class AsyncConnectionPool:
    def __init__(self, size: int = 10):
        self.pool = asyncio.Queue(maxsize=size)
        self.size = size

    async def init(self):
        """Initialize connection pool"""
        for i in range(self.size):
            conn = await self._create_connection(i)
            await self.pool.put(conn)

    async def _create_connection(self, conn_id: int):
        """Create a database connection (simulated)"""
        await asyncio.sleep(0.1)  # Simulate connection time
        return {'id': conn_id, 'connected': True}

    async def acquire(self):
        """Acquire connection from pool"""
        return await self.pool.get()

    async def release(self, conn):
        """Release connection back to pool"""
        await self.pool.put(conn)

    async def execute(self, query: str) -> Any:
        """Execute query using pooled connection"""
        conn = await self.acquire()
        try:
            # Simulate query execution
            await asyncio.sleep(0.05)
            return f"Query '{query}' executed on connection {conn['id']}"
        finally:
            await self.release(conn)

    async def close(self):
        """Close all connections"""
        while not self.pool.empty():
            conn = await self.pool.get()
            # Close connection (simulated)
            conn['connected'] = False

async def worker(pool: AsyncConnectionPool, worker_id: int):
    """Worker that executes queries"""
    for i in range(5):
        result = await pool.execute(f'SELECT * FROM table WHERE id={i}')
        print(f'Worker {worker_id}: {result}')

async def main():
    # Create and initialize pool
    pool = AsyncConnectionPool(size=5)
    await pool.init()

    # Run multiple workers concurrently
    await asyncio.gather(*[
        worker(pool, i) for i in range(10)
    ])

    # Cleanup
    await pool.close()

asyncio.run(main())
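The acquire/release pair in a pool like this is often wrapped in an async context manager so a connection is returned even when the body raises. The following is a self-contained sketch of that idea, not a drop-in addition to the class above; `TinyPool`, `connection`, and `available` are hypothetical names.

```python
import asyncio
from contextlib import asynccontextmanager

class TinyPool:
    def __init__(self, size):
        self._queue = asyncio.Queue()
        for conn_id in range(size):
            self._queue.put_nowait({'id': conn_id})  # simulated connection

    @asynccontextmanager
    async def connection(self):
        conn = await self._queue.get()
        try:
            yield conn
        finally:
            # Runs on success and on error, so the connection is never leaked
            self._queue.put_nowait(conn)

    def available(self):
        return self._queue.qsize()

async def use_pool():
    pool = TinyPool(size=2)
    try:
        async with pool.connection() as conn:
            during = pool.available()
            raise RuntimeError(f'query failed on connection {conn["id"]}')
    except RuntimeError:
        pass
    return during, pool.available()

during, after = asyncio.run(use_pool())
print(f'Available during use: {during}, after error: {after}')
```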
Example 4: Real-Time Data Processor
import asyncio
import random
from datetime import datetime

class DataProcessor:
    def __init__(self):
        self.queue = asyncio.Queue()
        self.processed = 0
        self.errors = 0

    async def producer(self, producer_id: int):
        """Produce data items"""
        for i in range(10):
            await asyncio.sleep(random.uniform(0.1, 0.5))
            item = {
                'producer_id': producer_id,
                'item_id': i,
                'timestamp': datetime.now(),
                'data': random.randint(1, 100)
            }
            await self.queue.put(item)
            print(f'Producer {producer_id} generated item {i}')

    async def consumer(self, consumer_id: int):
        """Consume and process data items"""
        while True:
            item = await self.queue.get()

            if item is None:
                # Propagate sentinel so the other consumers also stop
                await self.queue.put(None)
                break

            try:
                # Simulate processing
                await asyncio.sleep(random.uniform(0.05, 0.2))

                # Process item
                result = item['data'] * 2
                print(f"Consumer {consumer_id} processed: {item['item_id']} -> {result}")
                self.processed += 1

            except Exception as e:
                print(f'Consumer {consumer_id} error: {e}')
                self.errors += 1
            finally:
                self.queue.task_done()

    async def monitor(self):
        """Monitor processing statistics"""
        while True:
            await asyncio.sleep(2)
            print(f'\n=== Stats: Processed={self.processed}, Errors={self.errors}, Queue={self.queue.qsize()} ===\n')

    async def run(self, num_producers: int = 3, num_consumers: int = 5):
        """Run the data processor"""
        # Start monitor
        monitor_task = asyncio.create_task(self.monitor())

        # Start consumers, then wait for every producer to finish
        consumers = [
            asyncio.create_task(self.consumer(i))
            for i in range(num_consumers)
        ]
        await asyncio.gather(*[self.producer(i) for i in range(num_producers)])

        # Send a single sentinel only after all producers are done,
        # so no consumer exits while items are still being produced
        await self.queue.put(None)
        await asyncio.gather(*consumers)

        # Cancel monitor
        monitor_task.cancel()

        print(f'\nFinal Stats: Processed={self.processed}, Errors={self.errors}')

async def main():
    processor = DataProcessor()
    await processor.run(num_producers=3, num_consumers=5)

asyncio.run(main())
Example 5: Async File I/O with aiofiles
import asyncio
import aiofiles
from pathlib import Path

async def write_file(path: str, content: str):
    """Write content to file asynchronously"""
    async with aiofiles.open(path, 'w') as f:
        await f.write(content)

async def read_file(path: str) -> str:
    """Read file content asynchronously"""
    async with aiofiles.open(path, 'r') as f:
        return await f.read()

async def process_files(file_paths: list):
    """Process multiple files concurrently"""
    tasks = [read_file(path) for path in file_paths]
    contents = await asyncio.gather(*tasks)

    # Process contents
    results = []
    for path, content in zip(file_paths, contents):
        result = {
            'path': path,
            'lines': len(content.split('\n')),
            'words': len(content.split()),
            'chars': len(content)
        }
        results.append(result)

    return results

async def main():
    # Create test files
    test_files = ['test1.txt', 'test2.txt', 'test3.txt']

    # Write files concurrently
    await asyncio.gather(*[
        write_file(f, f'Content of file {f}\n' * 10)
        for f in test_files
    ])

    # Process files
    results = await process_files(test_files)

    for result in results:
        print(f"{result['path']}: {result['lines']} lines, "
              f"{result['words']} words, {result['chars']} chars")

    # Cleanup (missing_ok requires Python 3.8+)
    for f in test_files:
        Path(f).unlink(missing_ok=True)

# asyncio.run(main())  # Uncomment to run (requires aiofiles)
Resources
- Python asyncio Documentation: https://docs.python.org/3/library/asyncio.html
- aiohttp Documentation: https://docs.aiohttp.org/
- Real Python asyncio Guide: https://realpython.com/async-io-python/
- PEP 492 - Coroutines with async and await syntax: https://www.python.org/dev/peps/pep-0492/
- asyncio Cheat Sheet: https://www.pythonsheets.com/notes/python-asyncio.html
- Effective Python: Item 60 - Consider asyncio: https://effectivepython.com/
Skill Version: 1.0.0
Last Updated: October 2025
Skill Category: Concurrency, Performance, Async Programming
Compatible With: Python 3.7+, aiohttp, asyncio, uvloop