Skip to content

Observability

Structured logging, metrics, and audit trail.

The observability module provides in-memory structured logging for the replication system. It captures events, metrics, and security audit decisions as structured records rather than text, making them easy to filter, query, and export.

Constants

Name Value Description
DEFAULT_MAX_ENTRIES 100,000 Default capacity for event/metric deques

Classes

Metric

A single metric data point.

Field Type Description
name str Metric identifier
value Any Numeric or categorical value
timestamp datetime UTC collection time
labels Optional[Dict[str, str]] Key-value tags for filtering

StructuredLogger

In-memory structured logging with bounded retention and audit support.

Constructor:

StructuredLogger(max_events: int = 100_000, max_metrics: int = 100_000)

Both parameters accept None for unbounded storage (not recommended for long-running simulations).

Attributes:

Attribute Type Description
events deque[dict] Structured event records
metrics deque[Metric] Collected metric data points
dropped_events int Count of evicted events (overflow)
dropped_metrics int Count of evicted metrics (overflow)

Methods:

Method Description
log(event, **fields) Append a structured event with UTC timestamp
emit_metric(name, value, **labels) Record a metric data point
audit(decision, **fields) Log a security-relevant decision (prefixed audit)

When the deque reaches capacity, the oldest entry is silently dropped and the dropped_* counter is incremented.

Usage

from replication.observability import StructuredLogger

logger = StructuredLogger(max_events=1000)

# Log an event
logger.log("worker_started", worker_id="abc", depth=0)

# Record a metric
logger.emit_metric("cpu_usage", 0.75, worker_id="abc")

# Audit trail
logger.audit("deny_quota", reason="max_replicas", count=10)

# Query events
recent = [e for e in logger.events if e["event"] == "audit"]

observability

StructuredLogger

In-memory structured logging with audit trail support.

Parameters

max_events : int or None Maximum event records to retain. When exceeded, the oldest events are silently dropped. Pass None for unbounded (not recommended for long-running simulations). max_metrics : int or None Maximum metric records to retain. Same eviction policy.

Source code in src/replication/observability.py
class StructuredLogger:
    """In-memory structured logging with audit trail support.

    Parameters
    ----------
    max_events : int or None
        Maximum event records to retain.  When exceeded, the oldest
        events are silently dropped.  Pass ``None`` for unbounded
        (not recommended for long-running simulations).
    max_metrics : int or None
        Maximum metric records to retain.  Same eviction policy.
    """

    def __init__(
        self,
        max_events: Optional[int] = DEFAULT_MAX_ENTRIES,
        max_metrics: Optional[int] = DEFAULT_MAX_ENTRIES,
    ) -> None:
        self.events: deque[Dict[str, Any]] = deque(maxlen=max_events)
        self.metrics: deque[Metric] = deque(maxlen=max_metrics)
        self.dropped_events: int = 0
        self.dropped_metrics: int = 0

    def log(self, event: str, **fields: Any) -> None:
        was_full = len(self.events) == self.events.maxlen if self.events.maxlen else False
        record = {"event": event, **fields, "timestamp": datetime.now(timezone.utc)}
        self.events.append(record)
        if was_full:
            self.dropped_events += 1

    def emit_metric(self, name: str, value: Any, **labels: str) -> None:
        was_full = len(self.metrics) == self.metrics.maxlen if self.metrics.maxlen else False
        self.metrics.append(Metric(name=name, value=value, timestamp=datetime.now(timezone.utc), labels=labels or None))
        if was_full:
            self.dropped_metrics += 1

    def audit(self, decision: str, **fields: Any) -> None:
        self.log("audit", decision=decision, **fields)