concurrency-patterns

安装量: 138
排名: #6251

安装

npx skills add https://github.com/aj-geddes/useful-ai-prompts --skill concurrency-patterns

Concurrency Patterns Overview

Implement safe concurrent code using proper synchronization primitives and patterns for parallel execution.

When to Use

- Multi-threaded applications
- Parallel data processing
- Race condition prevention
- Resource pooling
- Task coordination
- High-performance systems
- Async operations
- Worker pools

Implementation Examples

1. Promise Pool (TypeScript)

class PromisePool { private queue: Array<() => Promise> = []; private active = 0;

/** Stores `concurrency`, the limit `add` compares the number of active tasks against. */
constructor(private concurrency: number) {}

/** Resolvers of callers parked in `waitForSlot`, woken in FIFO order as tasks finish. */
private readonly slotWaiters: Array<() => void> = [];

/**
 * Runs `fn` as soon as fewer than `concurrency` tasks are active.
 *
 * @param fn Factory that starts the asynchronous work.
 * @returns Whatever `fn` resolves to; a rejection from `fn` propagates unchanged.
 */
async add<T>(fn: () => Promise<T>): Promise<T> {
  // Re-check after every wake-up: another caller may have taken the slot first.
  while (this.active >= this.concurrency) {
    await this.waitForSlot();
  }

  this.active++;

  try {
    return await fn();
  } finally {
    this.active--;
    // Hand the freed slot to the longest-waiting caller instead of letting it poll.
    this.slotWaiters.shift()?.();
  }
}

/** Parks the caller until a running task finishes and wakes it (no timers, no polling). */
private waitForSlot(): Promise<void> {
  return new Promise<void>(resolve => {
    this.slotWaiters.push(resolve);
  });
}

/**
 * Applies `fn` to every item through `add`, so the pool's concurrency limit applies.
 *
 * @param items Inputs to process.
 * @param fn Asynchronous transform applied to each item.
 * @returns Results in the same order as `items`; rejects as soon as any call rejects (`Promise.all`).
 */
async map<T, R>(items: T[], fn: (item: T) => Promise<R>): Promise<R[]> {
  return Promise.all(items.map(item => this.add(() => fn(item))));
}
}

// Usage
// Fetch 100 URLs with at most 5 requests in flight at a time.
const pool = new PromisePool(5);

const urls = Array.from({ length: 100 }, (_, i) => `https://api.example.com/item/${i}`);

const results = await pool.map(urls, async (url) => {
  const response = await fetch(url);
  return response.json();
});

2. Mutex and Semaphore (TypeScript)

class Mutex { private locked = false; private queue: Array<() => void> = [];

/**
 * Takes the lock. Resolves immediately when the mutex is free; otherwise the
 * caller is queued and resolved by a later `release()`.
 */
async acquire(): Promise<void> {
  if (!this.locked) {
    this.locked = true;
    return;
  }

  return new Promise<void>(resolve => {
    this.queue.push(resolve);
  });
}

/**
 * Gives up the lock. If a caller is queued, ownership passes straight to it
 * (`locked` stays true); otherwise the mutex is marked free.
 */
release(): void {
  const next = this.queue.shift();
  if (next === undefined) {
    this.locked = false;
    return;
  }
  next();
}

/**
 * Runs `fn` while holding the lock and always releases it afterwards.
 *
 * @returns Whatever `fn` resolves to; a rejection from `fn` propagates after the release.
 */
async runExclusive<T>(fn: () => Promise<T>): Promise<T> {
  await this.acquire();
  try {
    return await fn();
  } finally {
    this.release();
  }
}
}

/**
 * Counting semaphore for async code: at most `max` holders at a time,
 * with additional callers queued in FIFO order.
 */
class Semaphore {
  private available: number;
  private queue: Array<() => void> = [];

  /** @param max Number of permits that may be held simultaneously. */
  constructor(private max: number) {
    this.available = max;
  }

  /** Takes a permit, waiting in the queue when none is available. */
  async acquire(): Promise<void> {
    if (this.available > 0) {
      this.available--;
      return;
    }

    return new Promise<void>(resolve => {
      this.queue.push(resolve);
    });
  }

  /**
   * Returns a permit. A queued caller receives it directly; otherwise the
   * permit goes back to the pool, never exceeding `max` (a surplus
   * `release()` is ignored instead of silently raising the limit).
   */
  release(): void {
    const next = this.queue.shift();
    if (next !== undefined) {
      next();
    } else if (this.available < this.max) {
      this.available++;
    }
  }

  /**
   * Runs `fn` while holding one permit and always releases it afterwards.
   *
   * @returns Whatever `fn` resolves to; a rejection from `fn` propagates after the release.
   */
  async runExclusive<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}

// Usage
const mutex = new Mutex();
let counter = 0;

/** Read-modify-write on `counter` across an await, made safe by the mutex. */
async function incrementCounter() {
  await mutex.runExclusive(async () => {
    const current = counter;
    await new Promise(resolve => setTimeout(resolve, 10));
    counter = current + 1;
  });
}

// Database connection pool with semaphore
const dbSemaphore = new Semaphore(10); // Max 10 concurrent connections

/** Runs `executeQuery` while holding one of the semaphore's permits. */
async function queryDatabase(query: string) {
  return dbSemaphore.runExclusive(async () => {
    // Execute query
    return executeQuery(query);
  });
}

/** Placeholder for the actual query implementation. */
async function executeQuery(query: string) {
  // Query logic
}

3. Worker Pool (Node.js)

import { Worker } from 'worker_threads';

/** A unit of work plus the promise callbacks that settle it. */
interface Task<T> {
  id: string;
  data: any;
  resolve: (value: T) => void;
  reject: (error: Error) => void;
}

/**
 * Fixed-size pool of worker threads. Tasks are queued and dispatched to idle
 * workers one at a time; each worker answers with `{ data }` or `{ error }`.
 */
class WorkerPool {
  private workers: Worker[] = [];
  private availableWorkers: Worker[] = [];
  private taskQueue: Task<any>[] = [];
  /** Task currently running on each busy worker. */
  private inFlight = new Map<Worker, Task<any>>();

  /**
   * @param workerScript Path passed to `new Worker(...)` for every pool member.
   * @param poolSize Number of workers created up front.
   */
  constructor(
    private workerScript: string,
    private poolSize: number
  ) {
    this.initializeWorkers();
  }

  /** Spawns `poolSize` workers and wires their message/error handlers. */
  private initializeWorkers(): void {
    for (let i = 0; i < this.poolSize; i++) {
      const worker = new Worker(this.workerScript);

      worker.on('message', (result) => {
        this.handleWorkerMessage(worker, result);
      });

      worker.on('error', (error) => {
        console.error('Worker error:', error);
        this.handleWorkerFailure(worker, error);
      });

      this.workers.push(worker);
      this.availableWorkers.push(worker);
    }
  }

  /**
   * Queues `data` for processing by the next idle worker.
   *
   * @returns The `data` field of the worker's reply. Rejects when the worker
   *   replies with `error`, when the worker crashes, or when the pool has no
   *   live workers (instead of staying pending forever).
   */
  async execute<T = any>(data: any): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      if (this.workers.length === 0) {
        reject(new Error('WorkerPool has no live workers'));
        return;
      }

      const task: Task<T> = {
        id: Math.random().toString(36),
        data,
        resolve,
        reject
      };

      this.taskQueue.push(task);
      this.processQueue();
    });
  }

  /** Pairs queued tasks with idle workers until one of the two runs out. */
  private processQueue(): void {
    while (this.taskQueue.length > 0 && this.availableWorkers.length > 0) {
      const task = this.taskQueue.shift()!;
      const worker = this.availableWorkers.shift()!;

      this.inFlight.set(worker, task);
      worker.postMessage({
        taskId: task.id,
        data: task.data
      });
    }
  }

  /** Settles the worker's in-flight task from its reply and returns the worker to the idle list. */
  private handleWorkerMessage(worker: Worker, result: any): void {
    const task = this.inFlight.get(worker);

    if (!task) return;

    if (result.error) {
      task.reject(new Error(result.error));
    } else {
      task.resolve(result.data);
    }

    this.inFlight.delete(worker);
    this.availableWorkers.push(worker);
    this.processQueue();
  }

  /**
   * Handles a worker 'error' event: rejects the task it was running and drops
   * the worker from the pool. If that was the last worker, every queued task
   * is rejected too, since nothing is left to run it.
   */
  private handleWorkerFailure(worker: Worker, error: unknown): void {
    const task = this.inFlight.get(worker);
    this.inFlight.delete(worker);
    task?.reject(error instanceof Error ? error : new Error(String(error)));

    this.workers = this.workers.filter(w => w !== worker);
    this.availableWorkers = this.availableWorkers.filter(w => w !== worker);

    if (this.workers.length === 0) {
      const noWorkers = new Error('WorkerPool has no live workers');
      for (const queued of this.taskQueue.splice(0)) {
        queued.reject(noWorkers);
      }
    }
  }

  /** Terminates every worker in the pool and waits for all of them to stop. */
  async terminate(): Promise<void> {
    await Promise.all(
      this.workers.map(worker => worker.terminate())
    );
  }
}

// worker.js
// const { parentPort } = require('worker_threads');
//
// parentPort.on('message', async ({ taskId, data }) => {
//   try {
//     const result = await processData(data);
//     parentPort.postMessage({ taskId, data: result });
//   } catch (error) {
//     parentPort.postMessage({ taskId, error: error.message });
//   }
// });

4. Python Threading Patterns

import threading
from queue import Queue
from typing import Callable, List, TypeVar, Generic
import time

T = TypeVar('T')
R = TypeVar('R')


class ThreadPool(Generic[T, R]):
    """Fixed-size pool of threads that maps a function over a list of items."""

    def __init__(self, num_threads: int):
        """Create a pool that uses ``num_threads`` worker threads per ``map`` call.

        Raises:
            ValueError: if ``num_threads`` is less than 1 (``map`` would wait
                forever on the queue with nobody consuming it).
        """
        if num_threads < 1:
            raise ValueError("num_threads must be at least 1")
        self.num_threads = num_threads
        self.tasks: Queue = Queue()
        self.results: List[R] = []
        self.lock = threading.Lock()
        self.workers: List[threading.Thread] = []
        self._errors: List[BaseException] = []

    def map(self, func: Callable[[T], R], items: List[T]) -> List[R]:
        """Map function over items using thread pool.

        Returns:
            The results in the same order as ``items``.

        Raises:
            Exception: the first exception raised by ``func``, re-raised here
                after all workers have stopped.
        """
        # Fresh state per call so repeated calls do not accumulate results/threads.
        self.results = [None] * len(items)  # type: ignore[list-item]
        self.workers = []
        self._errors = []

        # Add tasks to queue, tagged with their position so order can be restored.
        for index, item in enumerate(items):
            self.tasks.put((index, item))

        # Start workers
        for _ in range(self.num_threads):
            worker = threading.Thread(target=self._worker, args=(func,))
            worker.start()
            self.workers.append(worker)

        # Wait for completion
        self.tasks.join()

        # Stop workers: one sentinel each
        for _ in range(self.num_threads):
            self.tasks.put(None)

        for worker in self.workers:
            worker.join()

        if self._errors:
            raise self._errors[0]

        return self.results

    def _worker(self, func: Callable[[T], R]):
        """Worker thread: process (index, item) entries until a None sentinel arrives."""
        while True:
            entry = self.tasks.get()
            try:
                # Only the bare sentinel stops the worker; a None *item* arrives
                # wrapped in a tuple and is processed normally.
                if entry is None:
                    break

                index, item = entry
                try:
                    result = func(item)
                except Exception as exc:
                    # Keep the worker alive so it still consumes its sentinel.
                    with self.lock:
                        self._errors.append(exc)
                else:
                    with self.lock:
                        self.results[index] = result
            finally:
                self.tasks.task_done()

class Mutex:
    """Context-manager wrapper around ``threading.Lock``."""

    def __init__(self):
        self._lock = threading.Lock()

    def __enter__(self):
        """Acquire the lock (blocking) and return this mutex."""
        self._lock.acquire()
        return self

    def __exit__(self, *args):
        """Release the lock; exceptions from the ``with`` body are not suppressed."""
        self._lock.release()

class Semaphore:
    """Context-manager wrapper around ``threading.Semaphore``."""

    def __init__(self, max_count: int):
        """Create a semaphore whose initial counter is ``max_count``."""
        self._semaphore = threading.Semaphore(max_count)

    def __enter__(self):
        """Acquire one permit (blocking) and return this semaphore."""
        self._semaphore.acquire()
        return self

    def __exit__(self, *args):
        """Release the permit; exceptions from the ``with`` body are not suppressed."""
        self._semaphore.release()

# Usage

def process_item(item: int) -> int:
    """Simulate 0.1 s of work and return the item doubled."""
    time.sleep(0.1)
    return item * 2


pool = ThreadPool(num_threads=5)
items = list(range(100))
results = pool.map(process_item, items)
print(f"Processed {len(results)} items")

# Mutex example

counter = 0
mutex = Mutex()


def increment():
    """Read-modify-write on the global counter, serialized by the mutex."""
    global counter
    with mutex:
        current = counter
        time.sleep(0.001)
        counter = current + 1


threads = [threading.Thread(target=increment) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Counter: {counter}")  # Should be 100

# Semaphore example

db_connections = Semaphore(max_count=10)


def query_database(query: str):
    """Run a simulated query while holding one of the 10 permits."""
    with db_connections:
        # Execute query with limited connections
        time.sleep(0.1)
        print(f"Executing: {query}")

5. Async Patterns (Python asyncio)

import asyncio
from typing import Callable, List, TypeVar, Awaitable

T = TypeVar('T')
R = TypeVar('R')


class AsyncPool:
    """Runs coroutines with a cap on how many execute at the same time."""

    def __init__(self, concurrency: int):
        """Create a pool allowing ``concurrency`` calls inside the semaphore at once."""
        self.semaphore = asyncio.Semaphore(concurrency)

    async def map(
        self,
        func: Callable[[T], Awaitable[R]],
        items: List[T]
    ) -> List[R]:
        """Map async function over items with concurrency limit.

        Returns the results in the order of ``items`` (``asyncio.gather`` order).
        """
        async def bounded_func(item: T) -> R:
            # Each call holds the semaphore for the duration of func(item).
            async with self.semaphore:
                return await func(item)

        return await asyncio.gather(*[
            bounded_func(item) for item in items
        ])

class AsyncQueue:
    """Thin wrapper that forwards to an ``asyncio.Queue``."""

    def __init__(self, max_size: int = 0):
        """Create the underlying queue with ``maxsize=max_size``."""
        self.queue = asyncio.Queue(maxsize=max_size)

    async def put(self, item):
        """Await ``put`` on the underlying queue."""
        await self.queue.put(item)

    async def get(self):
        """Await and return the next item from the underlying queue."""
        return await self.queue.get()

    def task_done(self):
        """Mark one previously fetched item as processed."""
        self.queue.task_done()

    async def join(self):
        """Await ``join`` on the underlying queue."""
        await self.queue.join()

# Producer-Consumer pattern

async def producer(queue: AsyncQueue, items: List[int]):
    """Produce items."""
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.1)

async def consumer(queue: AsyncQueue, name: str):
    """Consume items."""
    while True:
        item = await queue.get()

        # None is the shutdown sentinel: acknowledge it and stop.
        if item is None:
            queue.task_done()
            break

        print(f"{name} consuming: {item}")
        await asyncio.sleep(0.2)
        queue.task_done()

async def main():
    """Run one producer against three consumers, then shut the consumers down."""
    queue = AsyncQueue(max_size=10)

    # Start consumers
    consumers = [
        asyncio.create_task(consumer(queue, f"Consumer-{i}"))
        for i in range(3)
    ]

    # Start producer
    await producer(queue, list(range(20)))

    # Wait for all items to be processed
    await queue.join()

    # Stop consumers: one None sentinel per consumer
    for _ in range(3):
        await queue.put(None)

    await asyncio.gather(*consumers)


asyncio.run(main())

6. Go-Style Channels (Simulation)

class Channel { private buffer: T[] = []; private senders: Array<{ value: T; resolve: () => void }> = []; private receivers: Array<(value: T) => void> = []; private closed = false;

/** Stores `bufferSize` (default 0), the number of values `send` may buffer before it has to wait. */
constructor(private bufferSize: number = 0) {}

/**
 * Sends `value` into the channel.
 *
 * Delivery order: a waiting receiver gets the value directly; otherwise it is
 * buffered if there is room; otherwise the sender waits until a receiver takes it.
 *
 * @throws Error('Channel is closed') when called after `close()`.
 */
async send(value: T): Promise<void> {
  if (this.closed) {
    throw new Error('Channel is closed');
  }

  if (this.receivers.length > 0) {
    const receiver = this.receivers.shift()!;
    receiver(value);
    return;
  }

  if (this.buffer.length < this.bufferSize) {
    this.buffer.push(value);
    return;
  }

  return new Promise<void>(resolve => {
    this.senders.push({ value, resolve });
  });
}

/**
 * Receives the next value.
 *
 * Buffered values are returned first (refilling the buffer from a waiting
 * sender), then values from waiting senders. With nothing pending, a closed
 * channel yields `undefined`; an open one makes the caller wait for a `send`.
 */
async receive(): Promise<T | undefined> {
  if (this.buffer.length > 0) {
    const value = this.buffer.shift()!;

    // A slot just opened: move the oldest blocked sender's value into the buffer.
    if (this.senders.length > 0) {
      const sender = this.senders.shift()!;
      this.buffer.push(sender.value);
      sender.resolve();
    }

    return value;
  }

  if (this.senders.length > 0) {
    const sender = this.senders.shift()!;
    sender.resolve();
    return sender.value;
  }

  if (this.closed) {
    return undefined;
  }

  return new Promise<T | undefined>(resolve => {
    this.receivers.push(resolve);
  });
}

/** Marks the channel closed and resolves every waiting receiver with `undefined`. */
close(): void {
  this.closed = true;
  for (const wake of this.receivers) {
    wake(undefined as any);
  }
  this.receivers = [];
}
}

// Usage
/** Runs a producer and a consumer concurrently over one channel created with buffer size 5. */
async function example() {
  const channel = new Channel(5);

  // Producer
  async function producer() {
    for (let i = 0; i < 10; i++) {
      await channel.send(i);
      console.log(`Sent: ${i}`);
    }
    channel.close();
  }

  // Consumer
  async function consumer() {
    while (true) {
      const value = await channel.receive();
      // receive() yields undefined once the channel is closed and drained
      if (value === undefined) break;
      console.log(`Received: ${value}`);
    }
  }

  await Promise.all([
    producer(),
    consumer()
  ]);
}

Best Practices

✅ DO

- Use proper synchronization primitives
- Limit concurrency to avoid resource exhaustion
- Handle errors in concurrent operations
- Use immutable data when possible
- Test concurrent code thoroughly
- Profile concurrent performance
- Document thread-safety guarantees

❌ DON'T

- Share mutable state without synchronization
- Use sleep/polling for coordination
- Create unlimited threads/workers
- Ignore race conditions
- Block event loops in async code
- Forget to clean up resources

Resources

- Node.js Worker Threads
- Python Threading
- Python asyncio

返回排行榜