# Logging and Observability

## Overview

Observability enables you to understand system behavior through logs, metrics, and traces. This skill provides patterns for:
- **Structured Logging**: JSON logs with correlation IDs and contextual data
- **Distributed Tracing**: Span-based request tracking across services (OpenTelemetry, Jaeger, Zipkin)
- **Metrics Collection**: Counters, gauges, histograms for system health (Prometheus patterns)
- **Log Aggregation**: Centralized log management (ELK, Loki, Datadog)
- **Alerting**: Symptom-based alerts with runbooks

## Instructions

### 1. Structured Logging (JSON Logs)

#### Python Implementation

```python
import json
import logging
import sys
from contextvars import ContextVar
from datetime import datetime, timezone

# Context variables for request tracking
correlation_id: ContextVar[str] = ContextVar('correlation_id', default='')
span_id: ContextVar[str] = ContextVar('span_id', default='')


class StructuredFormatter(logging.Formatter):
    """JSON formatter for structured logging."""

    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            # RFC 3339 UTC timestamp
            "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": correlation_id.get(),
            "span_id": span_id.get(),
        }
        # Add exception info if present
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        # Add extra fields
        if hasattr(record, 'structured_data'):
            log_data.update(record.structured_data)
        return json.dumps(log_data)


def setup_logging():
    """Configure structured logging."""
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(StructuredFormatter())
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    root_logger.addHandler(handler)


# Usage
logger = logging.getLogger(__name__)
logger.info("User logged in", extra={
    "structured_data": {
        "user_id": "123",
        "ip_address": "192.168.1.1",
        "action": "login"
    }
})
```
#### TypeScript Implementation

```typescript
interface LogContext {
  correlationId?: string;
  spanId?: string;
}

interface LogEntry {
  timestamp: string;
  level: string;
  message: string;
  context: LogContext;
  data?: Record<string, unknown>;
}

class StructuredLogger {
  private context: LogContext = {};

  withContext(context: LogContext): StructuredLogger {
    const child = new StructuredLogger();
    child.context = { ...this.context, ...context };
    return child;
  }

  private log(level: string, message: string, data?: Record<string, unknown>): void {
    const entry: LogEntry = {
      timestamp: new Date().toISOString(),
      level,
      message,
      context: this.context,
      ...(data !== undefined && { data }),
    };
    console.log(JSON.stringify(entry));
  }

  debug(message: string, data?: Record<string, unknown>): void { this.log('debug', message, data); }
  info(message: string, data?: Record<string, unknown>): void { this.log('info', message, data); }
  warn(message: string, data?: Record<string, unknown>): void { this.log('warn', message, data); }
  error(message: string, data?: Record<string, unknown>): void { this.log('error', message, data); }
}
```
### 2. Log Levels and When to Use Each

| Level | Usage | Examples |
|-------|-------|----------|
| TRACE | Fine-grained debugging | Loop iterations, variable values |
| DEBUG | Diagnostic information | Function entry/exit, intermediate states |
| INFO | Normal operations | Request started, job completed, user action |
| WARN | Potential issues | Deprecated API usage, retry attempted, slow query |
| ERROR | Failures requiring attention | Exception caught, operation failed |
| FATAL | Critical failures | System cannot continue, data corruption |

```python
# Log level usage examples
logger.debug("Processing item", extra={"structured_data": {"item_id": item.id}})
logger.info("Order processed successfully", extra={"structured_data": {"order_id": order.id, "total": order.total}})
logger.warning("Rate limit approaching", extra={"structured_data": {"current": 95, "limit": 100}})
logger.error("Payment failed", extra={"structured_data": {"order_id": order.id, "error": str(e)}})
```
### 3. Distributed Tracing

#### Correlation IDs and Spans

```python
import logging
import time
import uuid
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional

logger = logging.getLogger(__name__)


@dataclass
class Span:
    name: str
    trace_id: str
    span_id: str = field(default_factory=lambda: uuid.uuid4().hex[:16])
    parent_span_id: Optional[str] = None
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    attributes: dict = field(default_factory=dict)

    def end(self):
        self.end_time = time.time()

    @property
    def duration_ms(self) -> float:
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return 0.0


current_span: ContextVar[Optional[Span]] = ContextVar('current_span', default=None)


class Tracer:
    def __init__(self, service_name: str):
        self.service_name = service_name

    def start_span(self, name: str, parent: Optional[Span] = None) -> Span:
        parent = parent or current_span.get()
        trace_id = parent.trace_id if parent else uuid.uuid4().hex
        parent_span_id = parent.span_id if parent else None
        span = Span(
            name=name,
            trace_id=trace_id,
            parent_span_id=parent_span_id,
            attributes={"service": self.service_name}
        )
        current_span.set(span)
        return span

    def end_span(self, span: Span):
        span.end()
        self._export(span)

    def _export(self, span: Span):
        """Export span to tracing backend (here: the structured logger)."""
        logger.info(f"Span completed: {span.name}", extra={
            "structured_data": {
                "trace_id": span.trace_id,
                "span_id": span.span_id,
                "parent_span_id": span.parent_span_id,
                "duration_ms": span.duration_ms,
                "attributes": span.attributes
            }
        })


# Context manager for spans; restores the parent span on exit so
# sibling spans are attributed to the correct parent
@contextmanager
def trace_span(tracer: Tracer, name: str):
    parent = current_span.get()
    span = tracer.start_span(name)
    try:
        yield span
    except Exception as e:
        span.attributes["error"] = True
        span.attributes["error.message"] = str(e)
        raise
    finally:
        tracer.end_span(span)
        current_span.set(parent)


# Usage
tracer = Tracer("order-service")

async def process_order(order_id: str):
    with trace_span(tracer, "process_order") as span:
        span.attributes["order_id"] = order_id

        with trace_span(tracer, "validate_order"):
            await validate(order_id)

        with trace_span(tracer, "charge_payment"):
            await charge(order_id)
```
### 4. Metrics Collection

```python
import threading
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional


class MetricType(Enum):
    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"


@dataclass
class Counter:
    name: str
    labels: Dict[str, str]
    value: float = 0

    def inc(self, amount: float = 1):
        self.value += amount


@dataclass
class Gauge:
    name: str
    labels: Dict[str, str]
    value: float = 0

    def set(self, value: float):
        self.value = value

    def inc(self, amount: float = 1):
        self.value += amount

    def dec(self, amount: float = 1):
        self.value -= amount


@dataclass
class Histogram:
    name: str
    labels: Dict[str, str]
    buckets: List[float]
    values: List[float] = field(default_factory=list)

    def __post_init__(self):
        # Cumulative bucket counts, Prometheus-style
        self._bucket_counts = {b: 0 for b in self.buckets}
        self._bucket_counts[float('inf')] = 0
        self._sum = 0
        self._count = 0

    def observe(self, value: float):
        self.values.append(value)
        self._sum += value
        self._count += 1
        for bucket in self._bucket_counts:
            if value <= bucket:
                self._bucket_counts[bucket] += 1


class MetricsRegistry:
    def __init__(self):
        self._metrics: Dict[str, object] = {}
        self._lock = threading.Lock()

    @staticmethod
    def _key(name: str, labels: Optional[Dict[str, str]]) -> str:
        # Sort label pairs so the key is stable regardless of dict order
        return f"{name}:{sorted((labels or {}).items())}"

    def counter(self, name: str, labels: Dict[str, str] = None) -> Counter:
        key = self._key(name, labels)
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Counter(name, labels or {})
            return self._metrics[key]

    def gauge(self, name: str, labels: Dict[str, str] = None) -> Gauge:
        key = self._key(name, labels)
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Gauge(name, labels or {})
            return self._metrics[key]

    def histogram(self, name: str, buckets: List[float], labels: Dict[str, str] = None) -> Histogram:
        key = self._key(name, labels)
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Histogram(name, labels or {}, buckets)
            return self._metrics[key]


# Usage
metrics = MetricsRegistry()

# Counter for requests
request_counter = metrics.counter("http_requests_total", {"method": "GET", "path": "/api/orders"})
request_counter.inc()

# Gauge for active connections
active_connections = metrics.gauge("active_connections")
active_connections.inc()
# ... handle connection ...
active_connections.dec()

# Histogram for request duration
request_duration = metrics.histogram(
    "http_request_duration_seconds",
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)
start = time.time()
# ... handle request ...
request_duration.observe(time.time() - start)
```
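In practice these metrics need to be exposed for scraping. Below is a minimal sketch of rendering the registry above in the Prometheus text exposition format; the `render_prometheus` helper is an illustrative assumption that reaches into the registry's internals, and real services typically use the official prometheus_client library instead.

```python
from typing import Dict

def _fmt_labels(labels: Dict[str, str]) -> str:
    # Render {k="v",...} label sets, empty string when unlabeled
    if not labels:
        return ""
    inner = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
    return "{" + inner + "}"

def render_prometheus(registry: MetricsRegistry) -> str:
    lines = []
    for metric in registry._metrics.values():
        if isinstance(metric, (Counter, Gauge)):
            lines.append(f"{metric.name}{_fmt_labels(metric.labels)} {metric.value}")
        elif isinstance(metric, Histogram):
            # Cumulative buckets, then the _sum and _count series
            for bucket in sorted(metric._bucket_counts):
                le = "+Inf" if bucket == float("inf") else str(bucket)
                series_labels = dict(metric.labels, le=le)
                lines.append(f"{metric.name}_bucket{_fmt_labels(series_labels)} {metric._bucket_counts[bucket]}")
            lines.append(f"{metric.name}_sum{_fmt_labels(metric.labels)} {metric._sum}")
            lines.append(f"{metric.name}_count{_fmt_labels(metric.labels)} {metric._count}")
    return "\n".join(lines) + "\n"

# Example: print(render_prometheus(metrics))
```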
### 5. OpenTelemetry Patterns

```python
from opentelemetry import trace, metrics
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor


def setup_opentelemetry(service_name: str, otlp_endpoint: str):
    """Initialize OpenTelemetry with OTLP export."""
    resource = Resource.create({"service.name": service_name})

    # Tracing setup
    trace_provider = TracerProvider(resource=resource)
    trace_provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
    )
    trace.set_tracer_provider(trace_provider)

    # Metrics setup: a periodic reader pushes metrics through the OTLP exporter
    metric_provider = MeterProvider(
        resource=resource,
        metric_readers=[
            PeriodicExportingMetricReader(OTLPMetricExporter(endpoint=otlp_endpoint))
        ],
    )
    metrics.set_meter_provider(metric_provider)

    # Auto-instrumentation for outgoing HTTP calls
    RequestsInstrumentor().instrument()

    return trace.get_tracer(service_name), metrics.get_meter(service_name)


# Usage with FastAPI
from fastapi import FastAPI

tracer, meter = setup_opentelemetry("order-service", "http://otel-collector:4317")

app = FastAPI()
FastAPIInstrumentor.instrument_app(app)

# Custom spans
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    with tracer.start_as_current_span("fetch_order") as span:
        span.set_attribute("order.id", order_id)
        order = await order_repository.get(order_id)
        span.set_attribute("order.status", order.status)
        return order
```
### 6. Log Aggregation Patterns

#### ELK Stack (Elasticsearch, Logstash, Kibana)

```conf
# Logstash pipeline configuration
input {
  file {
    path => "/var/log/app/*.log"
    codec => json
  }
}

filter {
  # Parse structured JSON logs
  json {
    source => "message"
  }

  # Route to a date-based Elasticsearch index
  mutate {
    add_field => { "[@metadata][index]" => "app-logs-%{+YYYY.MM.dd}" }
  }

  # Enrich with geolocation (if IP present)
  geoip {
    source => "ip_address"
    target => "geo"
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][index]}"
  }
}
```
#### Grafana Loki

```yaml
# Promtail scrape configuration
scrape_configs:
  - job_name: app-logs
    static_configs:
      - targets:
          - localhost
        labels:
          job: app-logs
          __path__: /var/log/app/*.log
    # Extract JSON fields as labels
    pipeline_stages:
      - json:
          expressions:
            level: level
            correlation_id: correlation_id
            service: service
      - labels:
          level:
          correlation_id:
          service:
```
#### Datadog Agent Configuration

```yaml
# datadog.yaml
logs_enabled: true

logs_config:
  processing_rules:
    - type: exclude_at_match
      name: exclude_healthcheck
      pattern: "GET /health"
  # Automatically group multi-line log entries
  auto_multi_line_detection: true
```

```yaml
# Log collection from files
logs:
  - type: file
    path: "/var/log/app/*.log"
    service: "order-service"
    source: "python"
    tags:
      - "env:production"
```
### 7. Alert Design

#### Prometheus Alerting Rules

```yaml
groups:
  - name: service-alerts
    rules:
      # High error rate alert
      - alert: HighErrorRate
        expr: |
          sum(rate(http_requests_total{status=~"5.."}[5m]))
          / sum(rate(http_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }} over the last 5 minutes"
          runbook_url: "https://wiki.example.com/runbooks/high-error-rate"

      # High latency alert
      - alert: HighLatency
        expr: |
          histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[5m])) by (le)) > 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High latency detected"
          description: "95th percentile latency is {{ $value }}s"

      # Service down alert
      - alert: ServiceDown
        expr: up == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Service {{ $labels.instance }} is down"
          description: "{{ $labels.job }} has been down for more than 1 minute"
```
#### Alert Severity Levels

| Level | Response Time | Examples |
|-------|---------------|----------|
| Critical | Immediate | Service down, high error rate, data loss |
| Warning | Business hours | High latency, approaching limits, retry spikes |
| Info | Log only | Deployment started, config changed |

## Best Practices

### Logging
1. **Log at Appropriate Levels**: DEBUG for development, INFO for normal operations, WARN for potential issues, ERROR for failures, FATAL for critical failures.
2. **Include Context**: Always include correlation IDs, trace IDs, user IDs, and relevant business identifiers in structured fields.
3. **Avoid Sensitive Data**: Never log passwords, tokens, credit cards, or PII. Implement automatic redaction where necessary (see the redaction sketch after this list).
4. **Use Structured Logging**: JSON logs enable easy parsing and querying in log aggregation systems (ELK, Loki, Datadog).
5. **Consistent Field Names**: Standardize field names across services (e.g., always use correlation_id, not sometimes request_id).
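A minimal redaction sketch, implemented as a logging filter over the structured_data convention from section 1; the key list and the `RedactionFilter` name are illustrative assumptions, not a standard API:

```python
import logging
import re

# Hypothetical deny-list and pattern; adjust to your own data model
SENSITIVE_KEYS = {"password", "token", "authorization", "credit_card"}
CARD_PATTERN = re.compile(r"\b\d{13,16}\b")

class RedactionFilter(logging.Filter):
    """Scrubs sensitive keys and card-like numbers before formatting."""

    def filter(self, record: logging.LogRecord) -> bool:
        data = getattr(record, "structured_data", None)
        if isinstance(data, dict):
            for key in list(data):
                if key.lower() in SENSITIVE_KEYS:
                    data[key] = "[REDACTED]"
                elif isinstance(data[key], str):
                    data[key] = CARD_PATTERN.sub("[REDACTED]", data[key])
        return True  # never drop the record, only scrub it

# Usage: attach to the handler created in setup_logging()
# handler.addFilter(RedactionFilter())
```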
### Distributed Tracing
1. **Trace Boundaries**: Create spans at service boundaries, database calls, external API calls, and significant operations.
2. **Propagate Context**: Pass trace IDs and span IDs across service boundaries via HTTP headers, following the OpenTelemetry / W3C Trace Context standard (see the propagation sketch after this list).
3. **Add Meaningful Attributes**: Include business context (user_id, order_id) and technical context (db_query, cache_hit) in span attributes.
4. **Sample Appropriately**: Use adaptive sampling: trace 100% of errors, and sample successful requests based on traffic volume.
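A minimal propagation sketch, assuming OpenTelemetry's W3C Trace Context propagator and the requests library (the function names here are illustrative):

```python
import requests
from opentelemetry import trace
from opentelemetry.propagate import inject, extract

tracer = trace.get_tracer(__name__)

def call_downstream(url: str):
    # Outgoing: inject traceparent/tracestate headers into the request
    headers: dict = {}
    inject(headers)  # writes the current span context into the carrier
    return requests.get(url, headers=headers)

def handle_incoming(request_headers: dict):
    # Incoming: extract the remote context and parent new spans on it
    ctx = extract(request_headers)
    with tracer.start_as_current_span("handle_request", context=ctx) as span:
        span.set_attribute("transport", "http")
        # ... handle the request ...
```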
### Metrics
1. **Track Golden Signals**: Monitor the Four Golden Signals: latency, traffic, errors, and saturation.
2. **Use Correct Metric Types**: Counters for totals (requests), gauges for current values (memory), histograms for distributions (latency); see the sketch after this list.
3. **Label Cardinality**: Keep label cardinality low. Avoid high-cardinality values like user IDs in metric labels.
4. **Naming Conventions**: Follow Prometheus naming: http_requests_total (counter), process_memory_bytes (gauge), http_request_duration_seconds (histogram).
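As a concrete illustration of these types and naming rules, here is a short sketch assuming the official prometheus_client package (the port and label values are arbitrary examples):

```python
from prometheus_client import Counter, Gauge, Histogram, start_http_server

# Counter: monotonically increasing total, low-cardinality labels only
REQUESTS = Counter("http_requests_total", "Total HTTP requests",
                   ["method", "path", "status"])

# Gauge: a current value that can go up and down
MEMORY = Gauge("process_memory_bytes", "Resident memory in bytes")

# Histogram: distribution recorded into cumulative buckets
LATENCY = Histogram(
    "http_request_duration_seconds", "Request latency",
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0],
)

REQUESTS.labels(method="GET", path="/api/orders", status="200").inc()
MEMORY.set(128 * 1024 * 1024)
LATENCY.observe(0.042)

start_http_server(8000)  # exposes /metrics for Prometheus to scrape
```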
### Alerting
1. **Alert on Symptoms**: Alert on user-impacting issues (error rate, latency), not causes (CPU usage). Symptoms indicate what is broken; causes explain why.
2. **Include Runbooks**: Every alert must link to a runbook with investigation steps, common causes, and remediation procedures.
3. **Use Appropriate Thresholds**: Set thresholds based on SLOs and historical data, not arbitrary values.
4. **Avoid Alert Fatigue**: Ensure alerts are actionable. Non-actionable alerts lead to alert fatigue and ignored critical issues.
### Integration
1. **End-to-End Correlation**: Link logs, traces, and metrics using correlation IDs to enable cross-system debugging (see the sketch after this list).
2. **Centralize**: Use centralized log aggregation (ELK, Loki) and trace collection (Jaeger, Zipkin) for cross-service visibility.
3. **Test Observability**: Verify logging, tracing, and metrics in development; don't discover gaps in production.
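One common way to wire up end-to-end correlation is to stamp the active OpenTelemetry trace and span IDs onto every log record. A minimal sketch (the filter name is illustrative):

```python
import logging
from opentelemetry import trace

class TraceContextFilter(logging.Filter):
    """Copies the active OTel trace/span IDs onto each log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        ctx = trace.get_current_span().get_span_context()
        if ctx.is_valid:
            record.trace_id = format(ctx.trace_id, "032x")
            record.span_id = format(ctx.span_id, "016x")
        else:
            record.trace_id = ""
            record.span_id = ""
        return True

# Usage: attach to the structured-logging handler, then emit
# record.trace_id / record.span_id in the JSON formatter output.
# handler.addFilter(TraceContextFilter())
```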
## Examples

### Complete Request Logging Middleware

```python
import time
import uuid

from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware


class ObservabilityMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, tracer, metrics):
        super().__init__(app)
        self.tracer = tracer
        self.metrics = metrics

    async def dispatch(self, request: Request, call_next):
        # Extract or generate correlation ID
        corr_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))
        correlation_id.set(corr_id)
        start_time = time.time()

        with self.tracer.start_as_current_span(
            f"{request.method} {request.url.path}"
        ) as span:
            span.set_attribute("http.method", request.method)
            span.set_attribute("http.url", str(request.url))
            span.set_attribute("correlation_id", corr_id)

            try:
                response = await call_next(request)
                span.set_attribute("http.status_code", response.status_code)

                # Record metrics against the registry from section 4
                labels = {
                    "method": request.method,
                    "path": request.url.path,
                    "status": str(response.status_code)
                }
                self.metrics.counter("http_requests_total", labels).inc()
                self.metrics.histogram(
                    "http_request_duration_seconds",
                    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0],
                    labels=labels
                ).observe(time.time() - start_time)

                # Add correlation ID to response
                response.headers["X-Correlation-ID"] = corr_id
                return response
            except Exception as e:
                span.set_attribute("error", True)
                span.record_exception(e)
                raise
```