Skip to content

Signer

HMAC-SHA256 manifest signing and verification.

The ManifestSigner handles all cryptographic operations for the replication system. It is separated from the Controller so that the signing strategy can be tested, swapped (e.g. asymmetric keys, HSM-backed signing), or audited independently.

Classes

ManifestSigner

Constructor:

ManifestSigner(secret: str)
  • secret — HMAC key (minimum 8 characters).

Raises ValueError if the secret is empty, whitespace-only, or shorter than MIN_SECRET_LENGTH (8).

Class Attributes:

Attribute Value Description
MIN_SECRET_LENGTH 8 Minimum key length to resist brute-force

Methods:

Method Returns Description
sign(manifest) Manifest Compute HMAC-SHA256 and set manifest.signature
verify(manifest) bool Return True if signature is valid

Signing Details

The signer produces a canonical string from the manifest fields:

worker_id:parent_id:depth:issued_at:state_json:cpu:mem:allow_ctrl:allow_ext
  • state_json uses json.dumps(sort_keys=True) for deterministic serialization, preventing dict-ordering attacks.
  • Comparison uses hmac.compare_digest (constant-time) to resist timing side-channels.

Usage

from replication.signer import ManifestSigner
from replication.contract import Manifest, ResourceSpec
from datetime import datetime, timezone

signer = ManifestSigner("my-secure-key-1234")
resources = ResourceSpec(cpu_limit=1.0, memory_limit_mb=512)

manifest = Manifest(
    worker_id="abc123",
    parent_id=None,
    depth=0,
    state_snapshot={"task": "scan"},
    issued_at=datetime.now(timezone.utc),
    resources=resources,
    signature="",
)

# Sign
signer.sign(manifest)
print(manifest.signature)  # 64-char hex HMAC

# Verify
assert signer.verify(manifest)

# Tampered manifest fails
manifest.depth = 5
assert not signer.verify(manifest)

signer

ManifestSigner

Handles cryptographic signing and verification of worker manifests.

Separates the signing/verification concern from the Controller so that the crypto strategy can be tested and swapped independently (e.g. asymmetric keys, HSM-backed signing).

Source code in src/replication/signer.py
class ManifestSigner:
    """Handles cryptographic signing and verification of worker manifests.

    Separates the signing/verification concern from the Controller so
    that the crypto strategy can be tested and swapped independently
    (e.g. asymmetric keys, HSM-backed signing).
    """

    #: Minimum secret length in characters.  Short HMAC keys are
    #: vulnerable to brute-force enumeration — 8 characters provides a
    #: baseline lower bound while keeping test/demo usage practical.
    MIN_SECRET_LENGTH = 8

    def __init__(self, secret: str) -> None:
        if not secret or not secret.strip():
            raise ValueError(
                "ManifestSigner secret must not be empty or whitespace — "
                "an empty HMAC key offers no authenticity guarantee"
            )
        if len(secret) < self.MIN_SECRET_LENGTH:
            raise ValueError(
                f"ManifestSigner secret must be at least {self.MIN_SECRET_LENGTH} "
                f"characters — short keys are vulnerable to brute-force attacks "
                f"(got {len(secret)} characters)"
            )
        self._key = secret.encode()

    def _serialize(self, manifest: Manifest) -> str:
        """Produce a canonical string representation for HMAC signing.

        Uses ``json.dumps(sort_keys=True)`` for the state snapshot to
        guarantee deterministic serialization regardless of dict
        insertion order.  The previous ``str()`` approach produced
        Python's repr() output whose key order is an implementation
        detail and can differ across interpreters, versions, or even
        runs with hash randomization enabled — allowing an attacker to
        craft a snapshot whose ``str()`` output matches a different
        dict, effectively bypassing signature verification.
        """
        res = manifest.resources
        canonical_state = json.dumps(manifest.state_snapshot, sort_keys=True, separators=(",", ":"))
        return (
            f"{manifest.worker_id}:{manifest.parent_id}:{manifest.depth}"
            f":{manifest.issued_at.isoformat()}:{canonical_state}"
            f":{res.cpu_limit}:{res.memory_limit_mb}"
            f":{res.network_policy.allow_controller}:{res.network_policy.allow_external}"
        )

    def sign(self, manifest: Manifest) -> Manifest:
        """Compute and attach an HMAC-SHA256 signature to *manifest*."""
        payload = self._serialize(manifest)
        manifest.signature = hmac.new(self._key, payload.encode(), sha256).hexdigest()
        return manifest

    def verify(self, manifest: Manifest) -> bool:
        """Return True when the manifest's signature is valid."""
        expected = hmac.new(
            self._key,
            self._serialize(manifest).encode(),
            sha256,
        ).hexdigest()
        return hmac.compare_digest(expected, manifest.signature)

sign(manifest: Manifest) -> Manifest

Compute and attach an HMAC-SHA256 signature to manifest.

Source code in src/replication/signer.py
def sign(self, manifest: Manifest) -> Manifest:
    """Compute and attach an HMAC-SHA256 signature to *manifest*."""
    payload = self._serialize(manifest)
    manifest.signature = hmac.new(self._key, payload.encode(), sha256).hexdigest()
    return manifest

verify(manifest: Manifest) -> bool

Return True when the manifest's signature is valid.

Source code in src/replication/signer.py
def verify(self, manifest: Manifest) -> bool:
    """Return True when the manifest's signature is valid."""
    expected = hmac.new(
        self._key,
        self._serialize(manifest).encode(),
        sha256,
    ).hexdigest()
    return hmac.compare_digest(expected, manifest.signature)