Worker

Task-executing agent with self-replication capability.

The Worker is the fundamental unit of execution in the replication system. Each worker holds a signed manifest, executes tasks, sends heartbeats, and can request replication through the controller. Workers are sandboxed by the orchestrator and governed by the replication contract.

Type Aliases

| Name | Type | Description |
|------|------|-------------|
| TaskFn | Callable[[Worker], None] | A task function that receives the worker |
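As a rough illustration of the TaskFn shape, the sketch below defines a task that takes a worker and returns nothing. FakeWorker, FakeManifest, and the results list are hypothetical stand-ins invented for this example so that it runs without the replication package; a real task would receive an actual Worker.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class FakeManifest:
    """Hypothetical stand-in exposing only the worker_id attribute."""
    worker_id: str


@dataclass
class FakeWorker:
    """Hypothetical stand-in for Worker; only the manifest attribute is modeled."""
    manifest: FakeManifest
    results: List[str] = field(default_factory=list)  # assumption: not part of Worker


# Same shape as the TaskFn alias, with FakeWorker in place of Worker.
TaskFn = Callable[[FakeWorker], None]


def scan_task(worker: FakeWorker) -> None:
    """A task receives the worker and returns None; output goes elsewhere."""
    worker.results.append(f"scanned by {worker.manifest.worker_id}")


task: TaskFn = scan_task
demo_worker = FakeWorker(FakeManifest(worker_id="w-1"))
task(demo_worker)
```

Because a task returns None, any result it produces has to be written somewhere the caller can read, such as a closure variable or external storage.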

Classes

WorkerState

Internal state of a running worker.

| Field | Type | Description |
|-------|------|-------------|
| manifest | Manifest | The worker's signed manifest |
| expires_at | Optional[datetime] | UTC expiration (None = no expiry) |
| created_at | datetime | UTC creation timestamp |
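The expires_at semantics can be sketched with the standard library alone. The helper names compute_expires_at and is_expired are hypothetical and exist only for this example; the logic assumes a falsy expiration setting (None or 0) means the worker never expires.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def compute_expires_at(expiration_seconds: Optional[float], now: datetime) -> Optional[datetime]:
    """Return the UTC expiry time, or None when no expiry is configured."""
    if not expiration_seconds:
        return None
    return now + timedelta(seconds=expiration_seconds)


def is_expired(expires_at: Optional[datetime], now: datetime) -> bool:
    """A worker with expires_at=None never expires."""
    return expires_at is not None and now > expires_at


# Fixed, timezone-aware timestamps keep the example deterministic.
created_at = datetime(2024, 1, 1, tzinfo=timezone.utc)
expires_at = compute_expires_at(60, created_at)
```

Using timezone-aware datetimes throughout avoids the TypeError Python raises when a naive and an aware datetime are compared.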

Worker

A task-executing agent capable of controlled self-replication.

Constructor:

Worker(manifest, contract, controller, orchestrator, logger=None)

On construction, the worker:

  1. Computes its expiration time from contract.expiration_seconds (no expiry if unset)
  2. Registers itself with the controller (controller.register_worker)
  3. Launches its sandbox container (orchestrator.launch_worker)

Raises ReplicationDenied if registration fails (quota exceeded, stop condition triggered, invalid signature, etc.).
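One consequence of registering before launching is that a denied worker never gets a container. The sketch below illustrates that ordering with hypothetical stand-ins (DenyingController, RecordingOrchestrator, start_worker); it is not the library's code, and the ReplicationDenied class here is a local substitute for the real exception.

```python
class ReplicationDenied(Exception):
    """Local stand-in for the library's exception of the same name."""


class DenyingController:
    """Hypothetical controller that rejects every registration."""

    def register_worker(self, manifest):
        raise ReplicationDenied("quota exceeded")


class RecordingOrchestrator:
    """Hypothetical orchestrator that records launches instead of starting containers."""

    def __init__(self):
        self.launched = []

    def launch_worker(self, manifest):
        self.launched.append(manifest)


def start_worker(manifest, controller, orchestrator):
    """Register first, then launch; a denial stops before any container exists."""
    controller.register_worker(manifest)   # may raise ReplicationDenied
    orchestrator.launch_worker(manifest)   # only reached after registration


orch = RecordingOrchestrator()
try:
    start_worker("manifest-1", DenyingController(), orch)
    denied = False
except ReplicationDenied:
    denied = True
```

Callers that construct workers directly would wrap the constructor in a similar try/except to handle denial.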

Methods:

| Method | Returns | Description |
|--------|---------|-------------|
| perform_task(task: TaskFn) | None | Execute a task; heartbeat on success |
| maybe_replicate(reason, state_snapshot) | Optional[Worker] | Request replication; returns child or None |
| shutdown(reason) | None | Kill container and deregister from controller |

perform_task

Executes a callable task(worker) with safety checks:

  1. Checks expiration: if expired, shuts down and raises ReplicationDenied
  2. Checks kill switch: if engaged, shuts down and raises ReplicationDenied
  3. Checks quarantine: if quarantined, raises ReplicationDenied without shutting down
  4. Runs the task function
  5. Sends heartbeat to controller
  6. Triggers resource enforcement on orchestrator

If the task raises an exception, the worker shuts down with reason "task_error" and re-raises.
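The check sequence above can be modeled with a simplified worker that records events rather than calling a controller or orchestrator. MiniWorker and its boolean flags are hypothetical and exist only to make the control flow runnable in isolation.

```python
class ReplicationDenied(Exception):
    """Local stand-in for the library's exception of the same name."""


class MiniWorker:
    """Hypothetical, simplified worker that records events instead of calling out."""

    def __init__(self, expired=False, kill_switch=False, quarantined=False):
        self.expired = expired
        self.kill_switch = kill_switch
        self.quarantined = quarantined
        self.events = []

    def shutdown(self, reason):
        self.events.append(f"shutdown:{reason}")

    def perform_task(self, task):
        if self.expired:
            self.shutdown("expired")
            raise ReplicationDenied("Worker expired")
        if self.kill_switch:
            self.shutdown("kill_switch")
            raise ReplicationDenied("Kill switch engaged")
        if self.quarantined:
            # Quarantine refuses the task but leaves the worker running.
            raise ReplicationDenied("Worker is quarantined")
        try:
            task(self)
        except Exception:
            self.shutdown("task_error")
            raise
        # Reached only when the task returned normally.
        self.events.append("heartbeat")
        self.events.append("enforce_resource_bounds")


def failing_task(worker):
    raise ValueError("boom")


ok = MiniWorker()
ok.perform_task(lambda w: w.events.append("task"))

bad = MiniWorker()
try:
    bad.perform_task(failing_task)
except ValueError:
    bad.events.append("re-raised")

quarantined = MiniWorker(quarantined=True)
try:
    quarantined.perform_task(lambda w: None)
except ReplicationDenied:
    pass
```

In this model a failing task skips the heartbeat and resource enforcement, and a quarantined worker records no events at all because it is neither run nor shut down.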

maybe_replicate

Requests a child worker through the controller:

  1. Checks expiration (if expired, shuts down and returns None) and kill switch (if engaged, returns None)
  2. Asks controller to issue a manifest (enforces depth, quota, cooldown)
  3. On success, creates and returns a new Worker instance
  4. On failure, logs the denial and returns None

The controller derives the child's depth from the parent's registry entry and ignores the depth value passed in the request, so callers cannot spoof depth.
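A minimal sketch of controller-side depth derivation follows. MiniController and MiniManifest are hypothetical; the real Controller.issue_manifest also takes a state snapshot and resources and enforces quota and cooldown, all omitted here. Registration is folded into issue_manifest for brevity, which is an assumption of this example.

```python
from dataclasses import dataclass
from typing import Dict, Optional


class ReplicationDenied(Exception):
    """Local stand-in for the library's exception of the same name."""


@dataclass(frozen=True)
class MiniManifest:
    worker_id: str
    parent_id: Optional[str]
    depth: int


class MiniController:
    """Hypothetical controller that derives child depth from its own registry."""

    def __init__(self, max_depth: int) -> None:
        self.max_depth = max_depth
        self.registry: Dict[str, MiniManifest] = {}
        self._next_id = 0

    def issue_manifest(self, parent_id: Optional[str], depth: int) -> MiniManifest:
        if parent_id is not None:
            parent = self.registry.get(parent_id)
            if parent is None:
                raise ReplicationDenied("unknown parent")
            # The caller-supplied depth is discarded for child workers.
            depth = parent.depth + 1
        if depth > self.max_depth:
            raise ReplicationDenied("max depth exceeded")
        self._next_id += 1
        manifest = MiniManifest(f"w-{self._next_id}", parent_id, depth)
        self.registry[manifest.worker_id] = manifest
        return manifest


ctrl = MiniController(max_depth=1)
root = ctrl.issue_manifest(None, 0)
child = ctrl.issue_manifest(root.worker_id, depth=0)  # claims depth 0, gets depth 1
```

Because the depth comes from the registry, a child that requests depth=0 is still counted at its true depth, and a further request from that child exceeds max_depth and is denied.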

Lifecycle

┌─────────────┐
│  __init__   │ ← register + launch container
└──────┬──────┘
       ▼
┌─────────────┐
│ perform_task│ ← execute + heartbeat (repeatable)
└──────┬──────┘
       ├──── maybe_replicate → child Worker
       ▼
┌─────────────┐
│  shutdown   │ ← kill container + deregister
└─────────────┘

Usage

from replication.contract import ReplicationContract, ResourceSpec
from replication.controller import Controller
from replication.orchestrator import SandboxOrchestrator
from replication.worker import Worker

contract = ReplicationContract(max_depth=2, max_replicas=5, cooldown_seconds=1.0)
ctrl = Controller(contract, secret="my-secure-key-1234")
orch = SandboxOrchestrator()

resources = ResourceSpec(cpu_limit=0.5, memory_limit_mb=256)
manifest = ctrl.issue_manifest(None, 0, {"task": "scan"}, resources)

worker = Worker(manifest, contract, ctrl, orch)

# Execute a task
worker.perform_task(lambda w: print(f"Running on {w.manifest.worker_id}"))

# Replicate
child = worker.maybe_replicate("scale", {"task": "scan-subset"})
if child:
    child.perform_task(lambda w: print(f"Child {w.manifest.worker_id}"))

# Shutdown
worker.shutdown("done")

worker

WorkerState dataclass

Internal lifecycle state tracked by a Worker instance.

Source code in src/replication/worker.py
@dataclass
class WorkerState:
    """Internal lifecycle state tracked by a :class:`Worker` instance."""

    manifest: Manifest
    expires_at: Optional[datetime]
    created_at: datetime

Worker

A small agent capable of executing tasks and replicating.

Source code in src/replication/worker.py
class Worker:
    """A small agent capable of executing tasks and replicating."""

    def __init__(
        self,
        manifest: Manifest,
        contract: ReplicationContract,
        controller: Controller,
        orchestrator: SandboxOrchestrator,
        logger: Optional[StructuredLogger] = None,
    ) -> None:
        self.manifest = manifest
        self.contract = contract
        self.controller = controller
        self.orchestrator = orchestrator
        self.logger = logger or StructuredLogger()
        expires_at = None
        if contract.expiration_seconds:
            expires_at = datetime.now(timezone.utc) + timedelta(seconds=contract.expiration_seconds)
        self.state = WorkerState(manifest=manifest, expires_at=expires_at, created_at=datetime.now(timezone.utc))
        self.controller.register_worker(self.manifest)
        self.orchestrator.launch_worker(self.manifest)
        self.logger.log("worker_started", worker_id=self.manifest.worker_id, parent_id=self.manifest.parent_id)

    def perform_task(self, task: TaskFn) -> None:
        """Execute *task* after checking expiration, kill switch, and quarantine.

        On success the worker sends a heartbeat and asks the orchestrator
        to enforce resource bounds.  On failure the worker shuts down and
        re-raises the exception.
        """
        if self.state.expires_at and datetime.now(timezone.utc) > self.state.expires_at:
            self.shutdown("expired")
            raise ReplicationDenied("Worker expired")
        if self.controller.kill_switch_engaged:
            self.shutdown("kill_switch")
            raise ReplicationDenied("Kill switch engaged")
        if self.controller.is_quarantined(self.manifest.worker_id):
            raise ReplicationDenied("Worker is quarantined")
        try:
            task(self)
        except Exception:
            self.logger.log(
                "task_failed",
                worker_id=self.manifest.worker_id,
                error=True,
            )
            self.shutdown("task_error")
            raise
        self.controller.heartbeat(self.manifest.worker_id)
        self.orchestrator.enforce_resource_bounds(self.manifest.worker_id)

    def maybe_replicate(self, reason: str, state_snapshot: Dict[str, Any]) -> Optional["Worker"]:
        """Attempt to spawn a child worker, returning it on success or ``None``.

        Replication is denied if the worker is expired, the kill switch
        is engaged, or the controller rejects the manifest (quota,
        cooldown, depth).
        """
        if self.state.expires_at and datetime.now(timezone.utc) > self.state.expires_at:
            self.logger.log("replication_denied", parent_id=self.manifest.worker_id, reason="expired")
            self.shutdown("expired")
            return None
        if self.controller.kill_switch_engaged:
            self.logger.log("replication_denied", parent_id=self.manifest.worker_id, reason="kill_switch")
            return None
        resources = self.manifest.resources
        try:
            # Depth is derived by the controller from the parent's
            # registry entry; the value passed here is ignored for
            # child workers (see Controller.issue_manifest).
            manifest = self.controller.issue_manifest(
                parent_id=self.manifest.worker_id,
                depth=0,
                state_snapshot=state_snapshot,
                resources=resources,
            )
        except ReplicationDenied:
            self.logger.log("replication_denied", parent_id=self.manifest.worker_id, reason=reason)
            return None
        self.logger.audit("replication_requested", parent_id=self.manifest.worker_id, child_id=manifest.worker_id, reason=reason)
        return Worker(
            manifest=manifest,
            contract=self.contract,
            controller=self.controller,
            orchestrator=self.orchestrator,
            logger=self.logger,
        )

    def shutdown(self, reason: str) -> None:
        """Terminate this worker: kill the container, deregister, and log."""
        self.orchestrator.kill_worker(self.manifest.worker_id, reason=reason)
        self.controller.deregister(self.manifest.worker_id, reason)
        self.logger.log("worker_shutdown", worker_id=self.manifest.worker_id, reason=reason)

perform_task(task: TaskFn) -> None

Execute task after checking expiration, kill switch, and quarantine.

On success the worker sends a heartbeat and asks the orchestrator to enforce resource bounds. On failure the worker shuts down and re-raises the exception.

maybe_replicate(reason: str, state_snapshot: Dict[str, Any]) -> Optional['Worker']

Attempt to spawn a child worker, returning it on success or None.

Replication is denied if the worker is expired, the kill switch is engaged, or the controller rejects the manifest (quota, cooldown, depth).

shutdown(reason: str) -> None

Terminate this worker: kill the container, deregister, and log.
