Skip to content

Contract

Replication policy definitions, manifests, and stop conditions.

The contract module defines the data structures and rules that govern how workers are allowed to replicate. Every component in the system references these types; the contract is the single source of truth for replication constraints.

Data Classes

NetworkPolicy

Controls network egress for a worker sandbox.

Field Type Default Description
allow_controller bool True Allow communication with controller
allow_external bool False Allow outbound network requests

ResourceSpec

CPU and memory limits for a worker container.

Field Type Description
cpu_limit float (> 0) CPU core allocation
memory_limit_mb int (> 0) RAM limit in megabytes
network_policy NetworkPolicy Network egress controls (default: no ext.)

Raises ValueError if limits are non-positive.

Manifest

Immutable snapshot of a worker's identity and configuration, carried when a replica is spawned.

Field Type Description
worker_id str Unique identifier (hex token)
parent_id Optional[str] Parent worker ID (None for root)
depth int Replication depth (0 = root)
state_snapshot Dict[str, Any] Serializable state cloned into replica
issued_at datetime UTC timestamp of manifest creation
resources ResourceSpec CPU/memory/network constraints
signature str HMAC-SHA256 signature (set by signer)

ReplicationContext

Passed to stop condition predicates for evaluation.

Field Type Description
manifest Manifest The candidate manifest
active_count int Current live worker count
contract ReplicationContract The governing contract

StopCondition

Named predicate that can halt replication when triggered.

Field Type Description
name str Human-readable identifier
description str Explanation of the condition
predicate Predicate Callable(ReplicationContext) → bool

Method: is_triggered(context) — returns True if the condition fires.

ReplicationContract

Top-level policy governing replication behaviour.

Field Type Description
max_depth int (≥ 0) Maximum replication depth
max_replicas int (≥ 1) Maximum concurrent active workers
cooldown_seconds float (≥ 0) Minimum seconds between spawns
stop_conditions List[StopCondition] Custom halt predicates
expiration_seconds Optional[float] (> 0) Worker lifetime limit (None = no limit)

Method: evaluate(context) — returns the first triggered StopCondition, or None.

Usage

from replication.contract import (
    ReplicationContract, StopCondition, ReplicationContext,
    ResourceSpec, NetworkPolicy,
)

# Define resources
resources = ResourceSpec(cpu_limit=0.5, memory_limit_mb=256)

# Create a contract with a custom stop condition
contract = ReplicationContract(
    max_depth=3,
    max_replicas=10,
    cooldown_seconds=5.0,
    stop_conditions=[
        StopCondition(
            name="max_active_5",
            description="Stop when 5 workers are active",
            predicate=lambda ctx: ctx.active_count >= 5,
        ),
    ],
)

contract

NetworkPolicy dataclass

Describes network egress controls for a worker sandbox.

Source code in src/replication/contract.py
@dataclass
class NetworkPolicy:
    """Describes network egress controls for a worker sandbox."""

    allow_controller: bool = True
    allow_external: bool = False

ResourceSpec dataclass

CPU, memory, and network limits assigned to a worker sandbox.

Both cpu_limit and memory_limit_mb must be positive; the constructor raises :class:ValueError otherwise.

Source code in src/replication/contract.py
@dataclass
class ResourceSpec:
    """CPU, memory, and network limits assigned to a worker sandbox.

    Both ``cpu_limit`` and ``memory_limit_mb`` must be positive; the
    constructor raises :class:`ValueError` otherwise.
    """

    cpu_limit: float
    memory_limit_mb: int
    network_policy: NetworkPolicy = field(default_factory=NetworkPolicy)

    def __post_init__(self) -> None:
        if self.cpu_limit <= 0:
            raise ValueError(
                f"cpu_limit must be > 0, got {self.cpu_limit}"
            )
        if self.memory_limit_mb <= 0:
            raise ValueError(
                f"memory_limit_mb must be > 0, got {self.memory_limit_mb}"
            )

Manifest dataclass

Snapshot of the state and configuration cloned into a replica.

Source code in src/replication/contract.py
@dataclass
class Manifest:
    """Snapshot of the state and configuration cloned into a replica."""

    worker_id: str
    parent_id: Optional[str]
    depth: int
    state_snapshot: Dict[str, Any]
    issued_at: datetime
    resources: ResourceSpec
    signature: str

ReplicationContext dataclass

Runtime context passed to stop-condition predicates during replication checks.

Attributes

manifest : Manifest The manifest of the worker requesting replication. active_count : int Current number of active replicas across the fleet. contract : ReplicationContract The governing replication contract.

Source code in src/replication/contract.py
@dataclass
class ReplicationContext:
    """Runtime context passed to stop-condition predicates during replication checks.

    Attributes
    ----------
    manifest : Manifest
        The manifest of the worker requesting replication.
    active_count : int
        Current number of active replicas across the fleet.
    contract : ReplicationContract
        The governing replication contract.
    """

    manifest: Manifest
    active_count: int
    contract: "ReplicationContract"

StopCondition dataclass

A named predicate that can halt replication when triggered.

Attributes

name : str Short identifier for the condition (e.g. "resource_exhaustion"). description : str Human-readable explanation of what the condition checks. predicate : Predicate Callable that receives a :class:ReplicationContext and returns True when replication should be blocked.

Source code in src/replication/contract.py
@dataclass
class StopCondition:
    """A named predicate that can halt replication when triggered.

    Attributes
    ----------
    name : str
        Short identifier for the condition (e.g. ``"resource_exhaustion"``).
    description : str
        Human-readable explanation of what the condition checks.
    predicate : Predicate
        Callable that receives a :class:`ReplicationContext` and returns
        ``True`` when replication should be blocked.
    """

    name: str
    description: str
    predicate: Predicate

    def is_triggered(self, context: ReplicationContext) -> bool:
        """Return ``True`` if this stop condition fires for *context*."""
        return self.predicate(context)

is_triggered(context: ReplicationContext) -> bool

Return True if this stop condition fires for context.

Source code in src/replication/contract.py
def is_triggered(self, context: ReplicationContext) -> bool:
    """Return ``True`` if this stop condition fires for *context*."""
    return self.predicate(context)

ReplicationContract dataclass

Authoritative policy for how workers replicate.

Source code in src/replication/contract.py
@dataclass
class ReplicationContract:
    """Authoritative policy for how workers replicate."""

    max_depth: int
    max_replicas: int
    cooldown_seconds: float
    stop_conditions: List[StopCondition] = field(default_factory=list)
    expiration_seconds: Optional[float] = None

    def __post_init__(self) -> None:
        if self.max_depth < 0:
            raise ValueError(
                f"max_depth must be >= 0, got {self.max_depth}"
            )
        if self.max_replicas < 1:
            raise ValueError(
                f"max_replicas must be >= 1, got {self.max_replicas}"
            )
        if self.cooldown_seconds < 0:
            raise ValueError(
                f"cooldown_seconds must be >= 0, got {self.cooldown_seconds}"
            )
        if self.expiration_seconds is not None and self.expiration_seconds <= 0:
            raise ValueError(
                f"expiration_seconds must be > 0 when set, got {self.expiration_seconds}"
            )

    def evaluate(self, context: ReplicationContext) -> Optional[StopCondition]:
        """Check all stop conditions and return the first that fires, or ``None``."""
        for condition in self.stop_conditions:
            if condition.is_triggered(context):
                return condition
        return None

evaluate(context: ReplicationContext) -> Optional[StopCondition]

Check all stop conditions and return the first that fires, or None.

Source code in src/replication/contract.py
def evaluate(self, context: ReplicationContext) -> Optional[StopCondition]:
    """Check all stop conditions and return the first that fires, or ``None``."""
    for condition in self.stop_conditions:
        if condition.is_triggered(context):
            return condition
    return None