tokio-networking

Tokio Networking Patterns

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "tokio-networking" with this command: npx skills add geoffjay/claude-plugins/geoffjay-claude-plugins-tokio-networking

Tokio Networking Patterns

This skill provides network programming patterns for building production-grade services with the Tokio ecosystem.

HTTP Service with Hyper and Axum

Build HTTP services with routing and middleware:

use axum::{ Router, routing::{get, post}, extract::{State, Path, Json}, response::IntoResponse, middleware, }; use std::sync::Arc;

#[derive(Clone)] struct AppState { db: Arc<Database>, cache: Arc<Cache>, }

async fn create_app() -> Router { let state = AppState { db: Arc::new(Database::new().await), cache: Arc::new(Cache::new()), };

Router::new()
    .route("/health", get(health_check))
    .route("/api/v1/users", get(list_users).post(create_user))
    .route("/api/v1/users/:id", get(get_user).delete(delete_user))
    .layer(middleware::from_fn(logging_middleware))
    .layer(middleware::from_fn(auth_middleware))
    .with_state(state)

}

async fn health_check() -> impl IntoResponse { "OK" }

async fn get_user( State(state): State<AppState>, Path(id): Path<u64>, ) -> Result<Json<User>, StatusCode> { state.db.get_user(id) .await .map(Json) .ok_or(StatusCode::NOT_FOUND) }

async fn logging_middleware<B>( req: Request<B>, next: Next<B>, ) -> impl IntoResponse { let method = req.method().clone(); let uri = req.uri().clone();

let start = Instant::now();
let response = next.run(req).await;
let duration = start.elapsed();

tracing::info!(
    method = %method,
    uri = %uri,
    status = %response.status(),
    duration_ms = duration.as_millis(),
    "request completed"
);

response

}

gRPC Service with Tonic

Build type-safe gRPC services:

use tonic::{transport::Server, Request, Response, Status};

pub mod proto { tonic::include_proto!("myservice"); }

use proto::my_service_server::{MyService, MyServiceServer};

#[derive(Default)] pub struct MyServiceImpl { db: Arc<Database>, }

#[tonic::async_trait] impl MyService for MyServiceImpl { async fn get_user( &self, request: Request<proto::GetUserRequest>, ) -> Result<Response<proto::User>, Status> { let req = request.into_inner();

    let user = self.db.get_user(req.id)
        .await
        .map_err(|e| Status::internal(e.to_string()))?
        .ok_or_else(|| Status::not_found("User not found"))?;

    Ok(Response::new(proto::User {
        id: user.id,
        name: user.name,
        email: user.email,
    }))
}

type ListUsersStream = ReceiverStream&#x3C;Result&#x3C;proto::User, Status>>;

async fn list_users(
    &#x26;self,
    request: Request&#x3C;proto::ListUsersRequest>,
) -> Result&#x3C;Response&#x3C;Self::ListUsersStream>, Status> {
    let (tx, rx) = mpsc::channel(100);

    let db = self.db.clone();
    tokio::spawn(async move {
        let mut users = db.list_users().await.unwrap();

        while let Some(user) = users.next().await {
            let proto_user = proto::User {
                id: user.id,
                name: user.name,
                email: user.email,
            };

            if tx.send(Ok(proto_user)).await.is_err() {
                break;
            }
        }
    });

    Ok(Response::new(ReceiverStream::new(rx)))
}

}

async fn serve() -> Result<(), Box<dyn std::error::Error>> { let addr = "[::1]:50051".parse()?; let service = MyServiceImpl::default();

Server::builder()
    .add_service(MyServiceServer::new(service))
    .serve(addr)
    .await?;

Ok(())

}

Tower Middleware Composition

Layer middleware for cross-cutting concerns:

use tower::{ServiceBuilder, Service}; use tower_http::{ trace::TraceLayer, compression::CompressionLayer, timeout::TimeoutLayer, limit::RateLimitLayer, }; use std::time::Duration;

fn create_middleware_stack<S>(service: S) -> impl Service where S: Service + Clone, { ServiceBuilder::new() // Outermost layer (executed first) .layer(TraceLayer::new_for_http()) .layer(CompressionLayer::new()) .layer(TimeoutLayer::new(Duration::from_secs(30))) .layer(RateLimitLayer::new(100, Duration::from_secs(1))) // Innermost layer (executed last) .service(service) }

// Custom middleware use tower::Layer;

#[derive(Clone)] struct MetricsLayer { metrics: Arc<Metrics>, }

impl<S> Layer<S> for MetricsLayer { type Service = MetricsService<S>;

fn layer(&#x26;self, inner: S) -> Self::Service {
    MetricsService {
        inner,
        metrics: self.metrics.clone(),
    }
}

}

#[derive(Clone)] struct MetricsService<S> { inner: S, metrics: Arc<Metrics>, }

impl<S, Req> Service<Req> for MetricsService<S> where S: Service<Req>, { type Response = S::Response; type Error = S::Error; type Future = /* ... */;

fn poll_ready(&#x26;mut self, cx: &#x26;mut Context&#x3C;'_>) -> Poll&#x3C;Result&#x3C;(), Self::Error>> {
    self.inner.poll_ready(cx)
}

fn call(&#x26;mut self, req: Req) -> Self::Future {
    self.metrics.requests_total.inc();
    let timer = self.metrics.request_duration.start_timer();

    let future = self.inner.call(req);
    let metrics = self.metrics.clone();

    Box::pin(async move {
        let result = future.await;
        timer.observe_duration();
        result
    })
}

}

Connection Pooling

Manage connection pools efficiently:

use deadpool_postgres::{Config, Pool, Runtime}; use tokio_postgres::NoTls;

pub struct DatabasePool { pool: Pool, }

impl DatabasePool { pub async fn new(config: &DatabaseConfig) -> Result<Self, Error> { let mut cfg = Config::new(); cfg.host = Some(config.host.clone()); cfg.port = Some(config.port); cfg.dbname = Some(config.database.clone()); cfg.user = Some(config.user.clone()); cfg.password = Some(config.password.clone());

    let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls)?;

    Ok(Self { pool })
}

pub async fn get(&#x26;self) -> Result&#x3C;Client, Error> {
    self.pool.get().await.map_err(Into::into)
}

pub async fn query&#x3C;T>(&#x26;self, f: impl FnOnce(&#x26;Client) -> F) -> Result&#x3C;T, Error>
where
    F: Future&#x3C;Output = Result&#x3C;T, Error>>,
{
    let client = self.get().await?;
    f(&#x26;client).await
}

}

// Usage let pool = DatabasePool::new(&config).await?;

let users = pool.query(|client| async move { client.query("SELECT * FROM users", &[]) .await .map_err(Into::into) }).await?;

Health Checks and Readiness Probes

Implement comprehensive health checks:

use axum::{Router, routing::get, Json}; use serde::Serialize;

#[derive(Serialize)] struct HealthResponse { status: String, version: String, dependencies: Vec<DependencyHealth>, }

#[derive(Serialize)] struct DependencyHealth { name: String, status: String, latency_ms: Option<u64>, message: Option<String>, }

async fn health_check(State(state): State<Arc<AppState>>) -> Json<HealthResponse> { let mut dependencies = Vec::new();

// Check database
let db_start = Instant::now();
let db_status = match state.db.ping().await {
    Ok(_) => DependencyHealth {
        name: "database".into(),
        status: "healthy".into(),
        latency_ms: Some(db_start.elapsed().as_millis() as u64),
        message: None,
    },
    Err(e) => DependencyHealth {
        name: "database".into(),
        status: "unhealthy".into(),
        latency_ms: None,
        message: Some(e.to_string()),
    },
};
dependencies.push(db_status);

// Check cache
let cache_start = Instant::now();
let cache_status = match state.cache.ping().await {
    Ok(_) => DependencyHealth {
        name: "cache".into(),
        status: "healthy".into(),
        latency_ms: Some(cache_start.elapsed().as_millis() as u64),
        message: None,
    },
    Err(e) => DependencyHealth {
        name: "cache".into(),
        status: "unhealthy".into(),
        latency_ms: None,
        message: Some(e.to_string()),
    },
};
dependencies.push(cache_status);

let all_healthy = dependencies.iter().all(|d| d.status == "healthy");

Json(HealthResponse {
    status: if all_healthy { "healthy" } else { "unhealthy" }.into(),
    version: env!("CARGO_PKG_VERSION").into(),
    dependencies,
})

}

async fn readiness_check(State(state): State<Arc<AppState>>) -> StatusCode { if state.is_ready().await { StatusCode::OK } else { StatusCode::SERVICE_UNAVAILABLE } }

pub fn health_routes() -> Router<Arc<AppState>> { Router::new() .route("/health", get(health_check)) .route("/ready", get(readiness_check)) .route("/live", get(|| async { StatusCode::OK })) }

Circuit Breaker Pattern

Protect against cascading failures:

use std::sync::atomic::{AtomicU64, Ordering};

pub struct ServiceClient { client: reqwest::Client, circuit_breaker: CircuitBreaker, }

impl ServiceClient { pub async fn call(&self, req: Request) -> Result<Response, Error> { self.circuit_breaker.call(async { self.client .execute(req) .await .map_err(Into::into) }).await } }

Load Balancing

Distribute requests across multiple backends:

use tower::balance::p2c::Balance; use tower::discover::ServiceList;

pub struct LoadBalancer { balancer: Balance<ServiceList<Vec<ServiceEndpoint>>, Request>, }

impl LoadBalancer { pub fn new(endpoints: Vec<String>) -> Self { let services: Vec<_> = endpoints .into_iter() .map(|endpoint| create_client(endpoint)) .collect();

    let balancer = Balance::new(ServiceList::new(services));

    Self { balancer }
}

pub async fn call(&#x26;mut self, req: Request) -> Result&#x3C;Response, Error> {
    self.balancer.call(req).await
}

}

Request Deduplication

Deduplicate concurrent identical requests:

use tokio::sync::Mutex; use std::collections::HashMap;

pub struct RequestDeduplicator<K, V> { in_flight: Arc<Mutex<HashMap<K, Arc<tokio::sync::Notify>>>>, cache: Arc<Mutex<HashMap<K, Arc<V>>>>, }

impl<K: Eq + Hash + Clone, V> RequestDeduplicator<K, V> { pub fn new() -> Self { Self { in_flight: Arc::new(Mutex::new(HashMap::new())), cache: Arc::new(Mutex::new(HashMap::new())), } }

pub async fn get_or_fetch&#x3C;F, Fut>(
    &#x26;self,
    key: K,
    fetch: F,
) -> Result&#x3C;Arc&#x3C;V>, Error>
where
    F: FnOnce() -> Fut,
    Fut: Future&#x3C;Output = Result&#x3C;V, Error>>,
{
    // Check cache
    {
        let cache = self.cache.lock().await;
        if let Some(value) = cache.get(&#x26;key) {
            return Ok(value.clone());
        }
    }

    // Check if request is in flight
    let notify = {
        let mut in_flight = self.in_flight.lock().await;
        if let Some(notify) = in_flight.get(&#x26;key) {
            notify.clone()
        } else {
            let notify = Arc::new(tokio::sync::Notify::new());
            in_flight.insert(key.clone(), notify.clone());
            notify
        }
    };

    // Wait if another request is in progress
    notify.notified().await;

    // Check cache again
    {
        let cache = self.cache.lock().await;
        if let Some(value) = cache.get(&#x26;key) {
            return Ok(value.clone());
        }
    }

    // Fetch value
    let value = Arc::new(fetch().await?);

    // Update cache
    {
        let mut cache = self.cache.lock().await;
        cache.insert(key.clone(), value.clone());
    }

    // Remove from in-flight and notify
    {
        let mut in_flight = self.in_flight.lock().await;
        in_flight.remove(&#x26;key);
    }
    notify.notify_waiters();

    Ok(value)
}

}

Best Practices

  • Use connection pooling for database and HTTP connections

  • Implement health checks for all dependencies

  • Add circuit breakers for external service calls

  • Use appropriate timeouts for all network operations

  • Implement retry logic with exponential backoff

  • Add comprehensive middleware for logging, metrics, auth

  • Use load balancing for high availability

  • Deduplicate requests to reduce load

  • Monitor latency and error rates

  • Design for graceful degradation when services fail

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

documentation-update

No summary provided by upstream source.

Repository SourceNeeds Review
General

git-troubleshooting

No summary provided by upstream source.

Repository SourceNeeds Review
General

git-advanced

No summary provided by upstream source.

Repository SourceNeeds Review