tokio-patterns

安装量: 44
排名: #16712

安装

npx skills add https://github.com/geoffjay/claude-plugins --skill tokio-patterns

This skill provides common patterns and idioms for building robust async applications with Tokio.

Worker Pool Pattern

Limit concurrent task execution using a semaphore:

use tokio::sync::Semaphore;
use std::sync::Arc;

pub struct WorkerPool {
    semaphore: Arc<Semaphore>,
}

impl WorkerPool {
    pub fn new(size: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(size)),
        }
    }

    pub async fn execute<F, T>(&self, f: F) -> T
    where
        F: Future<Output = T>,
    {
        let _permit = self.semaphore.acquire().await.unwrap();
        f.await
    }
}

// Usage
let pool = WorkerPool::new(10);
let results = futures::future::join_all(
    (0..100).map(|i| pool.execute(process_item(i)))
).await;

Request-Response Pattern

Use oneshot channels for request-response communication:

use tokio::sync::{mpsc, oneshot};

pub enum Command {
    Get { key: String, respond_to: oneshot::Sender<Option<String>> },
    Set { key: String, value: String },
}

pub async fn actor(mut rx: mpsc::Receiver<Command>) {
    let mut store = HashMap::new();

    while let Some(cmd) = rx.recv().await {
        match cmd {
            Command::Get { key, respond_to } => {
                let value = store.get(&key).cloned();
                let _ = respond_to.send(value);
            }
            Command::Set { key, value } => {
                store.insert(key, value);
            }
        }
    }
}

// Client usage
let (tx, rx) = mpsc::channel(32);
tokio::spawn(actor(rx));

let (respond_to, response) = oneshot::channel();
tx.send(Command::Get { key: "foo".into(), respond_to }).await.unwrap();
let value = response.await.unwrap();

Pub/Sub with Channels

Use broadcast channels for pub/sub messaging:

use tokio::sync::broadcast;

pub struct PubSub<T: Clone> {
    tx: broadcast::Sender<T>,
}

impl<T: Clone> PubSub<T> {
    pub fn new(capacity: usize) -> Self {
        let (tx, _) = broadcast::channel(capacity);
        Self { tx }
    }

    pub fn subscribe(&self) -> broadcast::Receiver<T> {
        self.tx.subscribe()
    }

    pub fn publish(&self, message: T) -> Result<usize, broadcast::error::SendError<T>> {
        self.tx.send(message)
    }
}

// Usage
let pubsub = PubSub::new(100);

// Subscriber 1
let mut rx1 = pubsub.subscribe();
tokio::spawn(async move {
    while let Ok(msg) = rx1.recv().await {
        println!("Subscriber 1: {:?}", msg);
    }
});

// Subscriber 2
let mut rx2 = pubsub.subscribe();
tokio::spawn(async move {
    while let Ok(msg) = rx2.recv().await {
        println!("Subscriber 2: {:?}", msg);
    }
});

// Publisher
pubsub.publish("Hello".to_string()).unwrap();

Timeout Pattern

Wrap operations with timeouts:

use tokio::time::{timeout, Duration};

pub async fn with_timeout<F, T>(duration: Duration, future: F) -> Result<T, TimeoutError>
where
    F: Future<Output = Result<T, Error>>,
{
    match timeout(duration, future).await {
        Ok(Ok(result)) => Ok(result),
        Ok(Err(e)) => Err(TimeoutError::Inner(e)),
        Err(_) => Err(TimeoutError::Elapsed),
    }
}

// Usage
let result = with_timeout(
    Duration::from_secs(5),
    fetch_data()
).await?;

Retry with Exponential Backoff

Retry failed operations with backoff:

use tokio::time::{sleep, Duration};

pub async fn retry_with_backoff<F, T, E>(
    mut operation: F,
    max_retries: u32,
    initial_backoff: Duration,
) -> Result<T, E>
where
    F: FnMut() -> Pin<Box<dyn Future<Output = Result<T, E>>>>,
{
    let mut retries = 0;
    let mut backoff = initial_backoff;

    loop {
        match operation().await {
            Ok(result) => return Ok(result),
            Err(e) if retries < max_retries => {
                retries += 1;
                sleep(backoff).await;
                backoff *= 2; // Exponential backoff
            }
            Err(e) => return Err(e),
        }
    }
}

// Usage
let result = retry_with_backoff(
    || Box::pin(fetch_data()),
    3,
    Duration::from_millis(100)
).await?;

Graceful Shutdown

Coordinate graceful shutdown across components:

use tokio::sync::broadcast;
use tokio::select;

pub struct ShutdownCoordinator {
    tx: broadcast::Sender<()>,
}

impl ShutdownCoordinator {
    pub fn new() -> Self {
        let (tx, _) = broadcast::channel(1);
        Self { tx }
    }

    pub fn subscribe(&self) -> broadcast::Receiver<()> {
        self.tx.subscribe()
    }

    pub fn shutdown(&self) {
        let _ = self.tx.send(());
    }
}

// Worker pattern
pub async fn worker(mut shutdown: broadcast::Receiver<()>) {
    loop {
        select! {
            _ = shutdown.recv() => {
                // Cleanup
                break;
            }
            result = do_work() => {
                // Process result
            }
        }
    }
}

// Main
let coordinator = ShutdownCoordinator::new();

let shutdown_rx1 = coordinator.subscribe();
let h1 = tokio::spawn(worker(shutdown_rx1));

let shutdown_rx2 = coordinator.subscribe();
let h2 = tokio::spawn(worker(shutdown_rx2));

// Wait for signal
tokio::signal::ctrl_c().await.unwrap();
coordinator.shutdown();

// Wait for workers
let _ = tokio::join!(h1, h2);

Async Initialization

Lazy async initialization with OnceCell:

use tokio::sync::OnceCell;

pub struct Service {
    connection: OnceCell<Connection>,
}

impl Service {
    pub fn new() -> Self {
        Self {
            connection: OnceCell::new(),
        }
    }

    async fn get_connection(&self) -> &Connection {
        self.connection
            .get_or_init(|| async {
                Connection::connect().await.unwrap()
            })
            .await
    }

    pub async fn query(&self, sql: &str) -> Result<Vec<Row>> {
        let conn = self.get_connection().await;
        conn.query(sql).await
    }
}

Resource Cleanup with Drop

Ensure cleanup even on task cancellation:

pub struct Resource {
    handle: SomeHandle,
}

impl Resource {
    pub async fn new() -> Self {
        Self {
            handle: acquire_resource().await,
        }
    }

    pub async fn use_resource(&self) -> Result<()> {
        // Use the resource
        Ok(())
    }
}

impl Drop for Resource {
    fn drop(&mut self) {
        // Synchronous cleanup
        // For async cleanup, use a separate shutdown method
        self.handle.close();
    }
}

// For async cleanup
impl Resource {
    pub async fn shutdown(self) {
        // Async cleanup
        self.handle.close_async().await;
    }
}

Select Multiple Futures

Use select! to race multiple operations:

use tokio::select;

pub async fn select_example() {
    let mut rx1 = channel1();
    let mut rx2 = channel2();

    loop {
        select! {
            msg = rx1.recv() => {
                if let Some(msg) = msg {
                    handle_channel1(msg).await;
                } else {
                    break;
                }
            }
            msg = rx2.recv() => {
                if let Some(msg) = msg {
                    handle_channel2(msg).await;
                } else {
                    break;
                }
            }
            _ = tokio::time::sleep(Duration::from_secs(60)) => {
                check_timeout().await;
            }
        }
    }
}

Cancellation Token Pattern

Use tokio_util::sync::CancellationToken for cooperative cancellation:

use tokio_util::sync::CancellationToken;

pub async fn worker(token: CancellationToken) {
    loop {
        tokio::select! {
            _ = token.cancelled() => {
                // Cleanup
                break;
            }
            _ = do_work() => {
                // Continue
            }
        }
    }
}

// Hierarchical cancellation
let parent_token = CancellationToken::new();
let child_token = parent_token.child_token();

tokio::spawn(worker(child_token));

// Cancel all
parent_token.cancel();

Best Practices

  • Use semaphores for limiting concurrent operations

  • Implement graceful shutdown in all long-running tasks

  • Add timeouts to external operations

  • Use channels for inter-task communication

  • Handle cancellation properly in all tasks

  • Clean up resources in Drop or explicit shutdown methods

  • Use appropriate channel types for different patterns

  • Implement retries for transient failures

  • Use select! for coordinating multiple async operations

  • Document lifetime and ownership patterns clearly

返回排行榜