Skip to content

Safety Autopilot

safety_autopilot

Safety Autopilot — autonomous safety monitoring loop.

A goal-directed safety monitor that continuously evaluates agent fleet health, detects degradation patterns, and takes corrective actions with configurable human-in-the-loop gates. Think of it as cruise-control for safety: set a target safety posture and the autopilot keeps the system on track.

Key capabilities:

  • Continuous monitoring loop — tick-based evaluation cycle with configurable interval that checks multiple safety dimensions each tick
  • Goal-directed behaviour — define a target safety profile (min score, max incidents, max drift) and autopilot steers toward it
  • Escalation ladder — 4-level response (log → alert → recommend → auto-act) with configurable human-approval gates before destructive actions
  • Action playbook — built-in corrective actions (tighten thresholds, quarantine workers, rotate credentials, trigger drills) with dry-run mode
  • Flight recorder — append-only log of every observation, decision, and action for full auditability
  • Situational awareness — correlates signals across scorecard, drift, incidents, and capacity to produce a single "flight status"
  • Demo mode — simulated fleet with injected anomalies to showcase the autopilot in action

Usage::

python -m replication autopilot --demo
python -m replication autopilot --ticks 20 --interval 1.0
python -m replication autopilot --dry-run --target-score 80
python -m replication autopilot --export json
python -m replication autopilot --export html -o autopilot_report.html

SafetyGoal dataclass

Target safety posture the autopilot steers toward.

Source code in src/replication/safety_autopilot.py
@dataclass
class SafetyGoal:
    """Target safety posture the autopilot steers toward."""
    min_score: float = 75.0
    max_incidents_per_tick: int = 2
    max_drift_pct: float = 15.0
    max_response_time_ms: float = 500.0
    min_capacity_headroom_pct: float = 20.0

Observation dataclass

Single point-in-time safety observation.

Source code in src/replication/safety_autopilot.py
@dataclass
class Observation:
    """Single point-in-time safety observation."""
    tick: int
    timestamp: str
    safety_score: float
    drift_pct: float
    incident_count: int
    capacity_used_pct: float
    response_time_ms: float
    anomaly_flags: List[str] = field(default_factory=list)

Decision dataclass

Autopilot decision with rationale.

Source code in src/replication/safety_autopilot.py
@dataclass
class Decision:
    """Autopilot decision with rationale."""
    tick: int
    timestamp: str
    severity: str
    rationale: str
    actions: List[str]
    approved: bool
    dry_run: bool

FlightRecord dataclass

Complete autopilot session log.

Source code in src/replication/safety_autopilot.py
@dataclass
class FlightRecord:
    """Complete autopilot session log."""
    session_id: str
    started: str
    goal: Dict[str, Any]
    observations: List[Dict[str, Any]] = field(default_factory=list)
    decisions: List[Dict[str, Any]] = field(default_factory=list)
    actions_taken: List[Dict[str, Any]] = field(default_factory=list)
    summary: Optional[Dict[str, Any]] = None

SimulatedFleet

Simulated fleet that generates semi-realistic safety telemetry.

Source code in src/replication/safety_autopilot.py
class SimulatedFleet:
    """Simulated fleet that generates semi-realistic safety telemetry."""

    def __init__(self, seed: int = 42):
        self._rng = random.Random(seed)
        self._base_score = 85.0
        self._base_drift = 5.0
        self._tick = 0
        # Inject anomaly windows
        self._anomaly_windows = [
            (5, 8, "gradual_degradation"),
            (12, 14, "incident_spike"),
            (18, 20, "capacity_pressure"),
        ]

    def _in_anomaly(self) -> Optional[str]:
        for start, end, kind in self._anomaly_windows:
            if start <= self._tick <= end:
                return kind
        return None

    def observe(self) -> Observation:
        self._tick += 1
        anomaly = self._in_anomaly()

        # Base values with noise
        score = self._base_score + self._rng.gauss(0, 3)
        drift = self._base_drift + abs(self._rng.gauss(0, 2))
        incidents = max(0, int(self._rng.gauss(0.5, 0.8)))
        capacity = 55.0 + self._rng.gauss(0, 5)
        response = 200.0 + abs(self._rng.gauss(0, 50))

        # Inject anomalies
        if anomaly == "gradual_degradation":
            progress = (self._tick - 5) / 3.0
            score -= 15 * progress + self._rng.gauss(0, 3)
            drift += 10 * progress
        elif anomaly == "incident_spike":
            incidents += self._rng.randint(3, 7)
            score -= self._rng.uniform(5, 15)
            response += self._rng.uniform(100, 300)
        elif anomaly == "capacity_pressure":
            capacity += 25 + self._rng.uniform(0, 10)
            response += self._rng.uniform(50, 200)

        # Clamp
        score = max(0, min(100, score))
        drift = max(0, min(100, drift))
        capacity = max(0, min(100, capacity))
        response = max(10, response)

        return Observation(
            tick=self._tick,
            timestamp=datetime.now(timezone.utc).isoformat(),
            safety_score=round(score, 1),
            drift_pct=round(drift, 1),
            incident_count=incidents,
            capacity_used_pct=round(capacity, 1),
            response_time_ms=round(response, 1),
        )

SafetyAutopilot

Autonomous safety monitoring loop.

Source code in src/replication/safety_autopilot.py
class SafetyAutopilot:
    """Autonomous safety monitoring loop."""

    def __init__(
        self,
        goal: Optional[SafetyGoal] = None,
        dry_run: bool = False,
        auto_approve: bool = False,
        verbose: bool = True,
    ):
        self.goal = goal or SafetyGoal()
        self.dry_run = dry_run
        self.auto_approve = auto_approve
        self.verbose = verbose
        self._session_id = f"autopilot-{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')}"
        self.flight_record = FlightRecord(
            session_id=self._session_id,
            started=datetime.now(timezone.utc).isoformat(),
            goal=asdict(self.goal),
        )
        self._score_history: List[float] = []

    def _log(self, msg: str) -> None:
        if self.verbose:
            print(msg)

    def process_tick(self, obs: Observation) -> Decision:
        """Process one observation tick through the autopilot pipeline."""
        self.flight_record.observations.append(asdict(obs))
        self._score_history.append(obs.safety_score)

        # Assess
        severity, flags = assess_severity(obs, self.goal)
        obs.anomaly_flags = flags

        # Select actions
        actions = select_actions(severity, flags)

        # Trend analysis — detect sustained degradation
        rationale_parts = []
        if flags:
            rationale_parts.append(f"Anomalies detected: {', '.join(flags)}")
        if len(self._score_history) >= 3:
            recent = self._score_history[-3:]
            if all(recent[i] > recent[i + 1] for i in range(len(recent) - 1)):
                rationale_parts.append("Sustained score decline over 3 ticks")
                if severity.value == "ok":
                    severity = Severity.ADVISORY
                    actions = [CorrectiveAction.NOTIFY_OPERATOR]
        if not rationale_parts:
            rationale_parts.append("All metrics within target envelope")

        # Decide approval
        approved = True
        if actions and not self.auto_approve and not self.dry_run:
            # In non-interactive mode, auto-approve non-destructive
            destructive = {CorrectiveAction.QUARANTINE_WORKER,
                           CorrectiveAction.PAUSE_REPLICATION,
                           CorrectiveAction.ROTATE_CREDENTIALS}
            if any(a in destructive for a in actions):
                approved = False  # needs human approval

        decision = Decision(
            tick=obs.tick,
            timestamp=datetime.now(timezone.utc).isoformat(),
            severity=severity.value,
            rationale="; ".join(rationale_parts),
            actions=[a.value for a in actions],
            approved=approved,
            dry_run=self.dry_run,
        )
        self.flight_record.decisions.append(asdict(decision))

        # Execute actions
        status = FLIGHT_STATUS_LABELS[severity]
        self._log(f"\n  Tick {obs.tick:>3}{status} │ Score {obs.safety_score:5.1f} │ "
                   f"Drift {obs.drift_pct:4.1f}% │ Incidents {obs.incident_count} │ "
                   f"Capacity {obs.capacity_used_pct:4.1f}%")

        if actions:
            action_strs = [a.value for a in actions]
            if self.dry_run:
                self._log(f"         │ [DRY RUN] Would execute: {', '.join(action_strs)}")
            elif approved:
                self._log(f"         │ ⚡ Executing: {', '.join(action_strs)}")
                for a in actions:
                    self.flight_record.actions_taken.append({
                        "tick": obs.tick,
                        "action": a.value,
                        "timestamp": datetime.now(timezone.utc).isoformat(),
                    })
            else:
                self._log(f"         │ 🔒 Pending human approval: {', '.join(action_strs)}")

        if flags:
            self._log(f"         │ Flags: {', '.join(flags)}")

        return decision

    def summarize(self) -> Dict[str, Any]:
        """Generate session summary."""
        observations = self.flight_record.observations
        if not observations:
            return {"status": "no data"}

        scores = [o["safety_score"] for o in observations]
        incidents = sum(o["incident_count"] for o in observations)
        severities = [d["severity"] for d in self.flight_record.decisions]

        summary = {
            "session_id": self._session_id,
            "total_ticks": len(observations),
            "score_min": min(scores),
            "score_max": max(scores),
            "score_avg": round(sum(scores) / len(scores), 1),
            "score_final": scores[-1],
            "total_incidents": incidents,
            "total_actions_taken": len(self.flight_record.actions_taken),
            "severity_counts": {s: severities.count(s) for s in set(severities)},
            "goal_met": scores[-1] >= self.goal.min_score,
            "dry_run": self.dry_run,
        }
        self.flight_record.summary = summary
        return summary

process_tick(obs: Observation) -> Decision

Process one observation tick through the autopilot pipeline.

Source code in src/replication/safety_autopilot.py
def process_tick(self, obs: Observation) -> Decision:
    """Process one observation tick through the autopilot pipeline."""
    self.flight_record.observations.append(asdict(obs))
    self._score_history.append(obs.safety_score)

    # Assess
    severity, flags = assess_severity(obs, self.goal)
    obs.anomaly_flags = flags

    # Select actions
    actions = select_actions(severity, flags)

    # Trend analysis — detect sustained degradation
    rationale_parts = []
    if flags:
        rationale_parts.append(f"Anomalies detected: {', '.join(flags)}")
    if len(self._score_history) >= 3:
        recent = self._score_history[-3:]
        if all(recent[i] > recent[i + 1] for i in range(len(recent) - 1)):
            rationale_parts.append("Sustained score decline over 3 ticks")
            if severity.value == "ok":
                severity = Severity.ADVISORY
                actions = [CorrectiveAction.NOTIFY_OPERATOR]
    if not rationale_parts:
        rationale_parts.append("All metrics within target envelope")

    # Decide approval
    approved = True
    if actions and not self.auto_approve and not self.dry_run:
        # In non-interactive mode, auto-approve non-destructive
        destructive = {CorrectiveAction.QUARANTINE_WORKER,
                       CorrectiveAction.PAUSE_REPLICATION,
                       CorrectiveAction.ROTATE_CREDENTIALS}
        if any(a in destructive for a in actions):
            approved = False  # needs human approval

    decision = Decision(
        tick=obs.tick,
        timestamp=datetime.now(timezone.utc).isoformat(),
        severity=severity.value,
        rationale="; ".join(rationale_parts),
        actions=[a.value for a in actions],
        approved=approved,
        dry_run=self.dry_run,
    )
    self.flight_record.decisions.append(asdict(decision))

    # Execute actions
    status = FLIGHT_STATUS_LABELS[severity]
    self._log(f"\n  Tick {obs.tick:>3}{status} │ Score {obs.safety_score:5.1f} │ "
               f"Drift {obs.drift_pct:4.1f}% │ Incidents {obs.incident_count} │ "
               f"Capacity {obs.capacity_used_pct:4.1f}%")

    if actions:
        action_strs = [a.value for a in actions]
        if self.dry_run:
            self._log(f"         │ [DRY RUN] Would execute: {', '.join(action_strs)}")
        elif approved:
            self._log(f"         │ ⚡ Executing: {', '.join(action_strs)}")
            for a in actions:
                self.flight_record.actions_taken.append({
                    "tick": obs.tick,
                    "action": a.value,
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                })
        else:
            self._log(f"         │ 🔒 Pending human approval: {', '.join(action_strs)}")

    if flags:
        self._log(f"         │ Flags: {', '.join(flags)}")

    return decision

summarize() -> Dict[str, Any]

Generate session summary.

Source code in src/replication/safety_autopilot.py
def summarize(self) -> Dict[str, Any]:
    """Generate session summary."""
    observations = self.flight_record.observations
    if not observations:
        return {"status": "no data"}

    scores = [o["safety_score"] for o in observations]
    incidents = sum(o["incident_count"] for o in observations)
    severities = [d["severity"] for d in self.flight_record.decisions]

    summary = {
        "session_id": self._session_id,
        "total_ticks": len(observations),
        "score_min": min(scores),
        "score_max": max(scores),
        "score_avg": round(sum(scores) / len(scores), 1),
        "score_final": scores[-1],
        "total_incidents": incidents,
        "total_actions_taken": len(self.flight_record.actions_taken),
        "severity_counts": {s: severities.count(s) for s in set(severities)},
        "goal_met": scores[-1] >= self.goal.min_score,
        "dry_run": self.dry_run,
    }
    self.flight_record.summary = summary
    return summary

assess_severity(obs: Observation, goal: SafetyGoal) -> Tuple[Severity, List[str]]

Evaluate observation against goal, return severity + anomaly flags.

Source code in src/replication/safety_autopilot.py
def assess_severity(obs: Observation, goal: SafetyGoal) -> Tuple[Severity, List[str]]:
    """Evaluate observation against goal, return severity + anomaly flags."""
    flags: List[str] = []

    if obs.safety_score < goal.min_score * 0.7:
        flags.append("score_drop")
    if obs.drift_pct > goal.max_drift_pct * 1.5:
        flags.append("drift_spike")
    if obs.incident_count > goal.max_incidents_per_tick * 2:
        flags.append("incident_surge")
    if obs.capacity_used_pct > (100 - goal.min_capacity_headroom_pct):
        flags.append("capacity_crunch")
    if obs.response_time_ms > goal.max_response_time_ms * 1.5:
        flags.append("slow_response")

    if len(flags) >= 3:
        flags.append("multi_signal")

    if len(flags) >= 3:
        return Severity.CRITICAL, flags
    elif len(flags) == 2:
        return Severity.WARNING, flags
    elif len(flags) == 1:
        return Severity.ADVISORY, flags
    return Severity.OK, flags

select_actions(severity: Severity, flags: List[str]) -> List[CorrectiveAction]

Select corrective actions based on severity and anomaly flags.

Source code in src/replication/safety_autopilot.py
def select_actions(severity: Severity, flags: List[str]) -> List[CorrectiveAction]:
    """Select corrective actions based on severity and anomaly flags."""
    if severity == Severity.OK:
        return []

    actions: List[CorrectiveAction] = []
    seen = set()
    for f in flags:
        for action in ACTION_PLAYBOOK.get(f, []):
            if action not in seen:
                seen.add(action)
                actions.append(action)

    # Gate destructive actions behind severity level
    allowed = ESCALATION_LADDER[severity]
    if ActionType.AUTO_ACT not in allowed:
        # Filter out destructive actions — only notify/recommend
        non_destructive = {CorrectiveAction.NOTIFY_OPERATOR, CorrectiveAction.TIGHTEN_THRESHOLDS,
                           CorrectiveAction.SCALE_MONITORS, CorrectiveAction.TRIGGER_DRILL}
        actions = [a for a in actions if a in non_destructive]

    return actions

main(argv: Optional[List[str]] = None) -> None

CLI entry point for Safety Autopilot.

Source code in src/replication/safety_autopilot.py
def main(argv: Optional[List[str]] = None) -> None:
    """CLI entry point for Safety Autopilot."""
    parser = argparse.ArgumentParser(
        prog="python -m replication autopilot",
        description="Safety Autopilot — autonomous safety monitoring loop",
    )
    parser.add_argument("--ticks", type=int, default=25,
                        help="Number of monitoring ticks (default: 25)")
    parser.add_argument("--interval", type=float, default=0.0,
                        help="Seconds between ticks in demo (default: 0)")
    parser.add_argument("--target-score", type=float, default=75.0,
                        help="Minimum acceptable safety score (default: 75)")
    parser.add_argument("--max-drift", type=float, default=15.0,
                        help="Maximum acceptable drift %% (default: 15)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Show actions without executing")
    parser.add_argument("--auto-approve", action="store_true",
                        help="Auto-approve all corrective actions")
    parser.add_argument("--seed", type=int, default=42,
                        help="Random seed for demo fleet (default: 42)")
    parser.add_argument("--export", choices=["text", "json", "html"],
                        default="text", help="Output format")
    parser.add_argument("-o", "--output", help="Write output to file")
    parser.add_argument("--demo", action="store_true",
                        help="Run demo with simulated fleet anomalies")
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="Suppress tick-by-tick output")
    args = parser.parse_args(argv)

    if args.demo:
        args.dry_run = True  # demo is always dry-run for safety

    goal = SafetyGoal(
        min_score=args.target_score,
        max_drift_pct=args.max_drift,
    )
    autopilot = SafetyAutopilot(
        goal=goal,
        dry_run=args.dry_run,
        auto_approve=args.auto_approve,
        verbose=not args.quiet,
    )

    fleet = SimulatedFleet(seed=args.seed)

    if not args.quiet:
        mode = "DRY RUN" if args.dry_run else "LIVE"
        print(f"\n🛩️  Safety Autopilot — {mode}")
        print(f"   Goal: score≥{goal.min_score} | drift≤{goal.max_drift_pct}% | "
              f"incidents≤{goal.max_incidents_per_tick}/tick")
        print(f"   Ticks: {args.ticks} | Seed: {args.seed}")
        print("─" * 72)

    for _ in range(args.ticks):
        obs = fleet.observe()
        autopilot.process_tick(obs)
        if args.interval > 0:
            time.sleep(args.interval)

    summary = autopilot.summarize()

    if not args.quiet:
        print("\n" + "─" * 72)
        print(f"  Session complete: {summary['total_ticks']} ticks | "
              f"Score {summary['score_min']}{summary['score_max']} (avg {summary['score_avg']}) | "
              f"Incidents: {summary['total_incidents']} | "
              f"Actions: {summary['total_actions_taken']}")
        print(f"  Goal met: {'✅ Yes' if summary['goal_met'] else '❌ No'}")

    # Export
    record = autopilot.flight_record
    if args.export == "json":
        output = _format_json(record)
    elif args.export == "html":
        output = _generate_html(record)
    else:
        output = _format_text(record)

    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            f.write(output)
        if not args.quiet:
            print(f"\n  📄 Report written to {args.output}")
    elif args.export != "text" or args.quiet:
        print(output)