Orchestrator

Simulated container orchestrator for isolated worker sandboxes.

The SandboxOrchestrator manages the lifecycle of sandboxed containers that run worker processes. In a production system this would delegate to Docker, Kubernetes, or a VM hypervisor; here it provides the same interface with in-memory state for simulation and testing.

Classes

ContainerRecord

Tracks a running worker container.

| Field | Type | Description |
|---|---|---|
| manifest | Manifest | The worker's signed manifest |
| resources | ResourceSpec | CPU/memory/network limits |
| status | str | Container status (e.g. "running") |
| started_at | datetime | UTC launch timestamp |
| reason | Optional[str] | Termination reason (if killed) |

SandboxOrchestrator

Container lifecycle manager for worker sandboxes.

Constructor:

SandboxOrchestrator(logger: Optional[StructuredLogger] = None)

Creates an internal StructuredLogger if none is provided.
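Because the logger is injected, a test can pass its own object and inspect what the orchestrator reports. The sketch below is a hypothetical test double, not part of the replication package: `RecordingLogger` and `pick_logger` are illustrative names, and the two method signatures are inferred only from the calls visible in the orchestrator source (`log(event, **fields)` and `emit_metric(name, value=..., **labels)`). Whether the real StructuredLogger exposes anything else is not assumed.

```python
from typing import Any, Dict, List, Tuple


class RecordingLogger:
    """Hypothetical test double that stores calls instead of writing logs."""

    def __init__(self) -> None:
        self.events: List[Tuple[str, Dict[str, Any]]] = []
        self.metrics: List[Tuple[str, Any, Dict[str, Any]]] = []

    def log(self, event: str, **fields: Any) -> None:
        # Same call shape as self.logger.log("container_killed", worker_id=..., ...)
        self.events.append((event, fields))

    def emit_metric(self, name: str, value: Any, **labels: Any) -> None:
        # Same call shape as self.logger.emit_metric("resource_enforced", value=1, ...)
        self.metrics.append((name, value, labels))


def pick_logger(logger=None, default_factory=RecordingLogger):
    """Illustrative helper with the same fallback shape as the constructor."""
    return logger or default_factory()


mine = RecordingLogger()
chosen = pick_logger(mine)       # the injected logger is kept
fallback = pick_logger()         # no logger given, so a new one is created
chosen.log("container_killed", worker_id="w1", reason="test")
```

One caveat of the `logger or default` idiom: it tests truthiness, not `is None`. A custom logger that defines `__len__` or `__bool__` and evaluates as false would be silently replaced by the default, so a test double like this one should avoid defining them.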

Attributes:

| Attribute | Type | Description |
|---|---|---|
| containers | Dict[str, ContainerRecord] | Active containers by worker ID |

Methods:

| Method | Description |
|---|---|
| launch_worker(manifest) | Create and start a container for the manifest |
| kill_worker(worker_id, reason) | Stop and remove a container; a no-op if the worker ID is unknown |
| kill_all(reason) | Terminate all running containers |
| enforce_resource_bounds(worker_id) | Emit a resource_enforced metric for the worker (placeholder for real quota checks) |
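kill_all removes entries from containers while walking over them, which is why the source iterates over `list(self.containers.keys())` rather than the dictionary itself. The standalone sketch below uses plain dictionaries and two illustrative functions (`kill_all_unsafe` and `kill_all_snapshot`, both hypothetical names) to show the difference.

```python
from typing import Dict


def kill_all_unsafe(containers: Dict[str, str]) -> None:
    # Iterates the live dict while removing from it: Python raises RuntimeError.
    for worker_id in containers:
        containers.pop(worker_id)


def kill_all_snapshot(containers: Dict[str, str]) -> None:
    # Iterates a copied list of keys, so removals cannot disturb the loop.
    for worker_id in list(containers.keys()):
        containers.pop(worker_id, None)  # default of None makes repeats harmless


live = {"w1": "running", "w2": "running"}
try:
    kill_all_unsafe(live)
    unsafe_error = None
except RuntimeError as exc:
    unsafe_error = str(exc)

safe = {"w1": "running", "w2": "running"}
kill_all_snapshot(safe)
kill_all_snapshot(safe)  # second call finds nothing to remove
```

The `pop(worker_id, None)` form is also what makes kill_worker safe to call twice for the same worker.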

Integration

The orchestrator works alongside the Controller:

  • Controller.register_worker validates the manifest
  • SandboxOrchestrator.launch_worker starts the container
  • Controller.reap_stale_workers calls orchestrator.kill_worker for timed-out workers
  • Controller.kill_switch calls orchestrator.kill_worker for every active worker
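The four steps above can be sketched end to end with small stand-ins. Everything in this block is hypothetical: `FakeManifest`, `FakeOrchestrator`, and `FakeController` only mimic the calls named in the list, while the non-empty-signature check, the `heartbeat` method, the 30-second timeout, and the "timeout" reason string are assumptions made for illustration. The real Controller's internals are not shown in this page.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Tuple


@dataclass
class FakeManifest:
    """Stand-in for Manifest with only the fields this sketch needs."""
    worker_id: str
    signature: str


class FakeOrchestrator:
    """Stand-in exposing launch_worker and kill_worker."""

    def __init__(self) -> None:
        self.containers: Dict[str, FakeManifest] = {}
        self.kills: List[Tuple[str, str]] = []

    def launch_worker(self, manifest: FakeManifest) -> None:
        self.containers[manifest.worker_id] = manifest

    def kill_worker(self, worker_id: str, reason: str) -> None:
        if self.containers.pop(worker_id, None) is not None:
            self.kills.append((worker_id, reason))


class FakeController:
    """Stand-in controller; validation and liveness rules are assumed."""

    def __init__(self, orchestrator: FakeOrchestrator, timeout: timedelta) -> None:
        self.orchestrator = orchestrator
        self.timeout = timeout
        self.last_seen: Dict[str, datetime] = {}

    def register_worker(self, manifest: FakeManifest, now: datetime) -> None:
        if not manifest.signature:  # assumed validation rule
            raise ValueError("unsigned manifest")
        self.last_seen[manifest.worker_id] = now

    def heartbeat(self, worker_id: str, now: datetime) -> None:
        self.last_seen[worker_id] = now  # hypothetical liveness signal

    def reap_stale_workers(self, now: datetime) -> List[str]:
        stale = [w for w, seen in self.last_seen.items() if now - seen > self.timeout]
        for worker_id in stale:
            del self.last_seen[worker_id]
            self.orchestrator.kill_worker(worker_id, reason="timeout")
        return stale

    def kill_switch(self, reason: str) -> None:
        for worker_id in list(self.last_seen):
            del self.last_seen[worker_id]
            self.orchestrator.kill_worker(worker_id, reason=reason)


orch = FakeOrchestrator()
controller = FakeController(orch, timeout=timedelta(seconds=30))
t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)

for worker_id in ("w1", "w2", "w3"):
    manifest = FakeManifest(worker_id=worker_id, signature="signed")
    controller.register_worker(manifest, now=t0)   # validate
    orch.launch_worker(manifest)                   # start the container

controller.heartbeat("w2", now=t0 + timedelta(seconds=25))
controller.heartbeat("w3", now=t0 + timedelta(seconds=25))
reaped = controller.reap_stale_workers(now=t0 + timedelta(seconds=40))
controller.kill_switch(reason="operator_stop")
```

In this run only w1 misses the timeout and is reaped; the kill switch then removes the two remaining workers, leaving no containers.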

Usage

from replication.orchestrator import SandboxOrchestrator
from replication.contract import Manifest, ResourceSpec
from datetime import datetime, timezone

orch = SandboxOrchestrator()
resources = ResourceSpec(cpu_limit=1.0, memory_limit_mb=512)

manifest = Manifest(
    worker_id="w1", parent_id=None, depth=0,
    state_snapshot={}, issued_at=datetime.now(timezone.utc),
    resources=resources, signature="signed",
)

orch.launch_worker(manifest)
print(len(orch.containers))  # 1

orch.kill_worker("w1", reason="test")
print(len(orch.containers))  # 0

orchestrator

SandboxOrchestrator

Simulates a container orchestrator for isolated worker sandboxes.

Source code in src/replication/orchestrator.py
class SandboxOrchestrator:
    """Simulates a container orchestrator for isolated worker sandboxes."""

    def __init__(self, logger: Optional[StructuredLogger] = None):
        self.logger = logger or StructuredLogger()
        self.containers: Dict[str, ContainerRecord] = {}

    def launch_worker(self, manifest: Manifest) -> None:
        record = ContainerRecord(
            manifest=manifest,
            resources=manifest.resources,
            status="running",
            started_at=datetime.now(timezone.utc),
        )
        self.containers[manifest.worker_id] = record
        self.logger.log(
            "container_launched",
            worker_id=manifest.worker_id,
            parent_id=manifest.parent_id,
            cpu_limit=manifest.resources.cpu_limit,
            memory_limit_mb=manifest.resources.memory_limit_mb,
            allow_external=manifest.resources.network_policy.allow_external,
        )

    def kill_worker(self, worker_id: str, reason: str) -> None:
        record = self.containers.pop(worker_id, None)
        if record:
            self.logger.log("container_killed", worker_id=worker_id, reason=reason)

    def kill_all(self, reason: str) -> None:
        for worker_id in list(self.containers.keys()):
            self.kill_worker(worker_id, reason=reason)

    def enforce_resource_bounds(self, worker_id: str) -> None:
        """Placeholder to enforce CPU/RAM quotas in a real orchestrator."""
        if worker_id in self.containers:
            self.logger.emit_metric(
                "resource_enforced",
                value=1,
                worker_id=worker_id,
                cpu=str(self.containers[worker_id].resources.cpu_limit),
                memory=str(self.containers[worker_id].resources.memory_limit_mb),
            )

enforce_resource_bounds(worker_id: str) -> None

Placeholder to enforce CPU/RAM quotas in a real orchestrator.
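The current method only emits a metric. As a rough idea of what a real check might compare, the sketch below tests measured usage against the configured limits. `Limits`, `Usage`, and `find_violations` are hypothetical names; how usage would actually be measured (cgroups, a container runtime API, and so on) is outside this sketch.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Limits:
    """Mirrors the two ResourceSpec fields used by the orchestrator source."""
    cpu_limit: float
    memory_limit_mb: int


@dataclass(frozen=True)
class Usage:
    """Hypothetical measured usage for one worker."""
    cpu: float
    memory_mb: float


def find_violations(limits: Limits, usage: Usage) -> List[str]:
    """Return the names of the resources whose usage exceeds the limit."""
    violations: List[str] = []
    if usage.cpu > limits.cpu_limit:
        violations.append("cpu")
    if usage.memory_mb > limits.memory_limit_mb:
        violations.append("memory")
    return violations


limits = Limits(cpu_limit=1.0, memory_limit_mb=512)
within = find_violations(limits, Usage(cpu=0.4, memory_mb=300))
over_memory = find_violations(limits, Usage(cpu=0.4, memory_mb=600))
```

A caller could then pass a non-empty result to kill_worker as the reason, though that policy is an assumption and not something the source prescribes.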
