Concurrency Patterns Overview
Implement safe concurrent code using proper synchronization primitives and patterns for parallel execution.
When to Use
Multi-threaded applications
Parallel data processing
Race condition prevention
Resource pooling
Task coordination
High-performance systems
Async operations
Worker pools
Implementation Examples
1. Promise Pool (TypeScript)
class PromisePool {
private queue: Array<() => Promise
constructor(private concurrency: number) {}
async add
this.active++;
try {
return await fn();
} finally {
this.active--;
}
}
private async waitForSlot(): Promise
async map
// Usage const pool = new PromisePool(5);
const urls = Array.from({ length: 100 }, (_, i) =>
https://api.example.com/item/${i}
);
const results = await pool.map(urls, async (url) => { const response = await fetch(url); return response.json(); });
- Mutex and Semaphore (TypeScript) class Mutex { private locked = false; private queue: Array<() => void> = [];
async acquire(): Promise
return new Promise(resolve => {
this.queue.push(resolve);
});
}
release(): void { if (this.queue.length > 0) { const resolve = this.queue.shift()!; resolve(); } else { this.locked = false; } }
async runExclusive
class Semaphore { private available: number; private queue: Array<() => void> = [];
constructor(private max: number) { this.available = max; }
async acquire(): Promise
return new Promise(resolve => {
this.queue.push(resolve);
});
}
release(): void { if (this.queue.length > 0) { const resolve = this.queue.shift()!; resolve(); } else { this.available++; } }
async runExclusive
// Usage const mutex = new Mutex(); let counter = 0;
async function incrementCounter() { await mutex.runExclusive(async () => { const current = counter; await new Promise(resolve => setTimeout(resolve, 10)); counter = current + 1; }); }
// Database connection pool with semaphore const dbSemaphore = new Semaphore(10); // Max 10 concurrent connections
async function queryDatabase(query: string) { return dbSemaphore.runExclusive(async () => { // Execute query return executeQuery(query); }); }
async function executeQuery(query: string) { // Query logic }
- Worker Pool (Node.js) import { Worker } from 'worker_threads';
interface Task
class WorkerPool {
private workers: Worker[] = [];
private availableWorkers: Worker[] = [];
private taskQueue: Task
constructor( private workerScript: string, private poolSize: number ) { this.initializeWorkers(); }
private initializeWorkers(): void { for (let i = 0; i < this.poolSize; i++) { const worker = new Worker(this.workerScript);
worker.on('message', (result) => {
this.handleWorkerMessage(worker, result);
});
worker.on('error', (error) => {
console.error('Worker error:', error);
});
this.workers.push(worker);
this.availableWorkers.push(worker);
}
}
async execute
this.taskQueue.push(task);
this.processQueue();
});
}
private processQueue(): void { while (this.taskQueue.length > 0 && this.availableWorkers.length > 0) { const task = this.taskQueue.shift()!; const worker = this.availableWorkers.shift()!;
worker.postMessage({
taskId: task.id,
data: task.data
});
(worker as any).currentTask = task;
}
}
private handleWorkerMessage(worker: Worker, result: any): void {
const task = (worker as any).currentTask as Task
if (!task) return;
if (result.error) {
task.reject(new Error(result.error));
} else {
task.resolve(result.data);
}
delete (worker as any).currentTask;
this.availableWorkers.push(worker);
this.processQueue();
}
async terminate(): Promise
// worker.js // const { parentPort } = require('worker_threads'); // // parentPort.on('message', async ({ taskId, data }) => { // try { // const result = await processData(data); // parentPort.postMessage({ taskId, data: result }); // } catch (error) { // parentPort.postMessage({ taskId, error: error.message }); // } // });
- Python Threading Patterns import threading from queue import Queue from typing import Callable, List, TypeVar, Generic import time
T = TypeVar('T') R = TypeVar('R')
class ThreadPool(Generic[T, R]): def init(self, num_threads: int): self.num_threads = num_threads self.tasks: Queue = Queue() self.results: List[R] = [] self.lock = threading.Lock() self.workers: List[threading.Thread] = []
def map(self, func: Callable[[T], R], items: List[T]) -> List[R]:
"""Map function over items using thread pool."""
# Add tasks to queue
for item in items:
self.tasks.put(item)
# Start workers
for _ in range(self.num_threads):
worker = threading.Thread(
target=self._worker,
args=(func,)
)
worker.start()
self.workers.append(worker)
# Wait for completion
self.tasks.join()
# Stop workers
for _ in range(self.num_threads):
self.tasks.put(None)
for worker in self.workers:
worker.join()
return self.results
def _worker(self, func: Callable[[T], R]):
"""Worker thread."""
while True:
item = self.tasks.get()
if item is None:
self.tasks.task_done()
break
try:
result = func(item)
with self.lock:
self.results.append(result)
finally:
self.tasks.task_done()
class Mutex: def init(self): self._lock = threading.Lock()
def __enter__(self):
self._lock.acquire()
return self
def __exit__(self, *args):
self._lock.release()
class Semaphore: def init(self, max_count: int): self._semaphore = threading.Semaphore(max_count)
def __enter__(self):
self._semaphore.acquire()
return self
def __exit__(self, *args):
self._semaphore.release()
Usage
def process_item(item: int) -> int: time.sleep(0.1) return item * 2
pool = ThreadPool(num_threads=5) items = list(range(100)) results = pool.map(process_item, items) print(f"Processed {len(results)} items")
Mutex example
counter = 0 mutex = Mutex()
def increment(): global counter with mutex: current = counter time.sleep(0.001) counter = current + 1
threads = [threading.Thread(target=increment) for _ in range(100)] for t in threads: t.start() for t in threads: t.join()
print(f"Counter: {counter}") # Should be 100
Semaphore example
db_connections = Semaphore(max_count=10)
def query_database(query: str): with db_connections: # Execute query with limited connections time.sleep(0.1) print(f"Executing: {query}")
- Async Patterns (Python asyncio) import asyncio from typing import Callable, List, TypeVar, Awaitable
T = TypeVar('T') R = TypeVar('R')
class AsyncPool: def init(self, concurrency: int): self.semaphore = asyncio.Semaphore(concurrency)
async def map(
self,
func: Callable[[T], Awaitable[R]],
items: List[T]
) -> List[R]:
"""Map async function over items with concurrency limit."""
async def bounded_func(item: T) -> R:
async with self.semaphore:
return await func(item)
return await asyncio.gather(*[
bounded_func(item) for item in items
])
class AsyncQueue: def init(self, max_size: int = 0): self.queue = asyncio.Queue(maxsize=max_size)
async def put(self, item):
await self.queue.put(item)
async def get(self):
return await self.queue.get()
def task_done(self):
self.queue.task_done()
async def join(self):
await self.queue.join()
Producer-Consumer pattern
async def producer(queue: AsyncQueue, items: List[int]): """Produce items.""" for item in items: await queue.put(item) print(f"Produced: {item}") await asyncio.sleep(0.1)
async def consumer(queue: AsyncQueue, name: str): """Consume items.""" while True: item = await queue.get()
if item is None:
queue.task_done()
break
print(f"{name} consuming: {item}")
await asyncio.sleep(0.2)
queue.task_done()
async def main(): queue = AsyncQueue(max_size=10)
# Start consumers
consumers = [
asyncio.create_task(consumer(queue, f"Consumer-{i}"))
for i in range(3)
]
# Start producer
await producer(queue, list(range(20)))
# Wait for all items to be processed
await queue.join()
# Stop consumers
for _ in range(3):
await queue.put(None)
await asyncio.gather(*consumers)
asyncio.run(main())
- Go-Style Channels (Simulation)
class Channel
{ private buffer: T[] = []; private senders: Array<{ value: T; resolve: () => void }> = []; private receivers: Array<(value: T) => void> = []; private closed = false;
constructor(private bufferSize: number = 0) {}
async send(value: T): Promise
if (this.receivers.length > 0) {
const receiver = this.receivers.shift()!;
receiver(value);
return;
}
if (this.buffer.length < this.bufferSize) {
this.buffer.push(value);
return;
}
return new Promise(resolve => {
this.senders.push({ value, resolve });
});
}
async receive(): Promise
if (this.senders.length > 0) {
const sender = this.senders.shift()!;
this.buffer.push(sender.value);
sender.resolve();
}
return value;
}
if (this.senders.length > 0) {
const sender = this.senders.shift()!;
sender.resolve();
return sender.value;
}
if (this.closed) {
return undefined;
}
return new Promise(resolve => {
this.receivers.push(resolve);
});
}
close(): void { this.closed = true; this.receivers.forEach(receiver => receiver(undefined as any)); this.receivers = []; } }
// Usage
async function example() {
const channel = new Channel
// Producer
async function producer() {
for (let i = 0; i < 10; i++) {
await channel.send(i);
console.log(Sent: ${i});
}
channel.close();
}
// Consumer
async function consumer() {
while (true) {
const value = await channel.receive();
if (value === undefined) break;
console.log(Received: ${value});
}
}
await Promise.all([ producer(), consumer() ]); }
Best Practices ✅ DO Use proper synchronization primitives Limit concurrency to avoid resource exhaustion Handle errors in concurrent operations Use immutable data when possible Test concurrent code thoroughly Profile concurrent performance Document thread-safety guarantees ❌ DON'T Share mutable state without synchronization Use sleep/polling for coordination Create unlimited threads/workers Ignore race conditions Block event loops in async code Forget to clean up resources Resources Node.js Worker Threads Python Threading Python asyncio