logging-observability

安装量: 55
排名: #13500

安装

npx skills add https://github.com/cosmix/loom --skill logging-observability

Logging and Observability Overview

Observability enables understanding system behavior through logs, metrics, and traces. This skill provides patterns for:

- **Structured Logging**: JSON logs with correlation IDs and contextual data
- **Distributed Tracing**: span-based request tracking across services (OpenTelemetry, Jaeger, Zipkin)
- **Metrics Collection**: counters, gauges, histograms for system health (Prometheus patterns)
- **Log Aggregation**: centralized log management (ELK, Loki, Datadog)
- **Alerting**: symptom-based alerts with runbooks

## Instructions

### 1. Structured Logging (JSON Logs)

Python implementation:

    import json
    import logging
    import sys
    from datetime import datetime
    from contextvars import ContextVar
    from typing import Any

Context variables for request tracking

# Request-scoped identifiers; ContextVar keeps them isolated per async task/thread.
correlation_id: ContextVar[str] = ContextVar('correlation_id', default='')
span_id: ContextVar[str] = ContextVar('span_id', default='')

class StructuredFormatter(logging.Formatter):
    """JSON formatter for structured logging.

    Emits one JSON object per record containing timestamp, level, logger
    name, message, and the request-scoped correlation/span IDs. Exception
    text and any ``structured_data`` passed via ``extra=`` are merged in.
    """

    def format(self, record: logging.LogRecord) -> str:
        # Use a timezone-aware clock: datetime.utcnow() is deprecated since
        # Python 3.12 and returns a naive datetime. Keep the trailing "Z"
        # the original format produced.
        from datetime import timezone
        timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

        log_data = {
            "timestamp": timestamp,
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": correlation_id.get(),
            "span_id": span_id.get(),
        }

        # Include the formatted traceback when exception info is attached.
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)

        # Merge caller-supplied fields: logger.info(..., extra={"structured_data": {...}})
        if hasattr(record, 'structured_data'):
            log_data.update(record.structured_data)

        return json.dumps(log_data)

def setup_logging():
    """Install a JSON-formatting stdout handler on the root logger at INFO."""
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(StructuredFormatter())

    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(handler)

Usage

# Module-level logger. The original used the undefined `name`; the
# conventional (and working) form is __name__, which names the logger
# after the module's dotted path.
logger = logging.getLogger(__name__)
logger.info("User logged in", extra={
    "structured_data": {
        "user_id": "123",
        "ip_address": "192.168.1.1",
        "action": "login"
    }
})

TypeScript Implementation interface LogContext { correlationId?: string; spanId?: string;

}

interface LogEntry { timestamp: string; level: string; message: string; context: LogContext; }

class StructuredLogger { private context: LogContext = {};

withContext(context: LogContext): StructuredLogger { const child = new StructuredLogger(); child.context = { ...this.context, ...context }; return child; }

private log( level: string, message: string, data?: Record, ): void { const entry: LogEntry = { timestamp: new Date().toISOString(), level, message, context: { ...this.context, ...data }, }; console.log(JSON.stringify(entry)); }

debug(message: string, data?: Record): void { this.log("DEBUG", message, data); }

info(message: string, data?: Record): void { this.log("INFO", message, data); }

warn(message: string, data?: Record): void { this.log("WARN", message, data); }

error(message: string, data?: Record): void { this.log("ERROR", message, data); } }

### 2. Log Levels and When to Use Each

| Level | Usage                        | Examples                                           |
|-------|------------------------------|----------------------------------------------------|
| TRACE | Fine-grained debugging       | Loop iterations, variable values                   |
| DEBUG | Diagnostic information       | Function entry/exit, intermediate states           |
| INFO  | Normal operations            | Request started, job completed, user action        |
| WARN  | Potential issues             | Deprecated API usage, retry attempted, slow query  |
| ERROR | Failures requiring attention | Exception caught, operation failed                 |
| FATAL | Critical failures            | System cannot continue, data corruption            |

Log level usage examples

# One example call per level, each attaching structured fields via `extra`.
logger.debug("Processing item", extra={"structured_data": {"item_id": item.id}})
logger.info("Order processed successfully", extra={"structured_data": {"order_id": order.id, "total": order.total}})
logger.warning("Rate limit approaching", extra={"structured_data": {"current": 95, "limit": 100}})
logger.error("Payment failed", extra={"structured_data": {"order_id": order.id, "error": str(e)}})

### 3. Distributed Tracing — Correlation IDs and Spans

    import uuid
    from contextvars import ContextVar
    from dataclasses import dataclass, field
    from typing import Optional
    import time

@dataclass
class Span:
    """One timed unit of work inside a trace."""
    name: str
    trace_id: str
    span_id: str = field(default_factory=lambda: str(uuid.uuid4())[:16])
    parent_span_id: Optional[str] = None
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    attributes: dict = field(default_factory=dict)

    def end(self):
        """Stamp the span as finished at the current wall-clock time."""
        self.end_time = time.time()

    @property
    def duration_ms(self) -> float:
        """Elapsed milliseconds, or 0 while the span is still open."""
        if self.end_time is None:
            return 0
        return (self.end_time - self.start_time) * 1000


# The span active in this execution context (None when not inside a trace).
current_span: ContextVar[Optional[Span]] = ContextVar('current_span', default=None)

class Tracer:
    """Minimal tracer: opens spans, tracks the current one, exports on end."""

    # Fixed: the source read `def init`, so the constructor never ran and
    # `service_name` was never set.
    def __init__(self, service_name: str):
        self.service_name = service_name

    def start_span(self, name: str, parent: Optional[Span] = None) -> Span:
        """Open a span, inheriting trace/parent IDs from `parent` or the current context."""
        parent = parent or current_span.get()
        trace_id = parent.trace_id if parent else str(uuid.uuid4())[:32]
        parent_span_id = parent.span_id if parent else None

        span = Span(
            name=name,
            trace_id=trace_id,
            parent_span_id=parent_span_id,
            attributes={"service": self.service_name}
        )
        current_span.set(span)
        return span

    def end_span(self, span: Span):
        """Close the span and export it."""
        span.end()
        self._export(span)
        # Restore parent span if exists
        # In production, use a span stack

    def _export(self, span: Span):
        """Export span to the tracing backend (here: the structured logger)."""
        logger.info(f"Span completed: {span.name}", extra={
            "structured_data": {
                "trace_id": span.trace_id,
                "span_id": span.span_id,
                "parent_span_id": span.parent_span_id,
                "duration_ms": span.duration_ms,
                "attributes": span.attributes
            }
        })

Context manager for spans

from contextlib import contextmanager

@contextmanager
def trace_span(tracer: Tracer, name: str):
    """Open a span for the enclosed block; tag errors and always end/export it."""
    span = tracer.start_span(name)
    try:
        yield span
    except Exception as e:
        # Record the failure on the span, then let it propagate.
        span.attributes["error"] = True
        span.attributes["error.message"] = str(e)
        raise
    finally:
        tracer.end_span(span)

Usage

tracer = Tracer("order-service")


async def process_order(order_id: str):
    """Trace the order pipeline: validation then payment under one parent span."""
    with trace_span(tracer, "process_order") as span:
        span.attributes["order_id"] = order_id

        with trace_span(tracer, "validate_order"):
            await validate(order_id)

        with trace_span(tracer, "charge_payment"):
            await charge(order_id)
### 4. Metrics Collection

    from dataclasses import dataclass
    from typing import Dict, List
    from enum import Enum
    import time
    import threading

class MetricType(Enum):
    """The three Prometheus-style metric families implemented below."""
    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"

@dataclass
class Counter:
    """Monotonically increasing metric (e.g. total requests served)."""
    name: str
    labels: Dict[str, str]
    value: float = 0

    def inc(self, amount: float = 1):
        """Add `amount` (default 1) to the running total."""
        self.value += amount

@dataclass
class Gauge:
    """Point-in-time metric that can move up or down (e.g. active connections)."""
    name: str
    labels: Dict[str, str]
    value: float = 0

    def set(self, value: float):
        """Replace the current value outright."""
        self.value = value

    def inc(self, amount: float = 1):
        """Raise the value by `amount` (default 1)."""
        self.value += amount

    def dec(self, amount: float = 1):
        """Lower the value by `amount` (default 1)."""
        self.value -= amount

@dataclass
class Histogram:
    """Cumulative-bucket histogram (Prometheus semantics).

    Each observation increments every bucket whose upper bound is >= the
    observed value, plus the implicit +Inf bucket; `_sum` and `_count`
    support average/rate queries.
    """
    name: str
    labels: Dict[str, str]
    buckets: List[float]
    # Fixed: was `values: List[float] = None` — the annotation promised a
    # list while the default was None. default_factory gives each instance
    # its own list, so __post_init__ no longer has to clobber it.
    values: List[float] = field(default_factory=list)

    def __post_init__(self):
        self._bucket_counts = {b: 0 for b in self.buckets}
        self._bucket_counts[float('inf')] = 0  # +Inf bucket counts every observation
        self._sum = 0
        self._count = 0
        # NOTE(review): values passed at construction are kept but not
        # reflected in bucket counts — confirm callers never pre-seed them.

    def observe(self, value: float):
        """Record one observation."""
        self.values.append(value)
        self._sum += value
        self._count += 1
        # Cumulative buckets: each bound is independent, so no sorting needed.
        for bucket in self._bucket_counts:
            if value <= bucket:
                self._bucket_counts[bucket] += 1

class MetricsRegistry:
    """Thread-safe get-or-create registry for Counter/Gauge/Histogram objects."""

    # Fixed: the source read `def init`, so __init__ never ran and
    # `_metrics`/`_lock` did not exist when the accessors were called.
    def __init__(self):
        self._metrics: Dict[str, object] = {}
        self._lock = threading.Lock()

    @staticmethod
    def _key(name: str, labels: Dict[str, str]) -> str:
        # Normalize so labels=None and labels={} share a key, and label
        # insertion order cannot create duplicate metrics (the original
        # keyed on the raw dict repr).
        return f"{name}:{sorted((labels or {}).items())}"

    def counter(self, name: str, labels: Dict[str, str] = None) -> Counter:
        """Return the Counter for (name, labels), creating it on first use."""
        key = self._key(name, labels)
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Counter(name, labels or {})
            return self._metrics[key]

    def gauge(self, name: str, labels: Dict[str, str] = None) -> Gauge:
        """Return the Gauge for (name, labels), creating it on first use."""
        key = self._key(name, labels)
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Gauge(name, labels or {})
            return self._metrics[key]

    def histogram(self, name: str, buckets: List[float], labels: Dict[str, str] = None) -> Histogram:
        """Return the Histogram for (name, labels), creating it with `buckets` on first use."""
        key = self._key(name, labels)
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Histogram(name, labels or {}, buckets)
            return self._metrics[key]

Usage

metrics = MetricsRegistry()

# Counter for requests
request_counter = metrics.counter("http_requests_total", {"method": "GET", "path": "/api/orders"})
request_counter.inc()

# Gauge for active connections
active_connections = metrics.gauge("active_connections")
active_connections.inc()
# ... handle connection ...
active_connections.dec()

# Histogram for request duration
request_duration = metrics.histogram(
    "http_request_duration_seconds",
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)

start = time.time()
# ... handle request ...
request_duration.observe(time.time() - start)

  1. OpenTelemetry Patterns from opentelemetry import trace, metrics from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter from opentelemetry.instrumentation.requests import RequestsInstrumentor from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

def setup_opentelemetry(service_name: str, otlp_endpoint: str): """Initialize OpenTelemetry with OTLP export."""

# Tracing setup
trace_provider = TracerProvider(
    resource=Resource.create({"service.name": service_name})
)
trace_provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
)
trace.set_tracer_provider(trace_provider)

# Metrics setup
metric_provider = MeterProvider(
    resource=Resource.create({"service.name": service_name})
)
metrics.set_meter_provider(metric_provider)

# Auto-instrumentation
RequestsInstrumentor().instrument()

return trace.get_tracer(service_name), metrics.get_meter(service_name)

Usage with FastAPI

from fastapi import FastAPI

app = FastAPI()
FastAPIInstrumentor.instrument_app(app)

tracer, meter = setup_opentelemetry("order-service", "http://otel-collector:4317")


# Custom spans
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    """Fetch one order, wrapping the repository call in a custom span."""
    with tracer.start_as_current_span("fetch_order") as span:
        span.set_attribute("order.id", order_id)
        order = await order_repository.get(order_id)
        span.set_attribute("order.status", order.status)
        return order

### 6. Log Aggregation Patterns

#### ELK Stack (Elasticsearch, Logstash, Kibana)

Logstash pipeline configuration

# Logstash pipeline: tail JSON app logs, enrich, ship to Elasticsearch.
input {
  file {
    path => "/var/log/app/*.log"
    codec => json
  }
}

filter {
  # Parse structured JSON logs
  json {
    source => "message"
  }

  # Add Elasticsearch index based on date
  mutate {
    add_field => { "[@metadata][index]" => "app-logs-%{+YYYY.MM.dd}" }
  }

  # Enrich with geolocation (if IP present)
  geoip {
    source => "ip_address"
    target => "geo"
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][index]}"
  }
}

Grafana Loki

Promtail scrape configuration

# Promtail: scrape app log files and promote JSON fields to Loki labels.
scrape_configs:
  - job_name: app-logs
    static_configs:
      - targets:
          - localhost
        labels:
          job: app-logs
          # NOTE(review): Promtail normally discovers files via the
          # `__path__` label — confirm `path` here is intentional.
          path: /var/log/app/*.log

    # Extract JSON fields as labels
    pipeline_stages:
      - json:
          expressions:
            level: level
            correlation_id: correlation_id
            service: service
      - labels:
          level:
          correlation_id:
          service:

Datadog Agent Configuration

datadog.yaml

# datadog.yaml — enable log collection and drop health-check noise.
logs_enabled: true

logs_config:
  processing_rules:
    - type: exclude_at_match
      name: exclude_healthcheck
      pattern: "GET /health"

  # Auto-parse JSON logs
  auto_multi_line_detection: true

# Log collection from files (integration conf.d `logs:` section)
logs:
  - type: file
    path: "/var/log/app/*.log"
    service: "order-service"
    source: "python"
    tags:
      - "env:production"

### 7. Alert Design

#### Prometheus Alerting Rules

Prometheus alerting rules

groups:
  - name: service-alerts
    rules:
      # High error rate alert
      - alert: HighErrorRate
        expr: |
          sum(rate(http_requests_total{status=~"5.."}[5m]))
          / sum(rate(http_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }} over the last 5 minutes"
          runbook_url: "https://wiki.example.com/runbooks/high-error-rate"

      # High latency alert
      - alert: HighLatency
        expr: |
          histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[5m])) by (le)) > 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High latency detected"
          description: "95th percentile latency is {{ $value }}s"

      # Service down alert
      - alert: ServiceDown
        expr: up == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Service {{ $labels.instance }} is down"
          description: "{{ $labels.job }} has been down for more than 1 minute"

### Alert Severity Levels

| Level    | Response Time | Examples                                        |
|----------|---------------|-------------------------------------------------|
| Critical | Immediate     | Service down, high error rate, data loss        |
| Warning  | Business hrs  | High latency, approaching limits, retry spikes  |
| Info     | Log only      | Deployment started, config changed              |

## Best Practices

### Logging

Log at Appropriate Levels: DEBUG for development, INFO for normal operations, WARN for potential issues, ERROR for failures, FATAL for critical failures.

Include Context: Always include correlation IDs, trace IDs, user IDs, and relevant business identifiers in structured fields.

Avoid Sensitive Data: Never log passwords, tokens, credit cards, or PII. Implement automatic redaction when necessary.

Use Structured Logging: JSON logs enable easy parsing and querying in log aggregation systems (ELK, Loki, Datadog).

Consistent Field Names: Standardize field names across services (e.g., always use correlation_id, not sometimes request_id).

Distributed Tracing

Trace Boundaries: Create spans at service boundaries, database calls, external API calls, and significant operations.

Propagate Context: Pass trace IDs and span IDs across service boundaries via HTTP headers (OpenTelemetry standards).

Add Meaningful Attributes: Include business context (user_id, order_id) and technical context (db_query, cache_hit) in span attributes.

Sample Appropriately: Use adaptive sampling - trace 100% of errors, sample successful requests based on traffic volume.

Metrics

Track Golden Signals: Monitor the Four Golden Signals - latency, traffic, errors, saturation.

Use Correct Metric Types: Counters for totals (requests), Gauges for current values (memory), Histograms for distributions (latency).

Label Cardinality: Keep label cardinality low - avoid high-cardinality values like user IDs in metric labels.

Naming Conventions: Follow Prometheus naming - http_requests_total (counter), process_memory_bytes (gauge), http_request_duration_seconds (histogram).

Alerting

Alert on Symptoms: Alert on user-impacting issues (error rate, latency), not causes (CPU usage). Symptoms indicate what is broken, causes explain why.

Include Runbooks: Every alert must link to a runbook with investigation steps, common causes, and remediation procedures.

Use Appropriate Thresholds: Set thresholds based on SLOs and historical data, not arbitrary values.

Alert Fatigue: Ensure alerts are actionable. Non-actionable alerts lead to alert fatigue and ignored critical issues.

Integration

End-to-End Correlation: Link logs, traces, and metrics using correlation IDs to enable cross-system debugging.

Centralize: Use centralized log aggregation (ELK, Loki) and trace collection (Jaeger, Zipkin) for cross-service visibility.

Test Observability: Verify logging, tracing, and metrics in development - don't discover gaps in production.

# Example: complete request logging middleware.
import time
import uuid

from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware


class ObservabilityMiddleware(BaseHTTPMiddleware):
    """Per-request correlation ID, tracing span, and request metrics."""

    # Fixed: the source read `def init` / `super().init(app)`, so the
    # middleware was never initialized and its metrics never created.
    def __init__(self, app, tracer, metrics):
        super().__init__(app)
        self.tracer = tracer
        self.request_counter = metrics.counter("http_requests_total")
        self.request_duration = metrics.histogram(
            "http_request_duration_seconds",
            buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
        )

    async def dispatch(self, request: Request, call_next):
        # Reuse the caller's correlation ID or mint a fresh one.
        corr_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))
        correlation_id.set(corr_id)

        start_time = time.time()

        with self.tracer.start_as_current_span(
            f"{request.method} {request.url.path}"
        ) as span:
            span.set_attribute("http.method", request.method)
            span.set_attribute("http.url", str(request.url))
            span.set_attribute("correlation_id", corr_id)

            try:
                response = await call_next(request)

                span.set_attribute("http.status_code", response.status_code)

                # Record metrics
                # NOTE(review): .labels(...) is the prometheus_client API;
                # the MetricsRegistry defined earlier in this document has
                # no labels() method — confirm which metrics backend this
                # middleware is meant to receive.
                labels = {
                    "method": request.method,
                    "path": request.url.path,
                    "status": str(response.status_code)
                }
                self.request_counter.labels(**labels).inc()
                self.request_duration.labels(**labels).observe(
                    time.time() - start_time
                )

                # Echo the correlation ID back to the client
                response.headers["X-Correlation-ID"] = corr_id

                return response

            except Exception as e:
                # Mark the span failed and let the exception propagate.
                span.set_attribute("error", True)
                span.record_exception(e)
                raise
返回排行榜