python-async-patterns

Python Async Patterns

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Install skill "python-async-patterns" with this command: npx skills add thebushidocollective/han/thebushidocollective-han-python-async-patterns

Master asynchronous programming in Python using asyncio, async/await syntax, and concurrent execution patterns for I/O-bound and CPU-bound tasks.

Basic Async/Await

Core async syntax:

    import asyncio


    # Define an async function with async def
    async def fetch_data(url: str) -> str:
        print(f"Fetching {url}...")
        await asyncio.sleep(1)  # Simulate I/O operation
        return f"Data from {url}"


    # Call an async function
    async def main() -> None:
        result = await fetch_data("https://api.example.com")
        print(result)


    # Run the async function
    asyncio.run(main())

Multiple concurrent operations:

    import asyncio


    async def fetch_url(url: str) -> str:
        await asyncio.sleep(1)
        return f"Content from {url}"


    async def main() -> None:
        # Run concurrently with gather
        results = await asyncio.gather(
            fetch_url("https://example.com/1"),
            fetch_url("https://example.com/2"),
            fetch_url("https://example.com/3")
        )
        for result in results:
            print(result)


    asyncio.run(main())
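The benefit of gather is easiest to see by timing it against sequential awaits. The sketch below is illustrative only: the `compare` helper and the 0.1-second simulated delay are assumptions, not part of the original skill.

```python
import asyncio
import time


async def fetch_url(url: str, delay: float = 0.1) -> str:
    # asyncio.sleep stands in for a real network call (assumed delay).
    await asyncio.sleep(delay)
    return f"Content from {url}"


async def compare(urls: list[str]) -> tuple[float, float]:
    """Hypothetical helper: time sequential awaits against gather."""
    # Sequential: each await finishes before the next call starts.
    start = time.perf_counter()
    for url in urls:
        await fetch_url(url)
    sequential = time.perf_counter() - start

    # Concurrent: gather schedules every coroutine before waiting on any.
    start = time.perf_counter()
    await asyncio.gather(*(fetch_url(url) for url in urls))
    concurrent = time.perf_counter() - start
    return sequential, concurrent


if __name__ == "__main__":
    urls = [f"https://example.com/{i}" for i in range(3)]
    seq, conc = asyncio.run(compare(urls))
    print(f"sequential: {seq:.2f}s, concurrent: {conc:.2f}s")
```

With three URLs, the sequential pass should take roughly three times the single-call delay, while the gather pass should take roughly one.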

asyncio.create_task

Creating and managing tasks:

    import asyncio


    async def process_item(item: str, delay: float) -> str:
        await asyncio.sleep(delay)
        return f"Processed {item}"


    async def main() -> None:
        # Create tasks for concurrent execution
        task1 = asyncio.create_task(process_item("A", 2.0))
        task2 = asyncio.create_task(process_item("B", 1.0))
        task3 = asyncio.create_task(process_item("C", 1.5))

        # Do other work while tasks run
        print("Tasks started")

        # Wait for tasks to complete
        result1 = await task1
        result2 = await task2
        result3 = await task3

        print(result1, result2, result3)


    asyncio.run(main())

Named task with status checks:

    import asyncio


    async def background_task(name: str) -> None:
        print(f"Task {name} starting")
        await asyncio.sleep(2)
        print(f"Task {name} completed")


    async def main() -> None:
        # Create a named task
        task = asyncio.create_task(
            background_task("worker"),
            name="background-worker"
        )

        # Check task status (not done yet: the task has only been scheduled)
        print(f"Task name: {task.get_name()}")
        print(f"Task done: {task.done()}")

        await task
        print(f"Task done: {task.done()}")


    asyncio.run(main())

asyncio.gather vs asyncio.wait

Using gather for results:

    import asyncio


    async def fetch(n: int) -> int:
        await asyncio.sleep(1)
        return n * 2


    async def main() -> None:
        # gather returns results in the order the awaitables were passed
        results = await asyncio.gather(
            fetch(1),
            fetch(2),
            fetch(3)
        )
        print(results)  # [2, 4, 6]

        # Return exceptions as results instead of raising them
        results = await asyncio.gather(
            fetch(1),
            fetch(2),
            return_exceptions=True
        )
        print(results)  # [2, 4]


    asyncio.run(main())

Using wait for more control:

    import asyncio


    async def worker(n: int) -> int:
        await asyncio.sleep(n)
        return n


    async def main() -> None:
        tasks = [
            asyncio.create_task(worker(1)),
            asyncio.create_task(worker(2)),
            asyncio.create_task(worker(3)),
        ]

        # Wait for the first task to complete
        done, pending = await asyncio.wait(
            tasks,
            return_when=asyncio.FIRST_COMPLETED
        )
        print(f"Done: {len(done)}, Pending: {len(pending)}")

        # Wait for the remaining tasks, but give up after 1.5 seconds
        done, pending = await asyncio.wait(pending, timeout=1.5)
        print(f"Done: {len(done)}, Pending: {len(pending)}")

        # wait() does not cancel anything on timeout; cancel leftovers explicitly
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)


    asyncio.run(main())

Async Context Managers

Creating async context managers:

    import asyncio


    class AsyncResource:
        async def __aenter__(self) -> "AsyncResource":
            print("Acquiring resource")
            await asyncio.sleep(0.1)
            return self

        async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
            print("Releasing resource")
            await asyncio.sleep(0.1)

        async def query(self) -> str:
            return "data"


    async def main() -> None:
        async with AsyncResource() as resource:
            result = await resource.query()
            print(result)


    asyncio.run(main())

Using asynccontextmanager decorator:

    import asyncio
    from contextlib import asynccontextmanager
    from typing import AsyncIterator


    @asynccontextmanager
    async def get_connection(url: str) -> AsyncIterator[str]:
        # Setup
        print(f"Connecting to {url}")
        await asyncio.sleep(0.1)
        conn = f"connection-{url}"

        try:
            yield conn
        finally:
            # Teardown runs even if the body of the async with block raises
            print(f"Closing connection to {url}")
            await asyncio.sleep(0.1)


    async def main() -> None:
        async with get_connection("localhost") as conn:
            print(f"Using {conn}")


    asyncio.run(main())

Async Iterators

Creating async iterators:

    import asyncio
    from typing import AsyncIterator


    class AsyncRange:
        def __init__(self, count: int) -> None:
            self.count = count

        def __aiter__(self) -> AsyncIterator[int]:
            return self

        async def __anext__(self) -> int:
            if self.count <= 0:
                raise StopAsyncIteration
            await asyncio.sleep(0.1)
            self.count -= 1
            return self.count


    async def main() -> None:
        # Counts down: prints 4, 3, 2, 1, 0
        async for i in AsyncRange(5):
            print(i)


    asyncio.run(main())

Async generator functions:

    import asyncio
    from typing import AsyncIterator


    async def async_range(count: int) -> AsyncIterator[int]:
        for i in range(count):
            await asyncio.sleep(0.1)
            yield i


    async def fetch_pages(urls: list[str]) -> AsyncIterator[str]:
        for url in urls:
            await asyncio.sleep(0.5)
            yield f"Page content from {url}"


    async def main() -> None:
        async for num in async_range(5):
            print(num)

        urls = ["url1", "url2", "url3"]
        async for page in fetch_pages(urls):
            print(page)


    asyncio.run(main())
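Async generators can also be consumed with an async comprehension, which is often shorter than an explicit `async for` loop when the goal is to build a list. A minimal sketch, where `collect_even_squares` is a hypothetical helper added for illustration:

```python
import asyncio
from typing import AsyncIterator


async def async_range(count: int) -> AsyncIterator[int]:
    for i in range(count):
        await asyncio.sleep(0.01)  # Simulated wait between items
        yield i


async def collect_even_squares(count: int) -> list[int]:
    # An async comprehension drains the generator; it is only valid
    # inside a coroutine (or another async context).
    return [i * i async for i in async_range(count) if i % 2 == 0]


if __name__ == "__main__":
    print(asyncio.run(collect_even_squares(6)))  # [0, 4, 16]
```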

Async Queues

Producer-consumer pattern with Queue:

    import asyncio
    from asyncio import Queue
    from typing import Optional


    async def producer(queue: Queue[Optional[int]], n: int) -> None:
        for i in range(n):
            await asyncio.sleep(0.1)
            await queue.put(i)
            print(f"Produced {i}")


    async def consumer(queue: Queue[Optional[int]], name: str) -> None:
        while True:
            item = await queue.get()
            if item is None:  # Sentinel value: no more work
                queue.task_done()
                break
            await asyncio.sleep(0.2)
            print(f"Consumer {name} processed {item}")
            queue.task_done()


    async def main() -> None:
        queue: Queue[Optional[int]] = Queue(maxsize=5)

        # Start producer and consumers
        prod = asyncio.create_task(producer(queue, 10))
        cons1 = asyncio.create_task(consumer(queue, "A"))
        cons2 = asyncio.create_task(consumer(queue, "B"))

        await prod
        await queue.join()  # Wait until every produced item has been processed

        # Signal consumers to exit: one sentinel per consumer
        await queue.put(None)
        await queue.put(None)

        await cons1
        await cons2


    asyncio.run(main())
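Sentinel values are one way to stop consumers. Another common approach is to cancel the consumer tasks once `queue.join()` returns. The sketch below assumes that variant; `worker`, `run_pipeline`, and the doubling step are hypothetical names chosen for illustration.

```python
import asyncio


async def worker(queue: asyncio.Queue, results: list) -> None:
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # Simulated processing
            results.append(item * 2)
        finally:
            # Mark the item done even if processing raised.
            queue.task_done()


async def run_pipeline(items: list, n_workers: int = 2) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    workers = [
        asyncio.create_task(worker(queue, results)) for _ in range(n_workers)
    ]
    for item in items:
        queue.put_nowait(item)

    await queue.join()  # Every item has been fetched and marked done

    # Workers are now idle in queue.get(); cancel them instead of
    # sending sentinels, then wait for the cancellations to finish.
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    print(sorted(asyncio.run(run_pipeline([1, 2, 3, 4]))))
```

This keeps the queue's item type free of `None`, at the cost of consumers not getting a chance to run their own shutdown logic unless they catch `CancelledError`.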

Semaphore and Lock

Limiting concurrent operations:

    import asyncio


    async def fetch_with_limit(
        url: str,
        semaphore: asyncio.Semaphore
    ) -> str:
        async with semaphore:
            print(f"Fetching {url}")
            await asyncio.sleep(1)
            return f"Data from {url}"


    async def main() -> None:
        # Limit to 3 concurrent operations
        semaphore = asyncio.Semaphore(3)

        urls = [f"https://example.com/{i}" for i in range(10)]

        tasks = [
            fetch_with_limit(url, semaphore)
            for url in urls
        ]

        results = await asyncio.gather(*tasks)
        print(f"Fetched {len(results)} URLs")


    asyncio.run(main())

Using Lock for mutual exclusion:

    import asyncio


    class Counter:
        def __init__(self) -> None:
            self.value = 0
            self.lock = asyncio.Lock()

        async def increment(self) -> None:
            async with self.lock:
                # Critical section: without the lock, the await below would
                # let other increments read the same stale value
                current = self.value
                await asyncio.sleep(0.01)
                self.value = current + 1


    async def main() -> None:
        counter = Counter()

        # Run increments concurrently
        await asyncio.gather(*[
            counter.increment()
            for _ in range(100)
        ])

        print(f"Final value: {counter.value}")  # Should be 100


    asyncio.run(main())

Timeouts and Cancellation

Using timeout:

    import asyncio


    async def slow_operation() -> str:
        await asyncio.sleep(5)
        return "completed"


    async def main() -> None:
        # Timeout after 2 seconds
        try:
            result = await asyncio.wait_for(
                slow_operation(),
                timeout=2.0
            )
            print(result)
        except asyncio.TimeoutError:
            print("Operation timed out")


    asyncio.run(main())

Handling cancellation:

    import asyncio


    async def cancellable_task() -> None:
        try:
            while True:
                print("Working...")
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            print("Task was cancelled")
            # Clean up, then re-raise so the caller sees the cancellation
            raise


    async def main() -> None:
        task = asyncio.create_task(cancellable_task())

        await asyncio.sleep(3)
        task.cancel()

        try:
            await task
        except asyncio.CancelledError:
            print("Task cancellation confirmed")


    asyncio.run(main())
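Timeouts and cancellation are connected: when `asyncio.wait_for` times out, it cancels the awaitable it was waiting on, so the coroutine's `CancelledError` handler runs before the caller sees `TimeoutError`. The sketch below records that order in a list; `run_with_timeout` and the `log` parameter are hypothetical names used only for illustration.

```python
import asyncio


async def slow_operation(log: list[str]) -> str:
    try:
        await asyncio.sleep(5)
        return "completed"
    except asyncio.CancelledError:
        # Runs when wait_for cancels this coroutine on timeout.
        log.append("cleaned up")
        raise


async def run_with_timeout(timeout: float) -> list[str]:
    log: list[str] = []
    try:
        await asyncio.wait_for(slow_operation(log), timeout=timeout)
    except asyncio.TimeoutError:
        log.append("timed out")
    return log


if __name__ == "__main__":
    print(asyncio.run(run_with_timeout(0.05)))
```

This is why cleanup code placed in an `except asyncio.CancelledError` or `finally` block also covers the timeout case.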

Event Loop Management

Direct event loop control:

    import asyncio


    async def task1() -> None:
        print("Task 1")


    async def task2() -> None:
        print("Task 2")


    async def run_both() -> None:
        await asyncio.gather(task1(), task2())


    # Create a new event loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    try:
        # Schedule a callback for the next loop iteration
        loop.call_soon(lambda: print("Callback"))

        # Schedule a delayed callback
        loop.call_later(1.0, lambda: print("Delayed"))

        # Run a coroutine
        loop.run_until_complete(task1())

        # Run multiple coroutines
        loop.run_until_complete(run_both())

        # Keep the loop running long enough for the delayed callback to fire
        loop.run_until_complete(asyncio.sleep(1.1))
    finally:
        loop.close()

concurrent.futures

ThreadPoolExecutor for I/O-bound tasks:

    import asyncio
    import time
    from concurrent.futures import ThreadPoolExecutor


    def blocking_io_task(n: int) -> int:
        print(f"Task {n} starting")
        time.sleep(2)  # Blocking I/O
        return n * 2


    async def main() -> None:
        # Inside a coroutine, get_running_loop() is preferred over get_event_loop()
        loop = asyncio.get_running_loop()

        with ThreadPoolExecutor(max_workers=3) as executor:
            # Run blocking function in thread pool
            tasks = [
                loop.run_in_executor(executor, blocking_io_task, i)
                for i in range(5)
            ]

            results = await asyncio.gather(*tasks)
            print(results)


    asyncio.run(main())
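When a dedicated executor is not needed, `asyncio.to_thread` offers a shorter way to run a blocking function in the loop's default thread pool. The sketch below assumes Python 3.9 or newer, where `to_thread` was added; `run_in_threads` is a hypothetical helper, and the 0.05-second sleep stands in for real blocking I/O.

```python
import asyncio
import time


def blocking_io_task(n: int) -> int:
    time.sleep(0.05)  # Blocking call standing in for file or network I/O
    return n * 2


async def run_in_threads(count: int) -> list[int]:
    # Each to_thread call submits the function to the default executor
    # and returns an awaitable for its result.
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_io_task, i) for i in range(count))
    )


if __name__ == "__main__":
    print(asyncio.run(run_in_threads(3)))  # [0, 2, 4]
```

An explicit `ThreadPoolExecutor` remains the better fit when the number of worker threads has to be controlled.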

ProcessPoolExecutor for CPU-bound tasks:

    import asyncio
    from concurrent.futures import ProcessPoolExecutor


    def cpu_intensive_task(n: int) -> int:
        # CPU-intensive computation
        result = sum(i * i for i in range(n))
        return result


    async def main() -> None:
        loop = asyncio.get_running_loop()

        with ProcessPoolExecutor(max_workers=4) as executor:
            # Run CPU-bound function in process pool
            tasks = [
                loop.run_in_executor(
                    executor,
                    cpu_intensive_task,
                    10_000_000
                )
                for _ in range(4)
            ]

            results = await asyncio.gather(*tasks)
            print(f"Completed {len(results)} tasks")


    # The guard keeps worker processes from re-running main() when the
    # module is re-imported (spawn start method on Windows and macOS)
    if __name__ == "__main__":
        asyncio.run(main())

Async HTTP with aiohttp

Making async HTTP requests:

    import asyncio

    import aiohttp


    async def fetch_url(
        session: aiohttp.ClientSession,
        url: str
    ) -> str:
        async with session.get(url) as response:
            return await response.text()


    async def main() -> None:
        # Share one session across requests so connections can be reused
        async with aiohttp.ClientSession() as session:
            urls = [
                "https://example.com/1",
                "https://example.com/2",
                "https://example.com/3"
            ]

            tasks = [fetch_url(session, url) for url in urls]
            results = await asyncio.gather(*tasks)

            print(f"Fetched {len(results)} pages")


    asyncio.run(main())

Error Handling

Handling exceptions in async code:

    import asyncio


    async def failing_task() -> None:
        await asyncio.sleep(1)
        raise ValueError("Task failed")


    async def main() -> None:
        # Handle exception in single task
        try:
            await failing_task()
        except ValueError as e:
            print(f"Caught: {e}")

        # Handle exceptions with gather
        results = await asyncio.gather(
            failing_task(),
            failing_task(),
            return_exceptions=True
        )

        for result in results:
            if isinstance(result, Exception):
                print(f"Task failed: {result}")


    asyncio.run(main())

When to Use This Skill

Use python-async-patterns when you need to:

  • Handle multiple I/O operations concurrently (API calls, database queries)

  • Build async web servers or clients

  • Process data streams asynchronously

  • Implement producer-consumer patterns with async queues

  • Run blocking I/O operations without blocking the event loop

  • Create async context managers for resource management

  • Implement async iterators for streaming data

  • Control concurrency with semaphores and locks

  • Handle timeouts and cancellation in async operations

  • Mix CPU-bound and I/O-bound operations efficiently

Best Practices

  • Use asyncio.run() for the main entry point

  • Create tasks with asyncio.create_task() for concurrent execution

  • Use gather() when you need all results

  • Use wait() when you need fine-grained control

  • Always handle CancelledError in long-running tasks

  • Use semaphores to limit concurrent operations

  • Prefer async context managers for resource management

  • Use asyncio.Queue for producer-consumer patterns

  • Run blocking I/O in thread pool with run_in_executor()

  • Run CPU-bound tasks in process pool

  • Set appropriate timeouts for network operations

  • Use structured concurrency patterns (asyncio.TaskGroup in Python 3.11+; Trio calls the equivalent construct a nursery)
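Several of these practices combine naturally. The sketch below applies a semaphore, a per-call timeout, and `gather(..., return_exceptions=True)` together. `fetch_all` and `fake_fetch` are hypothetical names, and the limits are arbitrary values picked for illustration.

```python
import asyncio
from typing import Awaitable, Callable, Union


async def fetch_all(
    urls: list[str],
    fetch: Callable[[str], Awaitable[str]],
    limit: int = 3,
    timeout: float = 1.0,
) -> list[Union[str, BaseException]]:
    semaphore = asyncio.Semaphore(limit)

    async def fetch_one(url: str) -> str:
        async with semaphore:
            # The timeout clock starts only after a slot is acquired,
            # so time spent queued behind the semaphore is not counted.
            return await asyncio.wait_for(fetch(url), timeout=timeout)

    # Failures come back as exception objects in the result list
    # instead of aborting the whole batch.
    return await asyncio.gather(
        *(fetch_one(url) for url in urls), return_exceptions=True
    )


async def fake_fetch(url: str) -> str:
    # Stand-in for a real HTTP call; URLs containing "slow" exceed the timeout.
    await asyncio.sleep(2.0 if "slow" in url else 0.01)
    return f"Data from {url}"


if __name__ == "__main__":
    for item in asyncio.run(fetch_all(["a", "slow", "b"], fake_fetch, timeout=0.1)):
        print(repr(item))
```

Callers can then separate successes from failures with an `isinstance(item, BaseException)` check.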

Common Pitfalls

  • Forgetting to await coroutines (creates coroutine object, doesn't run)

  • Blocking the event loop with CPU-intensive work

  • Not handling task cancellation properly

  • Using time.sleep() instead of asyncio.sleep()

  • Creating too many concurrent tasks without limits

  • Not closing resources properly in async context

  • Mixing blocking and async code incorrectly

  • Not handling exceptions in background tasks

  • Forgetting to call task_done() with Queue

  • Using global event loop instead of asyncio.run()
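The background-task pitfall deserves a concrete illustration: a task created with `create_task` and never awaited can fail without anyone noticing. One possible mitigation, sketched below, is to keep a reference to each task and inspect its outcome in a done callback. The `BackgroundTasks` class is a hypothetical helper, not an asyncio API.

```python
import asyncio
from typing import Coroutine


class BackgroundTasks:
    """Hypothetical helper: holds task references and records failures."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()
        self.errors: list[BaseException] = []

    def spawn(self, coro: Coroutine) -> asyncio.Task:
        task = asyncio.create_task(coro)
        # Keep a strong reference so the task is not garbage collected early.
        self._tasks.add(task)
        task.add_done_callback(self._on_done)
        return task

    def _on_done(self, task: asyncio.Task) -> None:
        self._tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()  # Retrieving it marks the exception as handled
        if exc is not None:
            self.errors.append(exc)  # A real program would log this

    async def drain(self) -> None:
        # Wait for every outstanding task without raising their exceptions.
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)


async def flaky() -> None:
    await asyncio.sleep(0.01)
    raise ValueError("Task failed")


async def demo() -> list[str]:
    tasks = BackgroundTasks()
    tasks.spawn(flaky())
    tasks.spawn(asyncio.sleep(0.01))
    await tasks.drain()
    return [str(e) for e in tasks.errors]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['Task failed']
```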

Resources

  • asyncio Documentation

  • aiohttp Documentation

  • PEP 492 - Coroutines

  • PEP 525 - Async Generators

  • concurrent.futures
