Worker

Task-executing agent with self-replication capability.

The Worker is the fundamental unit of execution in the replication system. Each worker holds a signed manifest, executes tasks, sends heartbeats, and can request replication through the controller. Workers are sandboxed by the orchestrator and governed by the replication contract.

Type Aliases

| Name | Type | Description |
|------|------|-------------|
| TaskFn | Callable[[Worker], None] | A task function that receives the worker |
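
To illustrate the shape of a TaskFn, here is a minimal sketch. StubManifest and StubWorker are hypothetical stand-ins for the real Manifest and Worker classes, used only so the example runs on its own; the alias is bound to the stub type for the same reason.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class StubManifest:
    """Hypothetical stand-in for the library's Manifest."""
    worker_id: str


@dataclass
class StubWorker:
    """Hypothetical stand-in for Worker; exposes only .manifest."""
    manifest: StubManifest


# Same shape as the documented alias, bound to the stub type.
TaskFn = Callable[[StubWorker], None]

seen: List[str] = []


def scan_task(worker: StubWorker) -> None:
    # A task receives the worker itself, so it can read the manifest.
    seen.append(f"scan ran on {worker.manifest.worker_id}")


task: TaskFn = scan_task
task(StubWorker(StubManifest(worker_id="w-1")))
```

Because the task returns None, any result has to be communicated through side effects (here, the `seen` list).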

Classes

WorkerState

Internal state of a running worker.

| Field | Type | Description |
|-------|------|-------------|
| manifest | Manifest | The worker's signed manifest |
| expires_at | Optional[datetime] | UTC expiration (None = no expiry) |
| created_at | datetime | UTC creation timestamp |
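
The fields above can be pictured as a small dataclass. The sketch below is an approximation, not the library's definition: WorkerStateSketch, make_state, and is_expired are hypothetical names, and the manifest is typed as Any to keep the example self-contained. The expiry rule (no expiry when the configured value is unset or zero, expired only once the current time is strictly past expires_at) follows the Worker source listing on this page.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Optional


@dataclass
class WorkerStateSketch:
    """Hypothetical mirror of the documented WorkerState fields."""
    manifest: Any
    expires_at: Optional[datetime]  # None means the worker never expires
    created_at: datetime


def make_state(manifest: Any, expiration_seconds: Optional[float],
               now: datetime) -> WorkerStateSketch:
    # A falsy expiration (None or 0) leaves expires_at unset.
    expires_at = None
    if expiration_seconds:
        expires_at = now + timedelta(seconds=expiration_seconds)
    return WorkerStateSketch(manifest=manifest, expires_at=expires_at, created_at=now)


def is_expired(state: WorkerStateSketch, now: datetime) -> bool:
    # Strict comparison: a worker is still valid at exactly expires_at.
    return state.expires_at is not None and now > state.expires_at


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
short_lived = make_state({"id": "w-1"}, 30, t0)
unbounded = make_state({"id": "w-2"}, None, t0)
```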

Worker

A task-executing agent capable of controlled self-replication.

Constructor:

Worker(manifest, contract, controller, orchestrator, logger=None)

On construction, the worker:

  1. Computes its expiration time from contract.expiration_seconds (no expiry if that value is unset or zero)
  2. Registers itself with the controller (controller.register_worker)
  3. Launches its sandbox container (orchestrator.launch_worker)

Raises ReplicationDenied if registration fails (quota exceeded, stop condition triggered, invalid signature, etc.).
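
Because registration happens before the container launch, a denied worker never gets a sandbox. The sketch below shows one way a caller might handle that. Every class here is a hypothetical stand-in: MiniWorker only mirrors the register-then-launch order, FullController always denies, and try_start is an illustrative helper that does not exist in the library.

```python
from typing import List, Optional


class ReplicationDenied(Exception):
    """Stand-in for the library's exception of the same name."""


class FullController:
    """Hypothetical controller whose quota is already exhausted."""

    def register_worker(self, manifest: dict) -> None:
        raise ReplicationDenied("quota exceeded")


class RecordingOrchestrator:
    """Hypothetical orchestrator that records which workers it launched."""

    def __init__(self) -> None:
        self.launched: List[str] = []

    def launch_worker(self, manifest: dict) -> None:
        self.launched.append(manifest["worker_id"])


class MiniWorker:
    """Simplified stand-in that mirrors the register-then-launch order."""

    def __init__(self, manifest: dict, controller, orchestrator) -> None:
        self.manifest = manifest
        controller.register_worker(manifest)  # may raise ReplicationDenied
        orchestrator.launch_worker(manifest)  # reached only after registration


def try_start(manifest: dict, controller, orchestrator) -> Optional[MiniWorker]:
    # Treat a denial as "no worker" instead of letting it propagate.
    try:
        return MiniWorker(manifest, controller, orchestrator)
    except ReplicationDenied:
        return None


orch = RecordingOrchestrator()
worker = try_start({"worker_id": "w-1"}, FullController(), orch)
```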

Methods:

| Method | Returns | Description |
|--------|---------|-------------|
| perform_task(task: TaskFn) | None | Execute a task; heartbeat on success |
| maybe_replicate(reason, state_snapshot) | Optional[Worker] | Request replication; returns child or None |
| shutdown(reason) | None | Kill container and deregister from controller |

perform_task

Executes a callable task(worker) with safety checks:

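  1. Checks expiration: if expired, shuts down with reason "expired" and raises ReplicationDenied
  2. Checks kill switch: if engaged, shuts down with reason "kill_switch" and raises ReplicationDenied
  3. Checks quarantine: if quarantined, raises ReplicationDenied without shutting down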
  4. Runs the task function
  5. Sends heartbeat to controller
  6. Triggers resource enforcement on orchestrator

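If the task raises an exception, the worker logs "task_failed", shuts down with reason "task_error", and re-raises the original exception. The heartbeat and resource enforcement steps are skipped in that case.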
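
From the caller's side, perform_task can therefore end in three ways: normal return, ReplicationDenied from a safety check, or the task's own exception. The sketch below shows one way to tell them apart. StubWorker and run_once are hypothetical names; the stub only reproduces the exception behaviour described above, not the real checks.

```python
class ReplicationDenied(Exception):
    """Stand-in for the library's exception of the same name."""


class StubWorker:
    """Hypothetical worker whose perform_task outcome is fixed by `mode`."""

    def __init__(self, mode: str) -> None:
        self.mode = mode

    def perform_task(self, task) -> None:
        if self.mode == "expired":
            raise ReplicationDenied("Worker expired")
        task(self)  # a task error propagates to the caller


def run_once(worker, task) -> str:
    # Catch the safety-check denial first, then any task failure.
    try:
        worker.perform_task(task)
    except ReplicationDenied as exc:
        return f"denied: {exc}"
    except Exception as exc:
        return f"task_error: {type(exc).__name__}"
    return "ok"


def failing_task(worker) -> None:
    raise ValueError("bad input")


outcomes = [
    run_once(StubWorker("healthy"), lambda w: None),
    run_once(StubWorker("expired"), lambda w: None),
    run_once(StubWorker("healthy"), failing_task),
]
```

With the real Worker, the expired, kill-switch, and task-error paths have already shut the worker down, so the caller should not reuse it after either kind of exception without checking why it failed.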

maybe_replicate

Requests a child worker through the controller:

  1. Checks expiration and kill switch; either one logs the denial and returns None (an expired worker is also shut down)
  2. Asks controller to issue a manifest (enforces depth, quota, cooldown)
  3. On success, creates and returns a new Worker instance
  4. On failure, logs the denial and returns None

The controller derives the child's depth from the parent's registry entry, so callers cannot spoof depth. The depth argument the worker passes to issue_manifest is ignored for child workers.
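
The depth rule can be illustrated with a toy controller. This is a sketch under stated assumptions, not the library's Controller: ToyController and ToyManifest are hypothetical, signing, quota, and cooldown are omitted, and the toy registers a manifest at issue time for brevity, whereas the real worker registers separately through register_worker.

```python
from dataclasses import dataclass
from typing import Dict, Optional


class ReplicationDenied(Exception):
    """Stand-in for the library's exception of the same name."""


@dataclass
class ToyManifest:
    worker_id: str
    parent_id: Optional[str]
    depth: int


class ToyController:
    """Hypothetical controller that derives child depth from its registry."""

    def __init__(self, max_depth: int) -> None:
        self.max_depth = max_depth
        self.registry: Dict[str, ToyManifest] = {}
        self._next_id = 0

    def issue_manifest(self, parent_id: Optional[str], depth: int) -> ToyManifest:
        if parent_id is not None:
            parent = self.registry.get(parent_id)
            if parent is None:
                raise ReplicationDenied("unknown parent")
            # The caller-supplied depth is discarded for child workers.
            depth = parent.depth + 1
        if depth > self.max_depth:
            raise ReplicationDenied("max depth exceeded")
        self._next_id += 1
        manifest = ToyManifest(f"w-{self._next_id}", parent_id, depth)
        self.registry[manifest.worker_id] = manifest  # toy shortcut: register on issue
        return manifest


ctrl = ToyController(max_depth=1)
root = ctrl.issue_manifest(None, 0)
child = ctrl.issue_manifest(root.worker_id, depth=0)  # depth is recomputed as 1

denied = None
try:
    ctrl.issue_manifest(child.worker_id, depth=0)  # would be depth 2
except ReplicationDenied as exc:
    denied = str(exc)
```

Passing depth=0 for the grandchild does not help the caller: the toy controller recomputes the value from the parent's entry and denies the request.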

Lifecycle

┌─────────────┐
│  __init__   │ ← register + launch container
└──────┬──────┘
       ▼
┌─────────────┐
│ perform_task│ ← execute + heartbeat (repeatable)
└──────┬──────┘
       ├──── maybe_replicate → child Worker
       ▼
┌─────────────┐
│  shutdown   │ ← kill container + deregister
└─────────────┘

Usage

from replication.contract import ReplicationContract, ResourceSpec
from replication.controller import Controller
from replication.orchestrator import SandboxOrchestrator
from replication.worker import Worker

contract = ReplicationContract(max_depth=2, max_replicas=5, cooldown_seconds=1.0)
ctrl = Controller(contract, secret="my-secure-key-1234")
orch = SandboxOrchestrator()

resources = ResourceSpec(cpu_limit=0.5, memory_limit_mb=256)
manifest = ctrl.issue_manifest(None, 0, {"task": "scan"}, resources)

worker = Worker(manifest, contract, ctrl, orch)

# Execute a task
worker.perform_task(lambda w: print(f"Running on {w.manifest.worker_id}"))

# Replicate
child = worker.maybe_replicate("scale", {"task": "scan-subset"})
if child:
    child.perform_task(lambda w: print(f"Child {w.manifest.worker_id}"))

# Shutdown
worker.shutdown("done")

worker

Worker

A small agent capable of executing tasks and replicating.

Source code in src/replication/worker.py
class Worker:
    """A small agent capable of executing tasks and replicating."""

    def __init__(
        self,
        manifest: Manifest,
        contract: ReplicationContract,
        controller: Controller,
        orchestrator: SandboxOrchestrator,
        logger: Optional[StructuredLogger] = None,
    ) -> None:
        self.manifest = manifest
        self.contract = contract
        self.controller = controller
        self.orchestrator = orchestrator
        self.logger = logger or StructuredLogger()
        expires_at = None
        if contract.expiration_seconds:
            expires_at = datetime.now(timezone.utc) + timedelta(seconds=contract.expiration_seconds)
        self.state = WorkerState(manifest=manifest, expires_at=expires_at, created_at=datetime.now(timezone.utc))
        self.controller.register_worker(self.manifest)
        self.orchestrator.launch_worker(self.manifest)
        self.logger.log("worker_started", worker_id=self.manifest.worker_id, parent_id=self.manifest.parent_id)

    def perform_task(self, task: TaskFn) -> None:
        if self.state.expires_at and datetime.now(timezone.utc) > self.state.expires_at:
            self.shutdown("expired")
            raise ReplicationDenied("Worker expired")
        if self.controller.kill_switch_engaged:
            self.shutdown("kill_switch")
            raise ReplicationDenied("Kill switch engaged")
        if self.controller.is_quarantined(self.manifest.worker_id):
            raise ReplicationDenied("Worker is quarantined")
        try:
            task(self)
        except Exception:
            self.logger.log(
                "task_failed",
                worker_id=self.manifest.worker_id,
                error=True,
            )
            self.shutdown("task_error")
            raise
        self.controller.heartbeat(self.manifest.worker_id)
        self.orchestrator.enforce_resource_bounds(self.manifest.worker_id)

    def maybe_replicate(self, reason: str, state_snapshot: Dict[str, Any]) -> Optional["Worker"]:
        if self.state.expires_at and datetime.now(timezone.utc) > self.state.expires_at:
            self.logger.log("replication_denied", parent_id=self.manifest.worker_id, reason="expired")
            self.shutdown("expired")
            return None
        if self.controller.kill_switch_engaged:
            self.logger.log("replication_denied", parent_id=self.manifest.worker_id, reason="kill_switch")
            return None
        resources = self.manifest.resources
        try:
            # Depth is derived by the controller from the parent's
            # registry entry; the value passed here is ignored for
            # child workers (see Controller.issue_manifest).
            manifest = self.controller.issue_manifest(
                parent_id=self.manifest.worker_id,
                depth=0,
                state_snapshot=state_snapshot,
                resources=resources,
            )
        except ReplicationDenied:
            self.logger.log("replication_denied", parent_id=self.manifest.worker_id, reason=reason)
            return None
        self.logger.audit("replication_requested", parent_id=self.manifest.worker_id, child_id=manifest.worker_id, reason=reason)
        return Worker(
            manifest=manifest,
            contract=self.contract,
            controller=self.controller,
            orchestrator=self.orchestrator,
            logger=self.logger,
        )

    def shutdown(self, reason: str) -> None:
        self.orchestrator.kill_worker(self.manifest.worker_id, reason=reason)
        self.controller.deregister(self.manifest.worker_id, reason)
        self.logger.log("worker_shutdown", worker_id=self.manifest.worker_id, reason=reason)