python-observability
skillPython observability patterns including structured logging, metrics, and distributed tracing. Use when adding logging, implementing metrics collection, setting up tracing, or debugging production systems.
apm::install
apm install @wshobson/python-observabilityapm::skill.md
---
name: python-observability
description: Python observability patterns including structured logging, metrics, and distributed tracing. Use when adding logging, implementing metrics collection, setting up tracing, or debugging production systems.
---
# Python Observability
Instrument Python applications with structured logs, metrics, and traces. When something breaks in production, you need to answer "what, where, and why" without deploying new code.
## When to Use This Skill
- Adding structured logging to applications
- Implementing metrics collection with Prometheus
- Setting up distributed tracing across services
- Propagating correlation IDs through request chains
- Debugging production issues
- Building observability dashboards
## Core Concepts
### 1. Structured Logging
Emit logs as JSON with consistent fields for production environments. Machine-readable logs enable powerful queries and alerts. For local development, consider human-readable formats.
### 2. The Four Golden Signals
Track latency, traffic, errors, and saturation for every service boundary.
### 3. Correlation IDs
Thread a unique ID through all logs and spans for a single request, enabling end-to-end tracing.
### 4. Bounded Cardinality
Keep metric label values bounded. Unbounded labels (like user IDs) explode storage costs.
## Quick Start
```python
import structlog
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer(),
],
)
logger = structlog.get_logger()
logger.info("Request processed", user_id="123", duration_ms=45)
```
## Fundamental Patterns
### Pattern 1: Structured Logging with Structlog
Configure structlog for JSON output with consistent fields.
```python
import logging
import structlog
def configure_logging(log_level: str = "INFO") -> None:
"""Configure structured logging for the application."""
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.make_filtering_bound_logger(
getattr(logging, log_level.upper())
),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True,
)
# Initialize at application startup
configure_logging("INFO")
logger = structlog.get_logger()
```
### Pattern 2: Consistent Log Fields
Every log entry should include standard fields for filtering and correlation.
```python
import structlog
from contextvars import ContextVar
# Store correlation ID in context
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")
logger = structlog.get_logger()
def process_request(request: Request) -> Response:
"""Process request with structured logging."""
logger.info(
"Request received",
correlation_id=correlation_id.get(),
method=request.method,
path=request.path,
user_id=request.user_id,
)
try:
result = handle_request(request)
logger.info(
"Request completed",
correlation_id=correlation_id.get(),
status_code=200,
duration_ms=elapsed,
)
return result
except Exception as e:
logger.error(
"Request failed",
correlation_id=correlation_id.get(),
error_type=type(e).__name__,
error_message=str(e),
)
raise
```
### Pattern 3: Semantic Log Levels
Use log levels consistently across the application.
| Level | Purpose | Examples |
|-------|---------|----------|
| `DEBUG` | Development diagnostics | Variable values, internal state |
| `INFO` | Request lifecycle, operations | Request start/end, job completion |
| `WARNING` | Recoverable anomalies | Retry attempts, fallback used |
| `ERROR` | Failures needing attention | Exceptions, service unavailable |
```python
# DEBUG: Detailed internal information
logger.debug("Cache lookup", key=cache_key, hit=cache_hit)
# INFO: Normal operational events
logger.info("Order created", order_id=order.id, total=order.total)
# WARNING: Abnormal but handled situations
logger.warning(
"Rate limit approaching",
current_rate=950,
limit=1000,
reset_seconds=30,
)
# ERROR: Failures requiring investigation
logger.error(
"Payment processing failed",
order_id=order.id,
error=str(e),
payment_provider="stripe",
)
```
Never log expected behavior at `ERROR`. A user entering a wrong password is `INFO`, not `ERROR`.
### Pattern 4: Correlation ID Propagation
Generate a unique ID at ingress and thread it through all operations.
```python
from contextvars import ContextVar
import uuid
import structlog
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")
def set_correlation_id(cid: str | None = None) -> str:
"""Set correlation ID for current context."""
cid = cid or str(uuid.uuid4())
correlation_id.set(cid)
structlog.contextvars.bind_contextvars(correlation_id=cid)
return cid
# FastAPI middleware example
from fastapi import Request
async def correlation_middleware(request: Request, call_next):
"""Middleware to set and propagate correlation ID."""
# Use incoming header or generate new
cid = request.headers.get("X-Correlation-ID") or str(uuid.uuid4())
set_correlation_id(cid)
response = await call_next(request)
response.headers["X-Correlation-ID"] = cid
return response
```
Propagate to outbound requests:
```python
import httpx
async def call_downstream_service(endpoint: str, data: dict) -> dict:
"""Call downstream service with correlation ID."""
async with httpx.AsyncClient() as client:
response = await client.post(
endpoint,
json=data,
headers={"X-Correlation-ID": correlation_id.get()},
)
return response.json()
```
## Advanced Patterns
### Pattern 5: The Four Golden Signals with Prometheus
Track these metrics for every service boundary:
```python
from prometheus_client import Counter, Histogram, Gauge
# Latency: How long requests take
REQUEST_LATENCY = Histogram(
"http_request_duration_seconds",
"Request latency in seconds",
["method", "endpoint", "status"],
buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10],
)
# Traffic: Request rate
REQUEST_COUNT = Counter(
"http_requests_total",
"Total HTTP requests",
["method", "endpoint", "status"],
)
# Errors: Error rate
ERROR_COUNT = Counter(
"http_errors_total",
"Total HTTP errors",
["method", "endpoint", "error_type"],
)
# Saturation: Resource utilization
DB_POOL_USAGE = Gauge(
"db_connection_pool_used",
"Number of database connections in use",
)
```
Instrument your endpoints:
```python
import time
from functools import wraps
def track_request(func):
"""Decorator to track request metrics."""
@wraps(func)
async def wrapper(request: Request, *args, **kwargs):
method = request.method
endpoint = request.url.path
start = time.perf_counter()
try:
response = await func(request, *args, **kwargs)
status = str(response.status_code)
return response
except Exception as e:
status = "500"
ERROR_COUNT.labels(
method=method,
endpoint=endpoint,
error_type=type(e).__name__,
).inc()
raise
finally:
duration = time.perf_counter() - start
REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
REQUEST_LATENCY.labels(method=method, endpoint=endpoint, status=status).observe(duration)
return wrapper
```
### Pattern 6: Bounded Cardinality
Avoid labels with unbounded values to prevent metric explosion.
```python
# BAD: User ID has potentially millions of values
REQUEST_COUNT.labels(method="GET", user_id=user.id) # Don't do this!
# GOOD: Bounded values only
REQUEST_COUNT.labels(method="GET", endpoint="/users", status="200")
# If you need per-user metrics, use a different approach:
# - Log the user_id and query logs
# - Use a separate analytics system
# - Bucket users by type/tier
REQUEST_COUNT.labels(
method="GET",
endpoint="/users",
user_tier="premium", # Bounded set of values
)
```
### Pattern 7: Timed Operations with Context Manager
Create a reusable timing context manager for operations.
```python
from contextlib import contextmanager
import time
import structlog
logger = structlog.get_logger()
@contextmanager
def timed_operation(name: str, **extra_fields):
"""Context manager for timing and logging operations."""
start = time.perf_counter()
logger.debug("Operation started", operation=name, **extra_fields)
try:
yield
except Exception as e:
elapsed_ms = (time.perf_counter() - start) * 1000
logger.error(
"Operation failed",
operation=name,
duration_ms=round(elapsed_ms, 2),
error=str(e),
**extra_fields,
)
raise
else:
elapsed_ms = (time.perf_counter() - start) * 1000
logger.info(
"Operation completed",
operation=name,
duration_ms=round(elapsed_ms, 2),
**extra_fields,
)
# Usage
with timed_operation("fetch_user_orders", user_id=user.id):
orders = await order_repository.get_by_user(user.id)
```
### Pattern 8: OpenTelemetry Tracing
Set up distributed tracing with OpenTelemetry.
**Note:** OpenTelemetry is actively evolving. Check the [official Python documentation](https://opentelemetry.io/docs/languages/python/) for the latest API patterns and best practices.
```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
def configure_tracing(service_name: str, otlp_endpoint: str) -> None:
"""Configure OpenTelemetry tracing."""
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)
async def process_order(order_id: str) -> Order:
"""Process order with tracing."""
with tracer.start_as_current_span("process_order") as span:
span.set_attribute("order.id", order_id)
with tracer.start_as_current_span("validate_order"):
validate_order(order_id)
with tracer.start_as_current_span("charge_payment"):
charge_payment(order_id)
with tracer.start_as_current_span("send_confirmation"):
send_confirmation(order_id)
return order
```
## Best Practices Summary
1. **Use structured logging** - JSON logs with consistent fields
2. **Propagate correlation IDs** - Thread through all requests and logs
3. **Track the four golden signals** - Latency, traffic, errors, saturation
4. **Bound label cardinality** - Never use unbounded values as metric labels
5. **Log at appropriate levels** - Don't cry wolf with ERROR
6. **Include context** - User ID, request ID, operation name in logs
7. **Use context managers** - Consistent timing and error handling
8. **Separate concerns** - Observability code shouldn't pollute business logic
9. **Test your observability** - Verify logs and metrics in integration tests
10. **Set up alerts** - Metrics are useless without alerting