tokio-patterns

This skill provides common patterns and idioms for building robust async applications with Tokio.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Install skill "tokio-patterns" with this command: npx skills add geoffjay/claude-plugins/geoffjay-claude-plugins-tokio-patterns

Worker Pool Pattern

Limit concurrent task execution using a semaphore:

use std::future::Future;
use std::sync::Arc;
use tokio::sync::Semaphore;

pub struct WorkerPool {
    semaphore: Arc<Semaphore>,
}

impl WorkerPool {
    pub fn new(size: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(size)),
        }
    }

    pub async fn execute<F, T>(&self, f: F) -> T
    where
        F: Future<Output = T>,
    {
        // The permit is held while `f` runs and released when it is dropped.
        let _permit = self.semaphore.acquire().await.expect("semaphore closed");
        f.await
    }
}

// Usage
let pool = WorkerPool::new(10);
let results = futures::future::join_all(
    (0..100).map(|i| pool.execute(process_item(i)))
).await;

Request-Response Pattern

Use oneshot channels for request-response communication:

use std::collections::HashMap;
use tokio::sync::{mpsc, oneshot};

pub enum Command {
    Get { key: String, respond_to: oneshot::Sender<Option<String>> },
    Set { key: String, value: String },
}

pub async fn actor(mut rx: mpsc::Receiver<Command>) {
    let mut store = HashMap::new();

    // The loop ends once every sender has been dropped.
    while let Some(cmd) = rx.recv().await {
        match cmd {
            Command::Get { key, respond_to } => {
                let value = store.get(&key).cloned();
                // Ignore the error: the requester may have stopped waiting.
                let _ = respond_to.send(value);
            }
            Command::Set { key, value } => {
                store.insert(key, value);
            }
        }
    }
}

// Client usage
let (tx, rx) = mpsc::channel(32);
tokio::spawn(actor(rx));

let (respond_to, response) = oneshot::channel();
tx.send(Command::Get { key: "foo".into(), respond_to }).await.unwrap();
let value = response.await.unwrap();

Pub/Sub with Channels

Use broadcast channels for pub/sub messaging:

use tokio::sync::broadcast;

pub struct PubSub<T: Clone> {
    tx: broadcast::Sender<T>,
}

impl<T: Clone> PubSub<T> {
    pub fn new(capacity: usize) -> Self {
        let (tx, _) = broadcast::channel(capacity);
        Self { tx }
    }

    pub fn subscribe(&self) -> broadcast::Receiver<T> {
        self.tx.subscribe()
    }

    // Returns the number of receivers the message was sent to;
    // fails if there are no active receivers.
    pub fn publish(&self, message: T) -> Result<usize, broadcast::error::SendError<T>> {
        self.tx.send(message)
    }
}

// Usage
let pubsub = PubSub::new(100);

// Subscriber 1
let mut rx1 = pubsub.subscribe();
tokio::spawn(async move {
    // recv() returns Err when the receiver lags or the channel closes,
    // and either case ends this loop.
    while let Ok(msg) = rx1.recv().await {
        println!("Subscriber 1: {:?}", msg);
    }
});

// Subscriber 2
let mut rx2 = pubsub.subscribe();
tokio::spawn(async move {
    while let Ok(msg) = rx2.recv().await {
        println!("Subscriber 2: {:?}", msg);
    }
});

// Publisher
pubsub.publish("Hello".to_string()).unwrap();

Timeout Pattern

Wrap operations with timeouts:

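use std::future::Future;
use tokio::time::{timeout, Duration};

// Assumed definition: the original does not show TimeoutError,
// so the inner error type is left generic here.
#[derive(Debug)]
pub enum TimeoutError<E> {
    Elapsed,
    Inner(E),
}

pub async fn with_timeout<F, T, E>(duration: Duration, future: F) -> Result<T, TimeoutError<E>>
where
    F: Future<Output = Result<T, E>>,
{
    match timeout(duration, future).await {
        Ok(Ok(result)) => Ok(result),                // finished in time, succeeded
        Ok(Err(e)) => Err(TimeoutError::Inner(e)),   // finished in time, failed
        Err(_) => Err(TimeoutError::Elapsed),        // deadline passed first
    }
}

// Usage
let result = with_timeout(Duration::from_secs(5), fetch_data()).await?;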

Retry with Exponential Backoff

Retry failed operations with backoff:

use std::future::Future;
use std::pin::Pin;
use tokio::time::{sleep, Duration};

pub async fn retry_with_backoff<F, T, E>(
    mut operation: F,
    max_retries: u32,
    initial_backoff: Duration,
) -> Result<T, E>
where
    F: FnMut() -> Pin<Box<dyn Future<Output = Result<T, E>>>>,
{
    let mut retries = 0;
    let mut backoff = initial_backoff;

    loop {
        match operation().await {
            Ok(result) => return Ok(result),
            Err(_) if retries < max_retries => {
                retries += 1;
                sleep(backoff).await;
                backoff *= 2; // Exponential backoff
            }
            // Retries exhausted: return the last error.
            Err(e) => return Err(e),
        }
    }
}

// Usage
let result = retry_with_backoff(
    || Box::pin(fetch_data()),
    3,
    Duration::from_millis(100),
).await?;
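To see what the doubling does in practice: with an initial backoff of 100 ms and 3 retries, the waits are 100 ms, 200 ms, and 400 ms. The helper below works that schedule out explicitly. It is a sketch: `backoff_schedule` is a hypothetical name, and the upper bound `cap` is an addition that the original function does not have.

```rust
use std::time::Duration;

/// Hypothetical helper: the delays slept before each retry when the backoff
/// starts at `initial`, doubles after every attempt, and never exceeds `cap`.
pub fn backoff_schedule(initial: Duration, max_retries: u32, cap: Duration) -> Vec<Duration> {
    let mut delays = Vec::with_capacity(max_retries as usize);
    let mut backoff = initial.min(cap);
    for _ in 0..max_retries {
        delays.push(backoff);
        // saturating_mul avoids an overflow panic for very large durations.
        backoff = backoff.saturating_mul(2).min(cap);
    }
    delays
}
```

A cap keeps long retry sequences from producing multi-minute sleeps; without one, ten retries starting at 100 ms would end with a wait of 51.2 s.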

Graceful Shutdown

Coordinate graceful shutdown across components:

use tokio::select;
use tokio::sync::broadcast;

pub struct ShutdownCoordinator {
    tx: broadcast::Sender<()>,
}

impl ShutdownCoordinator {
    pub fn new() -> Self {
        let (tx, _) = broadcast::channel(1);
        Self { tx }
    }

    pub fn subscribe(&self) -> broadcast::Receiver<()> {
        self.tx.subscribe()
    }

    pub fn shutdown(&self) {
        // An error only means there are no subscribers left.
        let _ = self.tx.send(());
    }
}

// Worker pattern
pub async fn worker(mut shutdown: broadcast::Receiver<()>) {
    loop {
        select! {
            _ = shutdown.recv() => {
                // Cleanup
                break;
            }
            // If shutdown fires while do_work() is pending, that future is dropped.
            result = do_work() => {
                // Process result
            }
        }
    }
}

// Main
let coordinator = ShutdownCoordinator::new();

let shutdown_rx1 = coordinator.subscribe();
let h1 = tokio::spawn(worker(shutdown_rx1));

let shutdown_rx2 = coordinator.subscribe();
let h2 = tokio::spawn(worker(shutdown_rx2));

// Wait for signal
tokio::signal::ctrl_c().await.unwrap();
coordinator.shutdown();

// Wait for workers
let _ = tokio::join!(h1, h2);

Async Initialization

Lazy async initialization with OnceCell:

use tokio::sync::OnceCell;

pub struct Service {
    connection: OnceCell<Connection>,
}

impl Service {
    pub fn new() -> Self {
        Self {
            connection: OnceCell::new(),
        }
    }

    async fn get_connection(&self) -> &Connection {
        self.connection
            .get_or_init(|| async {
                // unwrap panics if connecting fails;
                // get_or_try_init can propagate the error instead.
                Connection::connect().await.unwrap()
            })
            .await
    }

    pub async fn query(&self, sql: &str) -> Result<Vec<Row>> {
        let conn = self.get_connection().await;
        conn.query(sql).await
    }
}

Resource Cleanup with Drop

Ensure cleanup even on task cancellation:

pub struct Resource {
    handle: SomeHandle,
}

impl Resource {
    pub async fn new() -> Self {
        Self {
            handle: acquire_resource().await,
        }
    }

    pub async fn use_resource(&self) -> Result<()> {
        // Use the resource
        Ok(())
    }
}

impl Drop for Resource {
    fn drop(&mut self) {
        // Synchronous cleanup
        // For async cleanup, use a separate shutdown method
        self.handle.close();
    }
}

// For async cleanup
impl Resource {
    pub async fn shutdown(self) {
        // Async cleanup
        self.handle.close_async().await;
    }
}

Select Multiple Futures

Use select! to race multiple operations:

use tokio::select;
use tokio::time::Duration;

pub async fn select_example() {
    let mut rx1 = channel1();
    let mut rx2 = channel2();

    loop {
        select! {
            msg = rx1.recv() => {
                if let Some(msg) = msg {
                    handle_channel1(msg).await;
                } else {
                    break;
                }
            }
            msg = rx2.recv() => {
                if let Some(msg) = msg {
                    handle_channel2(msg).await;
                } else {
                    break;
                }
            }
            // A new sleep is created on every iteration, so this branch
            // fires only after 60 seconds with no message on either channel.
            _ = tokio::time::sleep(Duration::from_secs(60)) => {
                check_timeout().await;
            }
        }
    }
}

Cancellation Token Pattern

Use tokio_util::sync::CancellationToken for cooperative cancellation:

use tokio_util::sync::CancellationToken;

pub async fn worker(token: CancellationToken) {
    loop {
        tokio::select! {
            _ = token.cancelled() => {
                // Cleanup
                break;
            }
            _ = do_work() => {
                // Continue
            }
        }
    }
}

// Hierarchical cancellation
let parent_token = CancellationToken::new();
let child_token = parent_token.child_token();

tokio::spawn(worker(child_token));

// Cancel all: cancelling the parent also cancels every child token
parent_token.cancel();

Best Practices

  • Use semaphores for limiting concurrent operations

  • Implement graceful shutdown in all long-running tasks

  • Add timeouts to external operations

  • Use channels for inter-task communication

  • Handle cancellation properly in all tasks

  • Clean up resources in Drop or explicit shutdown methods

  • Match the channel type to the pattern: mpsc for command queues, oneshot for single replies, broadcast for fan-out

  • Implement retries for transient failures

  • Use select! for coordinating multiple async operations

  • Document lifetime and ownership patterns clearly
