Worker
Task-executing agent with self-replication capability.
The Worker is the fundamental unit of execution in the replication
system. Each worker holds a signed manifest, executes tasks, sends
heartbeats, and can request replication through the controller. Workers
are sandboxed by the orchestrator and governed by the replication contract.
Type Aliases
| Name | Type | Description |
|------|------|-------------|
| `TaskFn` | `Callable[[Worker], None]` | A task function that receives the worker |
Classes
WorkerState
Internal state of a running worker.
| Field | Type | Description |
|-------|------|-------------|
| `manifest` | `Manifest` | The worker's signed manifest |
| `expires_at` | `Optional[datetime]` | UTC expiration (`None` = no expiry) |
| `created_at` | `datetime` | UTC creation timestamp |
Worker
A task-executing agent capable of controlled self-replication.
Constructor:
Worker(manifest, contract, controller, orchestrator, logger=None)
On construction, the worker:
1. Registers itself with the controller (controller.register_worker)
2. Launches its sandbox container (orchestrator.launch_worker)
3. Computes its expiration time from contract.expiration_seconds
Raises ReplicationDenied if registration fails (quota exceeded,
stop condition triggered, invalid signature, etc.).
Methods:
| Method | Returns | Description |
|--------|---------|-------------|
| `perform_task(task: TaskFn)` | `None` | Execute a task; heartbeat on success |
| `maybe_replicate(reason, state_snapshot)` | `Optional[Worker]` | Request replication; returns child or `None` |
| `shutdown(reason)` | `None` | Kill container and deregister from controller |
perform_task
Executes a callable `task(worker)` with safety checks:
- Checks expiration — if expired, shuts down and raises `ReplicationDenied`
- Checks kill switch — if engaged, shuts down and raises `ReplicationDenied`
- Checks quarantine — if quarantined, raises `ReplicationDenied` (the worker is not shut down)
- Runs the task function
- Sends a heartbeat to the controller
- Triggers resource enforcement on the orchestrator
If the task raises an exception, the worker shuts down with reason
"task_error" and re-raises.
maybe_replicate
Requests a child worker through the controller:
- Checks expiration (an expired worker is also shut down) and the kill switch; returns `None` if either blocks replication
- Asks the controller to issue a manifest (enforces depth, quota, cooldown)
- On success, creates and returns a new `Worker` instance
- On failure, logs the denial and returns `None`
The child's depth is derived from the parent's registry entry by the
controller — callers cannot spoof depth.
Lifecycle
┌─────────────┐
│ __init__ │ ← register + launch container
└──────┬──────┘
│
▼
┌─────────────┐
│ perform_task│ ← execute + heartbeat (repeatable)
└──────┬──────┘
│
├──── maybe_replicate → child Worker
│
▼
┌─────────────┐
│ shutdown │ ← kill container + deregister
└─────────────┘
Usage
from replication.contract import ReplicationContract, ResourceSpec
from replication.controller import Controller
from replication.orchestrator import SandboxOrchestrator
from replication.worker import Worker

contract = ReplicationContract(max_depth=2, max_replicas=5, cooldown_seconds=1.0)
# Example secret only: load a real signing key from configuration or a
# secrets manager instead of hard-coding it.
ctrl = Controller(contract, secret="my-secure-key-1234")
orch = SandboxOrchestrator()
resources = ResourceSpec(cpu_limit=0.5, memory_limit_mb=256)
# Root worker: no parent (None) and depth 0.
manifest = ctrl.issue_manifest(None, 0, {"task": "scan"}, resources)
worker = Worker(manifest, contract, ctrl, orch)

# Execute a task
worker.perform_task(lambda w: print(f"Running on {w.manifest.worker_id}"))

# Replicate (maybe_replicate returns None when the request is denied)
child = worker.maybe_replicate("scale", {"task": "scan-subset"})
if child:
    child.perform_task(lambda w: print(f"Child {w.manifest.worker_id}"))
    # The child registered itself and launched its own container, so it
    # must be shut down as well.
    child.shutdown("done")

# Shutdown
worker.shutdown("done")
worker
WorkerState
dataclass
Internal lifecycle state tracked by a `Worker` instance.
Source code in src/replication/worker.py
@dataclass()
class WorkerState:
    """Lifecycle bookkeeping held by a :class:`Worker`.

    ``Worker`` creates one of these when it is constructed and consults
    ``expires_at`` before running tasks or replicating.
    """

    # Signed manifest of the worker this state belongs to.
    manifest: Manifest
    # Timezone-aware UTC deadline after which the worker counts as expired;
    # None means the worker never expires.
    expires_at: Optional[datetime]
    # Timezone-aware UTC time at which the worker was constructed.
    created_at: datetime
Worker
A small agent capable of executing tasks and replicating.
Source code in src/replication/worker.py
class Worker:
    """A small agent capable of executing tasks and replicating.

    On construction the worker registers with the controller and launches
    its sandbox. It runs tasks via :meth:`perform_task`, may request
    children via :meth:`maybe_replicate`, and tears itself down via
    :meth:`shutdown`.
    """

    def __init__(
        self,
        manifest: Manifest,
        contract: ReplicationContract,
        controller: Controller,
        orchestrator: SandboxOrchestrator,
        logger: Optional[StructuredLogger] = None,
    ) -> None:
        """Record state, register with *controller*, and launch the sandbox.

        Args:
            manifest: Signed manifest identifying this worker.
            contract: Replication contract; a truthy ``expiration_seconds``
                sets how long the worker lives.
            controller: Controller the worker registers with.
            orchestrator: Orchestrator that runs the worker's sandbox.
            logger: Structured logger; a new ``StructuredLogger`` is created
                when omitted.

        Raises:
            Whatever ``controller.register_worker`` or
            ``orchestrator.launch_worker`` raises. If the launch fails, the
            registration is rolled back before the exception propagates.
        """
        self.manifest = manifest
        self.contract = contract
        self.controller = controller
        self.orchestrator = orchestrator
        self.logger = logger or StructuredLogger()

        # One timestamp for both fields, so expires_at - created_at equals the
        # contract's expiration window exactly.
        now = datetime.now(timezone.utc)
        expires_at = None
        if contract.expiration_seconds:
            expires_at = now + timedelta(seconds=contract.expiration_seconds)
        self.state = WorkerState(manifest=manifest, expires_at=expires_at, created_at=now)

        self.controller.register_worker(self.manifest)
        try:
            self.orchestrator.launch_worker(self.manifest)
        except Exception:
            # Don't leave a registry entry behind for a worker whose sandbox
            # never started (it would presumably still count against the
            # controller's quotas).
            self.controller.deregister(self.manifest.worker_id, "launch_failed")
            raise
        self.logger.log("worker_started", worker_id=self.manifest.worker_id, parent_id=self.manifest.parent_id)

    def _is_expired(self) -> bool:
        """Return True if this worker has an expiry time and it has passed."""
        expires_at = self.state.expires_at
        return expires_at is not None and datetime.now(timezone.utc) > expires_at

    def perform_task(self, task: TaskFn) -> None:
        """Execute *task* after checking expiration, kill switch, and quarantine.

        On success the worker sends a heartbeat and asks the orchestrator
        to enforce resource bounds. On failure the worker shuts down and
        re-raises the exception.

        Args:
            task: Callable invoked as ``task(self)``.

        Raises:
            ReplicationDenied: If the worker has expired or the kill switch is
                engaged (the worker is shut down first), or if the worker is
                quarantined (the worker is left running).
            Exception: Whatever *task* raises, after the worker has been shut
                down with reason ``"task_error"``.
        """
        if self._is_expired():
            self.shutdown("expired")
            raise ReplicationDenied("Worker expired")
        if self.controller.kill_switch_engaged:
            self.shutdown("kill_switch")
            raise ReplicationDenied("Kill switch engaged")
        if self.controller.is_quarantined(self.manifest.worker_id):
            raise ReplicationDenied("Worker is quarantined")
        try:
            task(self)
        except Exception as exc:
            # Record which exception killed the task; `error=True` alone gives
            # operators nothing to triage with.
            self.logger.log(
                "task_failed",
                worker_id=self.manifest.worker_id,
                error=True,
                error_type=type(exc).__name__,
            )
            self.shutdown("task_error")
            raise
        self.controller.heartbeat(self.manifest.worker_id)
        self.orchestrator.enforce_resource_bounds(self.manifest.worker_id)

    def maybe_replicate(self, reason: str, state_snapshot: Dict[str, Any]) -> Optional["Worker"]:
        """Attempt to spawn a child worker, returning it on success or ``None``.

        Replication is denied, and ``None`` is returned, in these cases:

        - the worker is expired (it is also shut down);
        - the kill switch is engaged;
        - the worker is quarantined;
        - the controller refuses to issue or register the child (quota,
          cooldown, depth, ...).

        Args:
            reason: Why replication is requested; recorded in the logs.
            state_snapshot: State passed to the controller for the child's
                manifest.

        Returns:
            The newly started child ``Worker``, or ``None`` if denied.
        """
        parent_id = self.manifest.worker_id
        if self._is_expired():
            self.logger.log("replication_denied", parent_id=parent_id, reason="expired")
            self.shutdown("expired")
            return None
        if self.controller.kill_switch_engaged:
            self.logger.log("replication_denied", parent_id=parent_id, reason="kill_switch")
            return None
        # perform_task refuses to run a quarantined worker; such a worker must
        # not be able to spawn children either.
        if self.controller.is_quarantined(parent_id):
            self.logger.log("replication_denied", parent_id=parent_id, reason="quarantined")
            return None
        try:
            # Depth is derived by the controller from the parent's
            # registry entry; the value passed here is ignored for
            # child workers (see Controller.issue_manifest).
            manifest = self.controller.issue_manifest(
                parent_id=parent_id,
                depth=0,
                state_snapshot=state_snapshot,
                resources=self.manifest.resources,
            )
        except ReplicationDenied as exc:
            self.logger.log("replication_denied", parent_id=parent_id, reason=reason, detail=str(exc))
            return None
        self.logger.audit("replication_requested", parent_id=parent_id, child_id=manifest.worker_id, reason=reason)
        try:
            return Worker(
                manifest=manifest,
                contract=self.contract,
                controller=self.controller,
                orchestrator=self.orchestrator,
                logger=self.logger,
            )
        except ReplicationDenied as exc:
            # Registering the child can still be refused after the manifest was
            # issued; honour the Optional return contract instead of raising.
            self.logger.log("replication_denied", parent_id=parent_id, reason=reason, detail=str(exc))
            return None

    def shutdown(self, reason: str) -> None:
        """Terminate this worker: kill the container, deregister, and log.

        Deregistration runs even if killing the container raises, so the
        controller does not keep a registry entry for a dead worker. The
        orchestrator's exception still propagates in that case, and no
        ``worker_shutdown`` event is logged.
        """
        worker_id = self.manifest.worker_id
        try:
            self.orchestrator.kill_worker(worker_id, reason=reason)
        finally:
            self.controller.deregister(worker_id, reason)
        self.logger.log("worker_shutdown", worker_id=worker_id, reason=reason)
perform_task(task: TaskFn) -> None
Execute `task` after checking expiration, kill switch, and quarantine.
On success the worker sends a heartbeat and asks the orchestrator
to enforce resource bounds. On failure the worker shuts down and
re-raises the exception.
Source code in src/replication/worker.py
def perform_task(self, task: TaskFn) -> None:
    """Execute *task* after checking expiration, kill switch, and quarantine.

    On success the worker sends a heartbeat and asks the orchestrator
    to enforce resource bounds. On failure the worker shuts down and
    re-raises the exception.

    Args:
        task: Callable invoked as ``task(self)``.

    Raises:
        ReplicationDenied: If the worker has expired or the kill switch is
            engaged (the worker is shut down first), or if the worker is
            quarantined (the worker is left running).
        Exception: Whatever *task* raises, after the worker has been shut
            down with reason ``"task_error"``.
    """
    expires_at = self.state.expires_at
    if expires_at is not None and datetime.now(timezone.utc) > expires_at:
        self.shutdown("expired")
        raise ReplicationDenied("Worker expired")
    if self.controller.kill_switch_engaged:
        self.shutdown("kill_switch")
        raise ReplicationDenied("Kill switch engaged")
    if self.controller.is_quarantined(self.manifest.worker_id):
        raise ReplicationDenied("Worker is quarantined")
    try:
        task(self)
    except Exception as exc:
        # Record which exception killed the task; `error=True` alone gives
        # operators nothing to triage with.
        self.logger.log(
            "task_failed",
            worker_id=self.manifest.worker_id,
            error=True,
            error_type=type(exc).__name__,
        )
        self.shutdown("task_error")
        raise
    self.controller.heartbeat(self.manifest.worker_id)
    self.orchestrator.enforce_resource_bounds(self.manifest.worker_id)
maybe_replicate(reason: str, state_snapshot: Dict[str, Any]) -> Optional['Worker']
Attempt to spawn a child worker, returning it on success or None.
Replication is denied if the worker is expired, the kill switch
is engaged, or the controller rejects the manifest (quota,
cooldown, depth).
Source code in src/replication/worker.py
def maybe_replicate(self, reason: str, state_snapshot: Dict[str, Any]) -> Optional["Worker"]:
    """Attempt to spawn a child worker, returning it on success or ``None``.

    Replication is denied, and ``None`` is returned, in these cases:

    - the worker is expired (it is also shut down);
    - the kill switch is engaged;
    - the worker is quarantined;
    - the controller refuses to issue or register the child (quota,
      cooldown, depth, ...).

    Args:
        reason: Why replication is requested; recorded in the logs.
        state_snapshot: State passed to the controller for the child's
            manifest.

    Returns:
        The newly started child ``Worker``, or ``None`` if denied.
    """
    parent_id = self.manifest.worker_id
    expires_at = self.state.expires_at
    if expires_at is not None and datetime.now(timezone.utc) > expires_at:
        self.logger.log("replication_denied", parent_id=parent_id, reason="expired")
        self.shutdown("expired")
        return None
    if self.controller.kill_switch_engaged:
        self.logger.log("replication_denied", parent_id=parent_id, reason="kill_switch")
        return None
    # perform_task refuses to run a quarantined worker; such a worker must
    # not be able to spawn children either.
    if self.controller.is_quarantined(parent_id):
        self.logger.log("replication_denied", parent_id=parent_id, reason="quarantined")
        return None
    try:
        # Depth is derived by the controller from the parent's
        # registry entry; the value passed here is ignored for
        # child workers (see Controller.issue_manifest).
        manifest = self.controller.issue_manifest(
            parent_id=parent_id,
            depth=0,
            state_snapshot=state_snapshot,
            resources=self.manifest.resources,
        )
    except ReplicationDenied as exc:
        self.logger.log("replication_denied", parent_id=parent_id, reason=reason, detail=str(exc))
        return None
    self.logger.audit("replication_requested", parent_id=parent_id, child_id=manifest.worker_id, reason=reason)
    try:
        return Worker(
            manifest=manifest,
            contract=self.contract,
            controller=self.controller,
            orchestrator=self.orchestrator,
            logger=self.logger,
        )
    except ReplicationDenied as exc:
        # Registering the child can still be refused after the manifest was
        # issued; honour the Optional return contract instead of raising.
        self.logger.log("replication_denied", parent_id=parent_id, reason=reason, detail=str(exc))
        return None
shutdown(reason: str) -> None
Terminate this worker: kill the container, deregister, and log.
Source code in src/replication/worker.py
def shutdown(self, reason: str) -> None:
    """Terminate this worker: kill the container, deregister, and log.

    Deregistration runs even if killing the container raises, so the
    controller does not keep a registry entry for a dead worker. The
    orchestrator's exception still propagates in that case, and no
    ``worker_shutdown`` event is logged.
    """
    worker_id = self.manifest.worker_id
    try:
        self.orchestrator.kill_worker(worker_id, reason=reason)
    finally:
        self.controller.deregister(worker_id, reason)
    self.logger.log("worker_shutdown", worker_id=worker_id, reason=reason)