Correlation & Distributed Tracing Overview
Implement correlation IDs and distributed tracing to track requests across multiple services and understand system behavior.
When to Use

- Microservices architectures
- Debugging distributed systems
- Performance monitoring
- Request flow visualization
- Error tracking across services
- Dependency analysis
- Latency optimization

Implementation Examples

1. Correlation ID Middleware (Express)

import express from 'express';
import { v4 as uuidv4 } from 'uuid';
// Async local storage for context
import { AsyncLocalStorage } from 'async_hooks';

/**
 * Async-local store holding the trace fields of the request being handled.
 *
 * The store value is a `Map` keyed by field name ('traceId', 'spanId',
 * 'parentSpanId', 'serviceName'). It is populated with
 * `traceContext.run(map, callback)` and read with `traceContext.getStore()`,
 * which returns `undefined` outside of a `run` callback.
 */
const traceContext = new AsyncLocalStorage<Map<string, any>>();
/**
 * Trace metadata describing where the current unit of work sits in a trace.
 */
interface TraceContext {
  /** Identifier shared by every span that belongs to the same trace. */
  traceId: string;
  /** Identifier of the current span. */
  spanId: string;
  /** Identifier of the span that caused this one; absent when there is none. */
  parentSpanId?: string;
  /** Name of the service that owns the current span. */
  serviceName: string;
}
/**
 * Builds an Express middleware that attaches trace context to each request.
 *
 * For every request the middleware:
 *  - reuses the incoming `x-trace-id` header or generates a new trace ID,
 *  - treats the incoming `x-span-id` header as the parent span ID,
 *  - generates a fresh span ID for this request,
 *  - echoes the trace and span IDs back on the response headers,
 *  - runs the rest of the request pipeline inside `traceContext` so the
 *    values can be read from the async-local store.
 *
 * @param serviceName Name recorded in the context under 'serviceName'.
 * @returns An Express request handler.
 */
function correlationMiddleware(serviceName: string) {
  return (
    req: express.Request,
    res: express.Response,
    next: express.NextFunction
  ): void => {
    // Extract or generate trace ID
    const traceId = (req.headers['x-trace-id'] as string) || uuidv4();
    // The caller's span becomes this request's parent (undefined if not sent).
    const parentSpanId = req.headers['x-span-id'] as string;
    const spanId = uuidv4();

    // Set context
    const context = new Map<string, any>();
    context.set('traceId', traceId);
    context.set('spanId', spanId);
    context.set('parentSpanId', parentSpanId);
    context.set('serviceName', serviceName);

    // Inject trace headers
    res.setHeader('X-Trace-Id', traceId);
    res.setHeader('X-Span-Id', spanId);

    // Run in context
    traceContext.run(context, () => {
      next();
    });
  };
}
// Helper to get current context
/**
 * Reads the trace fields for the currently executing async context.
 *
 * @returns The fields stored under 'traceId', 'spanId', 'parentSpanId' and
 *   'serviceName', or `null` when `traceContext` has no active store.
 */
function getTraceContext(): TraceContext | null {
  const context = traceContext.getStore();
  if (!context) return null;

  return {
    traceId: context.get('traceId'),
    spanId: context.get('spanId'),
    parentSpanId: context.get('parentSpanId'),
    serviceName: context.get('serviceName'),
  };
}
// Enhanced logger with trace context
/**
 * Structured logger that writes one JSON object per entry to `console.log`
 * and merges in whatever `getTraceContext()` returns at call time.
 */
class TracedLogger {
  /**
   * Writes a single log entry.
   *
   * @param level Severity label stored under `level`.
   * @param message Text stored under `message`.
   * @param data Optional extra fields spread into the entry.
   */
  log(level: string, message: string, data?: any): void {
    const context = getTraceContext();

    // Spread order matters: `data` may override level/message, trace fields
    // override `data`, and the timestamp is always written last.
    const logEntry = {
      level,
      message,
      ...data,
      ...context,
      timestamp: new Date().toISOString(),
    };

    console.log(JSON.stringify(logEntry));
  }

  /** Logs at level 'info'. */
  info(message: string, data?: any): void {
    this.log('info', message, data);
  }

  /** Logs at level 'error'. */
  error(message: string, data?: any): void {
    this.log('error', message, data);
  }

  /** Logs at level 'warn'. */
  warn(message: string, data?: any): void {
    this.log('warn', message, data);
  }
}

const logger = new TracedLogger();
// HTTP client with trace propagation
/**
 * Wrapper around `fetch` that forwards the current trace context as request
 * headers and logs the outcome together with the elapsed time.
 *
 * @param url Target URL.
 * @param options Standard `fetch` options; any headers supplied are kept and
 *   the trace headers are added on top of them.
 * @returns The `Response` produced by `fetch`.
 * @throws Rethrows whatever `fetch` rejects with, after logging it.
 */
async function tracedFetch(
  url: string,
  options: RequestInit = {}
): Promise<Response> {
  const context = getTraceContext();
  const headers = new Headers(options.headers);

  // Outside of a traced request there is nothing to propagate.
  if (context) {
    headers.set('X-Trace-Id', context.traceId);
    headers.set('X-Span-Id', context.spanId);
    headers.set('X-Parent-Span-Id', context.spanId);
  }

  const method = options.method || 'GET';
  const startTime = Date.now();

  try {
    const response = await fetch(url, { ...options, headers });
    const duration = Date.now() - startTime;

    logger.info('HTTP request completed', {
      method,
      url,
      statusCode: response.status,
      duration,
    });

    return response;
  } catch (error) {
    const duration = Date.now() - startTime;

    logger.error('HTTP request failed', {
      method,
      url,
      // A rejection is not guaranteed to be an Error instance.
      error: error instanceof Error ? error.message : String(error),
      duration,
    });

    throw error;
  }
}
// Usage
const app = express();

app.use(correlationMiddleware('api-service'));

// Fetches a user from the user service, propagating the trace headers.
app.get('/api/users/:id', async (req, res, next) => {
  try {
    logger.info('Fetching user', { userId: req.params.id });

    // Call another service with trace propagation.
    // The id comes from the request path, so encode it before embedding it in
    // the internal URL to keep it confined to a single path segment.
    const response = await tracedFetch(
      `http://user-service/users/${encodeURIComponent(req.params.id)}`
    );

    const data = await response.json();

    logger.info('User fetched successfully');

    res.json(data);
  } catch (error) {
    // Hand failures to Express's error handling instead of leaving the
    // rejected promise unhandled and the request hanging.
    next(error);
  }
});
2. OpenTelemetry Integration

import { NodeSDK } from '@opentelemetry/sdk-node';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { JaegerExporter } from '@opentelemetry/exporter-jaeger';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
// Configure OpenTelemetry
// - resource: identifies this process as 'my-service' version '1.0.0'
// - traceExporter: sends spans to the Jaeger endpoint below
// - instrumentations: auto-instrumentation with http, express and pg enabled
const sdk = new NodeSDK({
  resource: new Resource({
    [SemanticResourceAttributes.SERVICE_NAME]: 'my-service',
    [SemanticResourceAttributes.SERVICE_VERSION]: '1.0.0',
  }),
  traceExporter: new JaegerExporter({
    endpoint: 'http://localhost:14268/api/traces',
  }),
  instrumentations: [
    getNodeAutoInstrumentations({
      '@opentelemetry/instrumentation-http': {
        enabled: true,
      },
      '@opentelemetry/instrumentation-express': {
        enabled: true,
      },
      '@opentelemetry/instrumentation-pg': {
        enabled: true,
      },
    }),
  ],
});

sdk.start();
// Custom spans
import { trace, SpanStatusCode } from '@opentelemetry/api';

// Tracer used for the manually created spans in this example.
const tracer = trace.getTracer('my-service');
/**
 * Runs `work` inside a new active span named `name`.
 *
 * The span is marked OK when `work` resolves. When it rejects, the span is
 * marked ERROR, the exception is recorded, and the error is rethrown. The span
 * is ended in both cases.
 */
async function runInSpan(
  name: string,
  work: () => Promise<void>,
  attributes: Record<string, string> = {}
): Promise<void> {
  // startActiveSpan makes the new span the active one for the callback, so
  // spans started inside `work` are parented to it through the active context.
  return tracer.startActiveSpan(name, { attributes }, async (span) => {
    try {
      await work();
      span.setStatus({ code: SpanStatusCode.OK });
    } catch (error) {
      span.setStatus({
        code: SpanStatusCode.ERROR,
        message: (error as Error).message,
      });
      span.recordException(error as Error);
      throw error;
    } finally {
      span.end();
    }
  });
}

/**
 * Validates and charges an order, tracing each step.
 *
 * Produces a 'process_order' span carrying the `order.id` attribute, with
 * 'validate_order' and 'process_payment' as child spans.
 *
 * @param orderId Identifier of the order to process.
 * @throws Rethrows any error raised by validation or payment.
 */
async function processOrder(orderId: string): Promise<void> {
  await runInSpan(
    'process_order',
    async () => {
      // Validate order
      await runInSpan('validate_order', () => validateOrder(orderId));

      // Process payment
      await runInSpan('process_payment', () => processPayment(orderId));
    },
    { 'order.id': orderId }
  );
}
/**
 * Placeholder for order validation; currently performs no work.
 *
 * @param orderId Identifier of the order to validate.
 */
async function validateOrder(orderId: string): Promise<void> {
  // Validation logic
}
/**
 * Placeholder for payment processing; currently performs no work.
 *
 * @param orderId Identifier of the order to charge.
 */
async function processPayment(orderId: string): Promise<void> {
  // Payment logic
}
3. Python Distributed Tracing

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.resources import Resource
from flask import Flask, request
import requests
import uuid
# Setup tracing
resource = Resource.create({"service.name": "python-service"})
trace.set_tracer_provider(TracerProvider(resource=resource))

jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)

# Spans are handed to the exporter through a batching processor.
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

# Auto-instrument Flask and requests
app = Flask(__name__)
FlaskInstrumentor().instrument_app(app)
RequestsInstrumentor().instrument()

tracer = trace.get_tracer(__name__)
@app.route('/api/orders/<order_id>')
def get_order(order_id):
    """Return an order together with its user, tracing each step.

    Opens a "fetch_order_details" span tagged with ``order.id`` and two nested
    spans: "database_query" around the order lookup and "fetch_user_details"
    around the call to the user service.
    """
    with tracer.start_as_current_span("fetch_order_details") as span:
        span.set_attribute("order.id", order_id)

        # Fetch from database
        with tracer.start_as_current_span("database_query"):
            order = fetch_order_from_db(order_id)

        # Call another service (automatically traced)
        with tracer.start_as_current_span("fetch_user_details"):
            # requests has no default timeout; without one a stalled user
            # service would block this handler indefinitely.
            user = requests.get(
                f"http://user-service/users/{order['user_id']}",
                timeout=5,
            ).json()

        return {
            "order": order,
            "user": user
        }
def fetch_order_from_db(order_id):
    """Return a stub order record for ``order_id``.

    Stands in for a real database lookup: the result always carries the given
    id and the fixed user id "user123".
    """
    # Database logic
    return {"id": order_id, "user_id": "user123"}
# Start the Flask server only when this file is executed as a script.
if __name__ == '__main__':
    app.run(port=5000)
4. Manual Trace Propagation
/**
 * One timed operation within a trace.
 */
interface Span {
  /** Identifier shared by all spans of the same trace. */
  traceId: string;
  /** Identifier of this span. */
  spanId: string;
  /** Identifier of the parent span, if any. */
  parentSpanId?: string;
  /** Operation name. */
  name: string;
  /** Service that produced the span. */
  serviceName: string;
  /** Start time, as returned by `Date.now()`. */
  startTime: number;
  /** End time; unset until the span is ended. */
  endTime?: number;
  /** `endTime - startTime`; unset until the span is ended. */
  duration?: number;
  /** Key/value annotations attached to the span. */
  tags: Record<string, any>;
  /** Timestamped events recorded while the span was open. */
  logs: Array<{ timestamp: number; message: string; fields?: any }>;
  /** Outcome of the operation. */
  status: 'ok' | 'error';
}
/**
 * Minimal in-memory tracer that creates spans, annotates them, and hands
 * finished spans to `reportSpan`.
 */
class DistributedTracer {
  // Every span started is kept here; nothing in this class removes entries.
  private readonly spans: Span[] = [];

  /**
   * Creates and registers a new span.
   *
   * @param name Operation name.
   * @param parentSpanId Explicit parent. When omitted, the span of the
   *   current trace context (if any) is used, so the new span nests under it.
   * @returns The newly created span, with status 'ok' and no end time.
   */
  startSpan(name: string, parentSpanId?: string): Span {
    const context = getTraceContext();

    const span: Span = {
      traceId: context?.traceId || uuidv4(),
      spanId: uuidv4(),
      // Default to the context's *current* span. Falling back to the
      // context's parent would attach the new span one level too high.
      parentSpanId: parentSpanId || context?.spanId,
      name,
      serviceName: context?.serviceName || 'unknown',
      startTime: Date.now(),
      tags: {},
      logs: [],
      status: 'ok',
    };

    this.spans.push(span);
    return span;
  }

  /**
   * Stamps the end time and duration on `span` and reports it.
   * Reporting is not awaited, so callers are never blocked on it.
   */
  endSpan(span: Span): void {
    span.endTime = Date.now();
    span.duration = span.endTime - span.startTime;

    // Send to tracing backend. A reporting failure is logged rather than
    // left as an unhandled rejection.
    void this.reportSpan(span).catch((error: unknown) => {
      console.error('Failed to report span', error);
    });
  }

  /** Sets `span.tags[key]` to `value`. */
  setTag(span: Span, key: string, value: any): void {
    span.tags[key] = value;
  }

  /** Appends a timestamped event to `span.logs`. */
  logEvent(span: Span, message: string, fields?: any): void {
    span.logs.push({
      timestamp: Date.now(),
      message,
      fields,
    });
  }

  /** Marks `span` as failed and records the error's message and stack as tags. */
  setError(span: Span, error: Error): void {
    span.status = 'error';
    span.tags['error'] = true;
    span.tags['error.message'] = error.message;
    span.tags['error.stack'] = error.stack;
  }

  /** Delivery hook for finished spans; currently a no-op placeholder. */
  private async reportSpan(span: Span): Promise<void> {
    // In production:
    // await fetch('http://tracing-collector/api/spans', {
    //   method: 'POST',
    //   headers: { 'Content-Type': 'application/json' },
    //   body: JSON.stringify(span)
    // });
  }

  /** Returns the internal span list (not a copy). */
  getAllSpans(): Span[] {
    return this.spans;
  }

  /** Returns every recorded span whose `traceId` equals the argument. */
  getTrace(traceId: string): Span[] {
    return this.spans.filter((s) => s.traceId === traceId);
  }
}

const tracer = new DistributedTracer();
// Usage
/**
 * Example request handler traced with the manual tracer.
 *
 * Opens a 'handle_request' span with two child spans ('database_query' and
 * 'external_api_call'). Every span is ended whether its step succeeds or
 * throws; a failure is recorded on the failing child and on the request span
 * before being rethrown.
 */
async function handleRequest(): Promise<void> {
  const span = tracer.startSpan('handle_request');

  tracer.setTag(span, 'http.method', 'GET');
  tracer.setTag(span, 'http.url', '/api/users/123');

  try {
    // Database operation
    const dbSpan = tracer.startSpan('database_query', span.spanId);
    tracer.setTag(dbSpan, 'db.type', 'postgresql');
    tracer.setTag(dbSpan, 'db.statement', 'SELECT * FROM users WHERE id = $1');
    try {
      await queryDatabase();
    } catch (error) {
      tracer.setError(dbSpan, error as Error);
      throw error;
    } finally {
      // End the child even on failure so it is still reported.
      tracer.endSpan(dbSpan);
    }

    // External API call
    const apiSpan = tracer.startSpan('external_api_call', span.spanId);
    tracer.setTag(apiSpan, 'http.url', 'https://api.example.com/data');
    try {
      await callExternalAPI();
    } catch (error) {
      tracer.setError(apiSpan, error as Error);
      throw error;
    } finally {
      tracer.endSpan(apiSpan);
    }

    tracer.logEvent(span, 'Request completed successfully');
  } catch (error) {
    tracer.setError(span, error as Error);
    tracer.logEvent(span, 'Request failed', {
      error: (error as Error).message,
    });
    throw error;
  } finally {
    tracer.endSpan(span);
  }
}
/** Simulates a database round-trip by pausing for 100 ms. */
async function queryDatabase() {
  const simulatedLatencyMs = 100;
  await new Promise((done) => {
    setTimeout(done, simulatedLatencyMs);
  });
}
/** Simulates a call to an external API by pausing for 200 ms. */
async function callExternalAPI() {
  const simulatedLatencyMs = 200;
  await new Promise((done) => {
    setTimeout(done, simulatedLatencyMs);
  });
}
Best Practices

✅ DO
- Generate trace IDs at entry points
- Propagate trace context across services
- Include correlation IDs in logs
- Use structured logging
- Set appropriate span attributes
- Sample traces in high-traffic systems
- Monitor trace collection overhead
- Implement context propagation

❌ DON'T
- Skip trace propagation
- Log without correlation context
- Create too many spans
- Store sensitive data in spans
- Block on trace reporting
- Forget error tracking

Trace Headers
- X-Trace-Id: trace identifier
- X-Span-Id: current span
- X-Parent-Span-Id: parent span
- X-Sampled: sampling decision

Resources
- OpenTelemetry
- Jaeger Tracing
- Zipkin