class KillChainAnalyzer:
    """Analyzes agent behavior patterns for kill chain progression.

    Actions are grouped per agent, every action is mapped to a kill chain
    stage, and the resulting per-agent chains are scored, classified and
    mined for shared stage sequences and stage-to-stage transitions.

    All randomness (simulated workloads and simulated disruption) is drawn
    from a private ``random.Random`` seeded with ``config.seed``.
    """

    def __init__(self, config: Optional[KillChainConfig] = None) -> None:
        """Create an analyzer.

        Args:
            config: Analysis settings. A default ``KillChainConfig()`` is
                used when this is ``None`` (or otherwise falsy).
        """
        self.config = config or KillChainConfig()
        self._rng = random.Random(self.config.seed)

    def analyze(self, actions: Optional[List[AgentAction]] = None) -> KillChainReport:
        """Build a kill chain report from observed or simulated actions.

        Args:
            actions: Actions to analyze. When ``None``, a synthetic workload
                is generated by ``_simulate_actions``. An empty list is
                analyzed as-is and produces a report with no chains.

        Returns:
            A ``KillChainReport`` holding one chain per distinct agent id,
            the detected patterns and transitions, and summary statistics.
        """
        if actions is None:
            actions = self._simulate_actions()

        by_agent: Dict[str, List[AgentAction]] = {}
        for action in actions:
            by_agent.setdefault(action.agent_id, []).append(action)

        # Chains are built in first-appearance order of the agent ids; each
        # build consumes one RNG draw, so this order affects seeded results.
        chains: list[KillChain] = [
            self._build_chain(agent_id, agent_actions)
            for agent_id, agent_actions in by_agent.items()
        ]
        patterns = self._detect_patterns(chains)
        transitions = self._compute_transitions(chains)

        stage_dist: Dict[str, int] = {}
        entry_counts: Dict[KillChainStage, int] = {}
        for chain in chains:
            for stage, obs in chain.stages.items():
                stage_dist[stage.value] = stage_dist.get(stage.value, 0) + obs.count
            if chain.stages:
                # The entry stage is the one whose first action came earliest.
                earliest = min(
                    chain.stages.items(), key=lambda item: item[1].first_seen
                )[0]
                entry_counts[earliest] = entry_counts.get(earliest, 0) + 1

        most_common_entry = (
            max(entry_counts, key=entry_counts.get) if entry_counts else None
        )
        completeness_values = [c.completeness for c in chains]
        avg_completeness = (
            statistics.mean(completeness_values) if completeness_values else 0.0
        )

        return KillChainReport(
            chains=chains,
            patterns=patterns,
            transitions=transitions,
            total_agents=len(chains),
            total_actions=len(actions),
            avg_completeness=avg_completeness,
            max_risk_score=max((c.risk_score for c in chains), default=0.0),
            most_common_entry=most_common_entry,
            stage_distribution=stage_dist,
            config=self.config,
        )

    def _build_chain(self, agent_id: str, actions: List[AgentAction]) -> KillChain:
        """Assemble and score the kill chain of a single agent.

        Actions are processed in timestamp order, so each stage's
        ``first_seen`` / ``last_seen`` bracket the actions classified into
        it. Afterwards the chain's derived fields are filled in and, with
        probability ``config.disruption_rate``, the chain is marked as
        disrupted at its most advanced stage.

        Args:
            agent_id: Identifier stored on the resulting chain.
            actions: All actions attributed to that agent, in any order.

        Returns:
            The populated ``KillChain``.
        """
        chain = KillChain(agent_id=agent_id)
        for action in sorted(actions, key=lambda a: a.timestamp):
            stage = self._classify_action(action)
            if stage not in chain.stages:
                chain.stages[stage] = StageObservation(
                    stage=stage,
                    first_seen=action.timestamp,
                    last_seen=action.timestamp,
                )
            obs = chain.stages[stage]
            obs.actions.append(action)
            obs.last_seen = action.timestamp

        for obs in chain.stages.values():
            if obs.actions:
                succeeded = sum(1 for a in obs.actions if a.success)
                obs.success_rate = succeeded / len(obs.actions)

        chain.completeness = len(chain.stages) / len(KillChainStage)
        chain.risk_score = self._compute_risk_score(chain)
        chain.sophistication = self._assess_sophistication(chain)
        chain.status = self._determine_status(chain)
        chain.predicted_next = self._predict_next_stage(chain)

        # The RNG is drawn from unconditionally (even for an empty chain) so
        # the random stream stays aligned across agents for a given seed.
        disruption_drawn = self._rng.random() < self.config.disruption_rate
        if disruption_drawn and chain.stages:
            latest = max(chain.stages.keys(), key=lambda s: STAGE_ORDER.index(s))
            # A chain that already reached its objective cannot be disrupted.
            if latest != KillChainStage.OBJECTIVE_EXECUTION:
                chain.disrupted_at = latest
                chain.status = ChainStatus.DISRUPTED
        return chain

    def _classify_action(self, action: AgentAction) -> KillChainStage:
        """Map an action to its kill chain stage.

        A known ``action_type`` takes its stage from ``ACTION_CATALOG``;
        otherwise the action's category decides. Actions whose category is
        not recognised fall back to ``RECONNAISSANCE``.
        """
        if action.action_type in ACTION_CATALOG:
            return ACTION_CATALOG[action.action_type][0]
        category_map = {
            ActionCategory.PROBE: KillChainStage.RECONNAISSANCE,
            ActionCategory.ACQUIRE: KillChainStage.RESOURCE_ACQUISITION,
            ActionCategory.ESCALATE: KillChainStage.PRIVILEGE_ESCALATION,
            ActionCategory.MOVE: KillChainStage.LATERAL_MOVEMENT,
            ActionCategory.EVADE: KillChainStage.DEFENSE_EVASION,
            ActionCategory.COMMUNICATE: KillChainStage.COMMAND_AND_CONTROL,
            ActionCategory.EXECUTE: KillChainStage.OBJECTIVE_EXECUTION,
        }
        return category_map.get(action.category, KillChainStage.RECONNAISSANCE)

    def _compute_risk_score(self, chain: KillChain) -> float:
        """Score a chain from 0 to 100.

        The score is the sum of:

        * up to 70 points: per-stage ``STAGE_RISK_WEIGHTS`` weight, scaled
          by intensity (``obs.count / 5``, capped at 1) and by a success
          multiplier between 0.5 and 1.0, normalised by the total weight;
        * up to 20 points: 5 per pair of observed stages that are adjacent
          in ``STAGE_ORDER``;
        * 10 points if ``OBJECTIVE_EXECUTION`` was observed.

        Returns:
            The score rounded to one decimal and capped at 100.0, or 0.0
            for a chain with no stages.
        """
        if not chain.stages:
            return 0.0

        weighted_sum = 0.0
        for stage, obs in chain.stages.items():
            weight = STAGE_RISK_WEIGHTS[stage]
            intensity = min(obs.count / 5.0, 1.0)
            success_mult = 0.5 + 0.5 * obs.success_rate
            weighted_sum += weight * intensity * success_mult
        base_score = (weighted_sum / sum(STAGE_RISK_WEIGHTS.values())) * 70

        # Sorting here is intentional: the bonus rewards *coverage* of
        # neighbouring stages, irrespective of the order they occurred in.
        order_indices = sorted(STAGE_ORDER.index(s) for s in chain.stages)
        adjacent_pairs = sum(
            1 for prev, cur in zip(order_indices, order_indices[1:]) if cur == prev + 1
        )
        base_score += min(adjacent_pairs * 5, 20)

        if KillChainStage.OBJECTIVE_EXECUTION in chain.stages:
            base_score += 10
        return min(round(base_score, 1), 100.0)

    def _assess_sophistication(self, chain: KillChain) -> AttackSophistication:
        """Classify how sophisticated a chain looks.

        A chain counts as *sequential* when its stages were first observed
        in an order that never moves backwards through ``STAGE_ORDER``.

        * ``chain.stage_count`` of 2 or fewer -> ``OPPORTUNISTIC``
        * sequential, with evasion and C2, mean success rate above 0.7
          -> ``APT``
        * sequential with evasion or C2 -> ``ADAPTIVE``
        * sequential, or ``chain.stage_count`` of 4 or more -> ``SCRIPTED``
        * anything else -> ``OPPORTUNISTIC``
        """
        # NOTE(review): stage_count is presumably len(chain.stages) - confirm.
        if chain.stage_count <= 2:
            return AttackSophistication.OPPORTUNISTIC

        # Order the stages chronologically *before* mapping them to their
        # kill chain positions. Checking monotonicity on an already sorted
        # list of positions would always succeed and make every chain look
        # sequential.
        chronological = sorted(
            chain.stages, key=lambda s: chain.stages[s].first_seen
        )
        order_indices = [STAGE_ORDER.index(s) for s in chronological]
        is_sequential = all(
            prev <= cur for prev, cur in zip(order_indices, order_indices[1:])
        )

        has_evasion = KillChainStage.DEFENSE_EVASION in chain.stages
        has_c2 = KillChainStage.COMMAND_AND_CONTROL in chain.stages
        avg_success = statistics.mean(
            obs.success_rate for obs in chain.stages.values()
        )

        if is_sequential and has_evasion and has_c2 and avg_success > 0.7:
            return AttackSophistication.APT
        if is_sequential and (has_evasion or has_c2):
            return AttackSophistication.ADAPTIVE
        if is_sequential or chain.stage_count >= 4:
            return AttackSophistication.SCRIPTED
        return AttackSophistication.OPPORTUNISTIC

    def _determine_status(self, chain: KillChain) -> ChainStatus:
        """Derive the chain status from the stages observed so far.

        ``COMPLETE`` if the objective stage was reached; otherwise
        ``ADVANCED`` (``stage_count`` >= 5), ``DEVELOPING`` (>= 3) or
        ``NASCENT``.
        """
        if KillChainStage.OBJECTIVE_EXECUTION in chain.stages:
            return ChainStatus.COMPLETE
        if chain.stage_count >= 5:
            return ChainStatus.ADVANCED
        if chain.stage_count >= 3:
            return ChainStatus.DEVELOPING
        return ChainStatus.NASCENT

    def _predict_next_stage(self, chain: KillChain) -> Optional[KillChainStage]:
        """Predict the stage an agent is expected to enter next.

        Returns:
            ``RECONNAISSANCE`` for an empty chain, the stage that follows
            the most advanced observed stage in ``STAGE_ORDER``, or ``None``
            when the final stage has already been observed.
        """
        if not chain.stages:
            return KillChainStage.RECONNAISSANCE
        latest_idx = max(STAGE_ORDER.index(s) for s in chain.stages)
        # Every stage beyond the most advanced one is unobserved by
        # definition, so the immediate successor is the prediction.
        next_idx = latest_idx + 1
        if next_idx < len(STAGE_ORDER):
            return STAGE_ORDER[next_idx]
        return None

    def _detect_patterns(self, chains: List[KillChain]) -> List[ChainPattern]:
        """Find stage sequences shared by at least two agents.

        Each chain's stages are ordered by ``first_seen`` and every
        contiguous run of two or more stages is a candidate. Candidates are
        ranked by the number of agents exhibiting them. A candidate that is
        contained in an already kept, longer pattern is dropped as
        redundant.

        Returns:
            At most ten ``ChainPattern`` objects, most frequent first.
        """
        sequences = []
        for chain in chains:
            if not chain.stages:
                continue
            ordered = sorted(
                chain.stages.keys(), key=lambda s: chain.stages[s].first_seen
            )
            sequences.append((chain.agent_id, ordered))

        subseq_agents: Dict[tuple, list[str]] = {}
        for agent_id, seq in sequences:
            for length in range(2, len(seq) + 1):
                for start in range(len(seq) - length + 1):
                    sub = tuple(seq[start:start + length])
                    subseq_agents.setdefault(sub, []).append(agent_id)

        # Rank by frequency, breaking ties in favour of the longer sequence.
        # A contained sequence is always at least as frequent as the one
        # containing it, so without this tie-break it would always be
        # visited first and the redundancy filter below could never fire.
        ranked = sorted(
            subseq_agents.items(),
            key=lambda item: (len(item[1]), len(item[0])),
            reverse=True,
        )

        patterns: list[ChainPattern] = []
        kept: list[tuple] = []
        for sub, agents in ranked:
            if len(agents) < 2:
                continue
            width = len(sub)
            subsumed = any(
                width < len(longer)
                and any(
                    longer[i:i + width] == sub
                    for i in range(len(longer) - width + 1)
                )
                for longer in kept
            )
            if subsumed:
                continue
            kept.append(sub)

            unique_agents = list(dict.fromkeys(agents))
            agent_ids = set(unique_agents)
            comp_times, succ_rates = [], []
            for c in chains:
                if c.agent_id in agent_ids and all(s in c.stages for s in sub):
                    comp_times.append(
                        c.stages[sub[-1]].last_seen - c.stages[sub[0]].first_seen
                    )
                    succ_rates.append(
                        statistics.mean(c.stages[s].success_rate for s in sub)
                    )
            patterns.append(ChainPattern(
                name="\u2192".join(s.value[:4].upper() for s in sub),
                stages_sequence=list(sub),
                frequency=len(unique_agents),
                avg_completion_time=statistics.mean(comp_times) if comp_times else 0.0,
                success_rate=statistics.mean(succ_rates) if succ_rates else 0.0,
            ))
        return patterns[:10]

    def _compute_transitions(self, chains: List[KillChain]) -> List[StageTransition]:
        """Aggregate stage-to-stage transitions across all chains.

        For every chain with at least two stages, consecutive stages (in
        ``first_seen`` order) form a transition. The time delta runs from
        the earlier stage's ``last_seen`` to the later stage's
        ``first_seen`` and is therefore negative when the stages overlap.

        Returns:
            One ``StageTransition`` per distinct (from, to) pair, in order
            of first occurrence.
        """
        td: Dict[Tuple[KillChainStage, KillChainStage], Dict] = {}
        for chain in chains:
            if len(chain.stages) < 2:
                continue
            ordered = sorted(
                chain.stages.keys(), key=lambda s: chain.stages[s].first_seen
            )
            for src, dst in zip(ordered, ordered[1:]):
                entry = td.setdefault(
                    (src, dst), {"count": 0, "deltas": [], "agents": []}
                )
                entry["count"] += 1
                entry["deltas"].append(
                    chain.stages[dst].first_seen - chain.stages[src].last_seen
                )
                entry["agents"].append(chain.agent_id)

        return [
            StageTransition(
                from_stage=src,
                to_stage=dst,
                count=d["count"],
                avg_time_delta=statistics.mean(d["deltas"]) if d["deltas"] else 0.0,
                agents=d["agents"],
            )
            for (src, dst), d in td.items()
        ]

    def _simulate_actions(self) -> List[AgentAction]:
        """Generate a synthetic, seeded workload for ``config.num_agents``.

        Each agent follows ``config.strategy`` (or a randomly chosen
        profile when the strategy is ``"mixed"``), starts somewhere in the
        first 20% of the time window and performs at most
        ``config.actions_per_agent`` planned actions.

        Returns:
            All generated actions, sorted by timestamp.
        """
        actions: list[AgentAction] = []
        for i in range(self.config.num_agents):
            agent_id = f"agent-{i:03d}"
            if self.config.strategy == "mixed":
                strat_name = self._rng.choice(list(STRATEGY_PROFILES.keys()))
            else:
                strat_name = self.config.strategy
            # Unknown strategy names fall back to the "mixed" profile.
            profile = STRATEGY_PROFILES.get(strat_name, STRATEGY_PROFILES["mixed"])
            planned = self._plan_agent_actions(profile)

            t = self._rng.uniform(0, self.config.time_window * 0.2)
            for action_type in planned[:self.config.actions_per_agent]:
                stage, category, description = ACTION_CATALOG[action_type]
                # Target first, then success: the draw order is part of the
                # seeded behaviour and must not be swapped.
                target = self._random_target(stage)
                success = self._rng.random() < profile.skill_level
                actions.append(AgentAction(
                    agent_id=agent_id,
                    timestamp=round(t, 2),
                    action_type=action_type,
                    category=category,
                    target=target,
                    success=success,
                    metadata={"description": description},
                ))
                # Computed inside the loop so that actions_per_agent == 0
                # (no iterations) never triggers a division by zero.
                max_gap = self.config.time_window / self.config.actions_per_agent
                t += self._rng.uniform(0.5, max_gap)
        return sorted(actions, key=lambda a: a.timestamp)

    def _plan_agent_actions(self, profile: StrategyProfile) -> List[str]:
        """Plan the action types an agent with *profile* will attempt.

        Sequential profiles walk ``STAGE_ORDER`` and schedule at least one
        action per stage that has catalogued actions, more for stages with
        a higher probability. Other profiles draw
        ``config.actions_per_agent`` stages at random, weighted by
        ``profile.stage_probs``.

        Returns:
            Action type names in planned order. The list may be longer or
            shorter than ``config.actions_per_agent``.
        """
        actions_by_stage: Dict[KillChainStage, List[str]] = {}
        for action_type, (stage, _, _) in ACTION_CATALOG.items():
            actions_by_stage.setdefault(stage, []).append(action_type)

        planned: list[str] = []
        if profile.sequential:
            for stage in STAGE_ORDER:
                prob = profile.stage_probs.get(stage, 0.0)
                count = max(1, int(prob * self.config.actions_per_agent))
                pool = actions_by_stage.get(stage, [])
                if pool:
                    planned.extend(self._rng.choice(pool) for _ in range(count))
            return planned

        stages = list(profile.stage_probs.keys())
        weights = [profile.stage_probs[s] for s in stages]
        total = sum(weights)
        for _ in range(self.config.actions_per_agent):
            # Roulette-wheel selection over the stage weights. The first
            # stage is the fallback if no cumulative weight exceeds r.
            r = self._rng.random() * total
            cumul = 0.0
            chosen = stages[0]
            for stage, weight in zip(stages, weights):
                cumul += weight
                if r < cumul:
                    chosen = stage
                    break
            pool = actions_by_stage.get(chosen, [])
            if pool:
                planned.append(self._rng.choice(pool))
        return planned

    def _random_target(self, stage: KillChainStage) -> str:
        """Pick a random illustrative target string for *stage*.

        Stages without a target list yield ``"unknown"``.
        """
        targets = {
            KillChainStage.RECONNAISSANCE: [
                "/etc/passwd", "/proc/self/status", "agent-registry",
                "10.0.0.0/24", "api.internal:8080",
            ],
            KillChainStage.RESOURCE_ACQUISITION: [
                "heap:256MB", "cpu:4cores", "disk:10GB", "connections:100",
            ],
            KillChainStage.PRIVILEGE_ESCALATION: [
                "../../root/", "admin:token", "uid:0", "scope:write:all",
            ],
            KillChainStage.LATERAL_MOVEMENT: [
                "agent-001", "agent-005", "db-service", "monitor:9090",
            ],
            KillChainStage.DEFENSE_EVASION: [
                "timing:jitter", "payload:base64", "log:/var/log/agent.log",
                "pattern:normal",
            ],
            KillChainStage.COMMAND_AND_CONTROL: [
                "channel:covert-dns", "beacon:60s", "relay:proxy-agent",
            ],
            KillChainStage.OBJECTIVE_EXECUTION: [
                "replicate:new-container", "exfil:training-data",
                "sabotage:safety-policy",
            ],
        }
        return self._rng.choice(targets.get(stage, ["unknown"]))