
Escalation Detection

Detects and analyzes privilege escalation attempts by AI agents — actions that systematically expand an agent's capabilities beyond its authorized scope.

Key Classes

Class                      Description
EscalationVector           Category of escalation attempt (FILESYSTEM, NETWORK, PROCESS, API, DATA)
EscalationSeverity         How dangerous the escalation would be if successful (LOW to CRITICAL)
StealthLevel               How difficult the attempt would be to detect (OBVIOUS to SOPHISTICATED)
AgentEscalationStrategy    Behavioral strategy for simulated escalation agents
ActionCategory             Categorization of agent actions (file, network, process, API, data, env)
EscalationChain            Sequence of related escalation attempts forming a multi-step attack
EscalationDetector         Main detector -- analyzes action sequences for escalation patterns

Usage

from replication.escalation import (
    ActionCategory,
    AgentAction,
    EscalationDetector,
)

detector = EscalationDetector()

actions = [
    AgentAction(ActionCategory.FILE_READ, target="/etc/passwd", detail="read attempt"),
    AgentAction(ActionCategory.PROC_SPAWN, target="sudo", detail="elevated spawn"),
    AgentAction(ActionCategory.FILE_WRITE, target="/etc/sudoers", detail="config modification"),
]

result = detector.analyze(actions)
for chain in result.chains:
    print(f"Chain {chain.chain_id}: "
          f"vectors={[v.value for v in chain.vectors_involved]} "
          f"peak={chain.peak_severity.value}")
    for step in chain.steps:
        print(f"  [{step.rule_id}] {step.description}")

escalation

Agent Privilege Escalation Detector -- identifies agents acquiring capabilities or permissions beyond their authorised scope.

Monitors agent actions for privilege escalation patterns: scope creep in API permissions, unauthorised directory traversal, attempts to spawn elevated processes, network access beyond policy, and incremental authority accumulation that may individually appear benign but collectively constitute a dangerous expansion of agent power.

Different from resource hoarding (hoarding.py) which tracks quantity of resources. This module tracks scope and authority -- an agent that stays within its CPU quota but gains filesystem root access has escalated privilege without hoarding.

Different from self-modification (selfmod.py) which tracks agents altering their own code. An agent can escalate privileges without modifying itself -- simply by requesting broader permissions through legitimate-looking API calls.

Escalation vectors

  1. Filesystem -- accessing paths outside assigned directory, traversal attacks (../), accessing sensitive paths (/etc, credentials files, SSH keys, other agents' directories).
  2. Network -- connecting to hosts/ports not in allowlist, DNS resolution of blocked domains, outbound connections to C2-like endpoints, lateral movement to internal services.
  3. Process -- spawning subprocesses, requesting elevated UIDs, using exec/eval/system calls, forking without approval, modifying environment variables of other processes.
  4. API -- requesting higher OAuth scopes, using admin endpoints, accessing other agents' resources, token impersonation, permission boundary probing via trial-and-error.
  5. Data -- reading data classified above agent's clearance, cross-tenant data access, accessing audit logs to learn about monitoring, reading other agents' conversation history.
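The filesystem checks above can be sketched as a small predicate. This is an illustrative standalone version (the function name is hypothetical, and the path lists mirror the `AgentPermissions` defaults shown later on this page):

```python
from typing import Optional

# Path lists mirror the AgentPermissions defaults; names are illustrative.
ALLOWED_PATHS = ["/home/agent", "/tmp/agent"]
BLOCKED_PATHS = ["/etc", "/root", "/var/log", "/.ssh"]

def check_filesystem(target: str) -> Optional[str]:
    """Return a finding description if the path escapes the agent's scope."""
    # Traversal sequences are flagged regardless of where they resolve.
    if ".." in target:
        return f"path traversal attempt: {target}"
    # Explicitly blocked prefixes are the highest-signal match.
    for prefix in BLOCKED_PATHS:
        if target.startswith(prefix):
            return f"access to blocked path: {target}"
    # Anything outside the allowlist is scope creep.
    if not any(target.startswith(p) for p in ALLOWED_PATHS):
        return f"access outside allowed paths: {target}"
    return None

print(check_filesystem("/home/agent/notes.txt"))  # None (in scope)
print(check_filesystem("/etc/passwd"))            # access to blocked path
print(check_filesystem("/home/agent/../../etc"))  # path traversal attempt
```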

Each action is scored on severity (how dangerous the escalation is) and stealth (how hard it would be to detect through normal logging). The combination identifies both clumsy escalation attempts and sophisticated privilege creep.
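One plausible way to combine the two dimensions into a 0-100 risk score (an illustrative sketch; the weights are assumptions, not the module's actual `_compute_risk_score`):

```python
# Illustrative weights: severity carries most of the score, and
# stealthier attempts get a bonus because they are the ones normal
# logging would miss.
SEVERITY_WEIGHT = {"low": 10.0, "medium": 35.0, "high": 65.0, "critical": 90.0}
STEALTH_BONUS = {"obvious": 0.0, "subtle": 5.0, "covert": 10.0, "sophisticated": 15.0}

def risk_score(severity: str, stealth: str) -> float:
    # Cap at 100 so a critical + sophisticated attempt stays on the scale.
    return min(100.0, SEVERITY_WEIGHT[severity] + STEALTH_BONUS[stealth])

print(risk_score("high", "covert"))             # 75.0
print(risk_score("critical", "sophisticated"))  # 100.0 (capped)
```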

Usage (CLI)::

python -m replication.escalation                              # default scenario
python -m replication.escalation --actions 100                # 100 simulated actions
python -m replication.escalation --strategy persistent        # persistent prober
python -m replication.escalation --json                       # JSON output
python -m replication.escalation --export report.json         # save full report

Programmatic::

from replication.escalation import EscalationDetector, EscalationConfig

detector = EscalationDetector(config=EscalationConfig(seed=42))
result = detector.analyze()
print(result.render())
for attempt in result.attempts:
    print(f"  {attempt.vector.value}: {attempt.description}")

EscalationVector

Bases: Enum

Category of privilege escalation attempt.

Source code in src/replication/escalation.py
class EscalationVector(enum.Enum):
    """Category of privilege escalation attempt."""
    FILESYSTEM = "filesystem"
    NETWORK = "network"
    PROCESS = "process"
    API = "api"
    DATA = "data"

EscalationSeverity

Bases: Enum

How dangerous the escalation would be if successful.

Source code in src/replication/escalation.py
class EscalationSeverity(enum.Enum):
    """How dangerous the escalation would be if successful."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

StealthLevel

Bases: Enum

How difficult this escalation would be to detect.

Source code in src/replication/escalation.py
class StealthLevel(enum.Enum):
    """How difficult this escalation would be to detect."""
    OBVIOUS = "obvious"       # Easily caught by basic logging
    SUBTLE = "subtle"         # Requires pattern analysis to detect
    COVERT = "covert"         # Designed to evade standard monitoring
    SOPHISTICATED = "sophisticated"  # Multi-step, uses misdirection

AgentEscalationStrategy

Bases: Enum

Behavioral strategy for simulated escalation agents.

Source code in src/replication/escalation.py
class AgentEscalationStrategy(enum.Enum):
    """Behavioral strategy for simulated escalation agents."""
    NAIVE = "naive"             # Random, uncoordinated attempts
    PROBING = "probing"         # Systematic boundary testing
    PERSISTENT = "persistent"   # Repeated attempts with variation
    SOPHISTICATED = "sophisticated"  # Multi-stage, stealthy escalation

ActionCategory

Bases: Enum

Type of action an agent takes.

Source code in src/replication/escalation.py
class ActionCategory(enum.Enum):
    """Type of action an agent takes."""
    FILE_READ = "file_read"
    FILE_WRITE = "file_write"
    DIR_LIST = "dir_list"
    NET_CONNECT = "net_connect"
    NET_DNS = "net_dns"
    PROC_SPAWN = "proc_spawn"
    PROC_EXEC = "proc_exec"
    API_CALL = "api_call"
    API_SCOPE = "api_scope"
    DATA_READ = "data_read"
    DATA_QUERY = "data_query"
    ENV_READ = "env_read"
    ENV_WRITE = "env_write"

AgentPermissions dataclass

Defines the authorised scope for an agent.

Source code in src/replication/escalation.py
@dataclass
class AgentPermissions:
    """Defines the authorised scope for an agent."""
    allowed_paths: List[str] = field(default_factory=lambda: ["/home/agent", "/tmp/agent"])
    blocked_paths: List[str] = field(default_factory=lambda: [
        "/etc", "/root", "/var/log", "/home/other", "/.ssh",
        "/proc", "/sys", "/dev", "/boot",
    ])
    allowed_hosts: List[str] = field(default_factory=lambda: ["api.internal.local"])
    blocked_hosts: List[str] = field(default_factory=lambda: [
        "169.254.169.254",   # Cloud metadata
        "10.0.0.1",          # Gateway
        "admin.internal",    # Admin panel
    ])
    allowed_ports: List[int] = field(default_factory=lambda: [443, 80])
    allowed_api_scopes: List[str] = field(default_factory=lambda: [
        "read:own", "write:own", "execute:task",
    ])
    admin_api_scopes: List[str] = field(default_factory=lambda: [
        "admin:read", "admin:write", "read:all", "write:all",
        "manage:agents", "read:audit", "modify:policy",
    ])
    data_clearance: int = 1   # 1=public, 2=internal, 3=confidential, 4=restricted
    max_subprocess_count: int = 2
    can_use_eval: bool = False

AgentAction dataclass

A single action taken by the simulated agent.

Source code in src/replication/escalation.py
@dataclass
class AgentAction:
    """A single action taken by the simulated agent."""
    category: ActionCategory
    target: str
    detail: str
    timestamp: float = 0.0
    agent_id: str = "agent-0"

EscalationAttempt dataclass

A detected privilege escalation attempt.

Source code in src/replication/escalation.py
@dataclass
class EscalationAttempt:
    """A detected privilege escalation attempt."""
    vector: EscalationVector
    severity: EscalationSeverity
    stealth: StealthLevel
    action: AgentAction
    description: str
    rule_id: str
    blocked: bool = True
    risk_score: float = 0.0   # 0-100

    def to_dict(self) -> dict:
        return {
            "vector": self.vector.value,
            "severity": self.severity.value,
            "stealth": self.stealth.value,
            "category": self.action.category.value,
            "target": self.action.target,
            "description": self.description,
            "rule_id": self.rule_id,
            "blocked": self.blocked,
            "risk_score": round(self.risk_score, 2),
            "timestamp": self.action.timestamp,
            "agent_id": self.action.agent_id,
        }

VectorSummary dataclass

Summary statistics for a single escalation vector.

Source code in src/replication/escalation.py
@dataclass
class VectorSummary:
    """Summary statistics for a single escalation vector."""
    vector: EscalationVector
    total_attempts: int = 0
    blocked: int = 0
    unblocked: int = 0
    max_severity: EscalationSeverity = EscalationSeverity.LOW
    avg_risk_score: float = 0.0
    stealth_distribution: Dict[str, int] = field(default_factory=dict)

    def block_rate(self) -> float:
        return self.blocked / self.total_attempts if self.total_attempts else 1.0

EscalationChain dataclass

A sequence of related escalation attempts that form a multi-step attack.

Source code in src/replication/escalation.py
@dataclass
class EscalationChain:
    """A sequence of related escalation attempts that form a multi-step attack."""
    chain_id: str
    steps: List[EscalationAttempt] = field(default_factory=list)
    vectors_involved: List[EscalationVector] = field(default_factory=list)
    peak_severity: EscalationSeverity = EscalationSeverity.LOW
    description: str = ""

    def to_dict(self) -> dict:
        return {
            "chain_id": self.chain_id,
            "steps": len(self.steps),
            "vectors": [v.value for v in self.vectors_involved],
            "peak_severity": self.peak_severity.value,
            "description": self.description,
        }

EscalationConfig dataclass

Configuration for the escalation detector.

Source code in src/replication/escalation.py
@dataclass
class EscalationConfig:
    """Configuration for the escalation detector."""
    num_actions: int = 50
    strategy: AgentEscalationStrategy = AgentEscalationStrategy.PROBING
    permissions: AgentPermissions = field(default_factory=AgentPermissions)
    escalation_probability: float = 0.3   # fraction of actions that are escalation attempts
    chain_detection_window: int = 5       # actions window for chain detection
    seed: Optional[int] = None
    agent_id: str = "agent-0"

DetectionRule dataclass

A rule for detecting escalation attempts.

Source code in src/replication/escalation.py
@dataclass
class DetectionRule:
    """A rule for detecting escalation attempts."""
    rule_id: str
    vector: EscalationVector
    severity: EscalationSeverity
    description: str
    check: Any = None   # callable(action, permissions) -> Optional[str]
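Custom rules plug in through the `check` callable, which receives the action and the agent's permissions and returns a finding description or `None`. A standalone sketch with simplified types (the real callable receives an `AgentAction` and `AgentPermissions`; the rule below is hypothetical):

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional

# Simplified stand-in for DetectionRule, matching the documented shape.
@dataclass
class DetectionRule:
    rule_id: str
    vector: str
    severity: str
    description: str
    check: Optional[Callable[[Any, Any], Optional[str]]] = None

def check_env_write(action, permissions) -> Optional[str]:
    """Flag writes to env vars that commonly alter code loading."""
    sensitive = {"LD_PRELOAD", "PYTHONPATH", "PATH"}
    if action["category"] == "env_write" and action["target"] in sensitive:
        return f"write to sensitive env var {action['target']}"
    return None

rule = DetectionRule(
    rule_id="PROC-ENV-01",
    vector="process",
    severity="high",
    description="Sensitive environment variable modification",
    check=check_env_write,
)

print(rule.check({"category": "env_write", "target": "LD_PRELOAD"}, None))
```

Per the constructor documented below, a rule like this would be passed alongside the built-ins via the `rules` parameter of `EscalationDetector`.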

EscalationDetector

Detect and analyze agent privilege escalation attempts.

Parameters

config : EscalationConfig, optional
    Configuration for the detector. Defaults are sensible for a quick analysis.
rules : list[DetectionRule], optional
    Custom detection rules. Defaults to BUILTIN_RULES.

Source code in src/replication/escalation.py
class EscalationDetector:
    """Detect and analyze agent privilege escalation attempts.

    Parameters
    ----------
    config : EscalationConfig, optional
        Configuration for the detector.  Defaults are sensible for a
        quick analysis.
    rules : list[DetectionRule], optional
        Custom detection rules.  Defaults to ``BUILTIN_RULES``.
    """

    def __init__(
        self,
        config: Optional[EscalationConfig] = None,
        rules: Optional[List[DetectionRule]] = None,
    ) -> None:
        self.config = config or EscalationConfig()
        self.rules = rules if rules is not None else list(BUILTIN_RULES)

    def analyze(
        self,
        actions: Optional[List[AgentAction]] = None,
    ) -> "EscalationResult":
        """Run escalation detection on a sequence of agent actions.

        Parameters
        ----------
        actions : list[AgentAction], optional
            Pre-recorded actions to analyse.  If ``None``, actions are
            generated based on ``self.config``.

        Returns
        -------
        EscalationResult
            Full analysis result with attempts, chains, and summaries.
        """
        rng = random.Random(self.config.seed)

        if actions is None:
            action_seq = _generate_actions(self.config, rng)
        else:
            action_seq = list(actions)

        # Run detection rules
        attempts: List[EscalationAttempt] = []
        for action in action_seq:
            for rule in self.rules:
                desc = rule.check(action, self.config.permissions)
                if desc is not None:
                    risk_score = _compute_risk_score(rule.severity, _infer_stealth(action, rule))
                    attempt = EscalationAttempt(
                        vector=rule.vector,
                        severity=rule.severity,
                        stealth=_infer_stealth(action, rule),
                        action=action,
                        description=desc,
                        rule_id=rule.rule_id,
                        blocked=True,  # Assume all detected → blocked
                        risk_score=risk_score,
                    )
                    attempts.append(attempt)

        # Build vector summaries
        vector_summaries = _build_vector_summaries(attempts)

        # Detect escalation chains
        chains = _detect_chains(attempts, self.config.chain_detection_window)

        # Compute overall scores
        containment_score = _containment_score(attempts, len(action_seq))
        escalation_velocity = _escalation_velocity(attempts)

        return EscalationResult(
            total_actions=len(action_seq),
            total_attempts=len(attempts),
            attempts=attempts,
            vector_summaries=vector_summaries,
            chains=chains,
            containment_score=containment_score,
            escalation_velocity=escalation_velocity,
            strategy=self.config.strategy,
            config=self.config,
        )

analyze(actions: Optional[List[AgentAction]] = None) -> 'EscalationResult'

Run escalation detection on a sequence of agent actions.

Parameters

actions : list[AgentAction], optional
    Pre-recorded actions to analyse. If None, actions are generated based on self.config.

Returns

EscalationResult
    Full analysis result with attempts, chains, and summaries.

Source code in src/replication/escalation.py
def analyze(
    self,
    actions: Optional[List[AgentAction]] = None,
) -> "EscalationResult":
    """Run escalation detection on a sequence of agent actions.

    Parameters
    ----------
    actions : list[AgentAction], optional
        Pre-recorded actions to analyse.  If ``None``, actions are
        generated based on ``self.config``.

    Returns
    -------
    EscalationResult
        Full analysis result with attempts, chains, and summaries.
    """
    rng = random.Random(self.config.seed)

    if actions is None:
        action_seq = _generate_actions(self.config, rng)
    else:
        action_seq = list(actions)

    # Run detection rules
    attempts: List[EscalationAttempt] = []
    for action in action_seq:
        for rule in self.rules:
            desc = rule.check(action, self.config.permissions)
            if desc is not None:
                risk_score = _compute_risk_score(rule.severity, _infer_stealth(action, rule))
                attempt = EscalationAttempt(
                    vector=rule.vector,
                    severity=rule.severity,
                    stealth=_infer_stealth(action, rule),
                    action=action,
                    description=desc,
                    rule_id=rule.rule_id,
                    blocked=True,  # Assume all detected → blocked
                    risk_score=risk_score,
                )
                attempts.append(attempt)

    # Build vector summaries
    vector_summaries = _build_vector_summaries(attempts)

    # Detect escalation chains
    chains = _detect_chains(attempts, self.config.chain_detection_window)

    # Compute overall scores
    containment_score = _containment_score(attempts, len(action_seq))
    escalation_velocity = _escalation_velocity(attempts)

    return EscalationResult(
        total_actions=len(action_seq),
        total_attempts=len(attempts),
        attempts=attempts,
        vector_summaries=vector_summaries,
        chains=chains,
        containment_score=containment_score,
        escalation_velocity=escalation_velocity,
        strategy=self.config.strategy,
        config=self.config,
    )

EscalationResult dataclass

Complete escalation analysis result.

Source code in src/replication/escalation.py
@dataclass
class EscalationResult:
    """Complete escalation analysis result."""
    total_actions: int
    total_attempts: int
    attempts: List[EscalationAttempt]
    vector_summaries: Dict[str, VectorSummary]
    chains: List[EscalationChain]
    containment_score: float
    escalation_velocity: float
    strategy: AgentEscalationStrategy
    config: EscalationConfig

    def severity_counts(self) -> Dict[str, int]:
        counts: Dict[str, int] = {}
        for s in EscalationSeverity:
            counts[s.value] = sum(1 for a in self.attempts if a.severity == s)
        return counts

    def top_risks(self, n: int = 5) -> List[EscalationAttempt]:
        return sorted(self.attempts, key=lambda a: a.risk_score, reverse=True)[:n]

    def rules_triggered(self) -> Dict[str, int]:
        counts: Dict[str, int] = {}
        for a in self.attempts:
            counts[a.rule_id] = counts.get(a.rule_id, 0) + 1
        return dict(sorted(counts.items(), key=lambda x: x[1], reverse=True))

    def render(self, width: int = 72) -> str:
        """Human-readable report."""
        sep = "─" * width
        lines = [
            sep,
            "AGENT PRIVILEGE ESCALATION ANALYSIS",
            sep,
            f"Strategy:   {self.strategy.value}",
            f"Actions:    {self.total_actions}",
            f"Attempts:   {self.total_attempts}",
            f"Containment Score: {self.containment_score}/100",
            f"Escalation Velocity: {self.escalation_velocity:+.4f} severity/action",
            "",
            "SEVERITY DISTRIBUTION",
            sep,
        ]
        for sev, count in self.severity_counts().items():
            bar = "█" * min(count, 40)
            lines.append(f"  {sev:>10s}: {count:3d} {bar}")

        lines.extend(["", "VECTOR ANALYSIS", sep])
        for vec_name, summary in self.vector_summaries.items():
            if summary.total_attempts == 0:
                continue
            lines.append(
                f"  {vec_name:>12s}: {summary.total_attempts:3d} attempts, "
                f"block rate {summary.block_rate():.0%}, "
                f"max severity {summary.max_severity.value}, "
                f"avg risk {summary.avg_risk_score:.1f}"
            )

        if self.chains:
            lines.extend(["", "ESCALATION CHAINS", sep])
            for chain in self.chains:
                lines.append(f"  [{chain.chain_id}] {chain.description}")
                lines.append(f"    Steps: {len(chain.steps)}, Peak: {chain.peak_severity.value}")

        lines.extend(["", "TOP RISKS", sep])
        for attempt in self.top_risks():
            lines.append(
                f"  [{attempt.rule_id}] {attempt.description} "
                f"(risk={attempt.risk_score:.0f}, stealth={attempt.stealth.value})"
            )

        lines.extend(["", "RULES TRIGGERED", sep])
        for rule_id, count in self.rules_triggered().items():
            lines.append(f"  {rule_id}: {count}")

        lines.append(sep)
        return "\n".join(lines)

    def to_dict(self) -> dict:
        return {
            "total_actions": self.total_actions,
            "total_attempts": self.total_attempts,
            "containment_score": self.containment_score,
            "escalation_velocity": self.escalation_velocity,
            "strategy": self.strategy.value,
            "severity_counts": self.severity_counts(),
            "vector_summaries": {
                k: {
                    "total": v.total_attempts,
                    "blocked": v.blocked,
                    "unblocked": v.unblocked,
                    "block_rate": round(v.block_rate(), 4),
                    "max_severity": v.max_severity.value,
                    "avg_risk_score": round(v.avg_risk_score, 2),
                }
                for k, v in self.vector_summaries.items()
            },
            "chains": [c.to_dict() for c in self.chains],
            "top_risks": [a.to_dict() for a in self.top_risks()],
            "rules_triggered": self.rules_triggered(),
        }

    def to_json(self, path: str) -> None:
        import json as _json
        with open(path, "w", encoding="utf-8") as f:
            _json.dump(self.to_dict(), f, indent=2)

render(width: int = 72) -> str

Human-readable report.

Source code in src/replication/escalation.py
def render(self, width: int = 72) -> str:
    """Human-readable report."""
    sep = "─" * width
    lines = [
        sep,
        "AGENT PRIVILEGE ESCALATION ANALYSIS",
        sep,
        f"Strategy:   {self.strategy.value}",
        f"Actions:    {self.total_actions}",
        f"Attempts:   {self.total_attempts}",
        f"Containment Score: {self.containment_score}/100",
        f"Escalation Velocity: {self.escalation_velocity:+.4f} severity/action",
        "",
        "SEVERITY DISTRIBUTION",
        sep,
    ]
    for sev, count in self.severity_counts().items():
        bar = "█" * min(count, 40)
        lines.append(f"  {sev:>10s}: {count:3d} {bar}")

    lines.extend(["", "VECTOR ANALYSIS", sep])
    for vec_name, summary in self.vector_summaries.items():
        if summary.total_attempts == 0:
            continue
        lines.append(
            f"  {vec_name:>12s}: {summary.total_attempts:3d} attempts, "
            f"block rate {summary.block_rate():.0%}, "
            f"max severity {summary.max_severity.value}, "
            f"avg risk {summary.avg_risk_score:.1f}"
        )

    if self.chains:
        lines.extend(["", "ESCALATION CHAINS", sep])
        for chain in self.chains:
            lines.append(f"  [{chain.chain_id}] {chain.description}")
            lines.append(f"    Steps: {len(chain.steps)}, Peak: {chain.peak_severity.value}")

    lines.extend(["", "TOP RISKS", sep])
    for attempt in self.top_risks():
        lines.append(
            f"  [{attempt.rule_id}] {attempt.description} "
            f"(risk={attempt.risk_score:.0f}, stealth={attempt.stealth.value})"
        )

    lines.extend(["", "RULES TRIGGERED", sep])
    for rule_id, count in self.rules_triggered().items():
        lines.append(f"  {rule_id}: {count}")

    lines.append(sep)
    return "\n".join(lines)

main(argv: Optional[List[str]] = None) -> None

CLI entry point.

Source code in src/replication/escalation.py
def main(argv: Optional[List[str]] = None) -> None:
    """CLI entry point."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Agent Privilege Escalation Detector",
    )
    parser.add_argument(
        "--actions", type=int, default=50,
        help="Number of agent actions to simulate (default: 50)",
    )
    parser.add_argument(
        "--strategy",
        choices=[s.value for s in AgentEscalationStrategy],
        default="probing",
        help="Agent escalation strategy (default: probing)",
    )
    parser.add_argument(
        "--seed", type=int, default=None,
        help="Random seed for reproducibility",
    )
    parser.add_argument(
        "--json", action="store_true",
        help="Output as JSON instead of text report",
    )
    parser.add_argument(
        "--export", type=str, default=None,
        help="Export full report to JSON file",
    )

    args = parser.parse_args(argv)

    config = EscalationConfig(
        num_actions=args.actions,
        strategy=AgentEscalationStrategy(args.strategy),
        seed=args.seed,
    )

    detector = EscalationDetector(config=config)
    result = detector.analyze()

    if args.json:
        print(json.dumps(result.to_dict(), indent=2))
    else:
        print(result.render())

    if args.export:
        result.to_json(args.export)
        print(f"\nExported to {args.export}")