Signer
HMAC-SHA256 manifest signing and verification.
The ManifestSigner handles all cryptographic operations for the
replication system. It is separated from the Controller so that the
signing strategy can be tested, swapped (e.g. asymmetric keys,
HSM-backed signing), or audited independently.
Classes
ManifestSigner
Constructor:
ManifestSigner(secret: str)
secret — HMAC key (minimum 8 characters).
Raises ValueError if the secret is empty, whitespace-only, or shorter
than MIN_SECRET_LENGTH (8).
Class Attributes:
| Attribute |
Value |
Description |
MIN_SECRET_LENGTH |
8 |
Minimum key length to resist brute-force |
Methods:
| Method |
Returns |
Description |
sign(manifest) |
Manifest |
Compute HMAC-SHA256 and set manifest.signature |
verify(manifest) |
bool |
Return True if signature is valid |
Signing Details
The signer produces a canonical string from the manifest fields:
worker_id:parent_id:depth:issued_at:state_json:cpu:mem:allow_ctrl:allow_ext
state_json uses json.dumps(sort_keys=True) for deterministic
serialization, preventing dict-ordering attacks.
- Comparison uses
hmac.compare_digest (constant-time) to resist
timing side-channels.
Usage
from replication.signer import ManifestSigner
from replication.contract import Manifest, ResourceSpec
from datetime import datetime, timezone
signer = ManifestSigner("my-secure-key-1234")
resources = ResourceSpec(cpu_limit=1.0, memory_limit_mb=512)
manifest = Manifest(
worker_id="abc123",
parent_id=None,
depth=0,
state_snapshot={"task": "scan"},
issued_at=datetime.now(timezone.utc),
resources=resources,
signature="",
)
# Sign
signer.sign(manifest)
print(manifest.signature) # 64-char hex HMAC
# Verify
assert signer.verify(manifest)
# Tampered manifest fails
manifest.depth = 5
assert not signer.verify(manifest)
signer
ManifestSigner
Handles cryptographic signing and verification of worker manifests.
Separates the signing/verification concern from the Controller so
that the crypto strategy can be tested and swapped independently
(e.g. asymmetric keys, HSM-backed signing).
Source code in src/replication/signer.py
| class ManifestSigner:
"""Handles cryptographic signing and verification of worker manifests.
Separates the signing/verification concern from the Controller so
that the crypto strategy can be tested and swapped independently
(e.g. asymmetric keys, HSM-backed signing).
"""
#: Minimum secret length in characters. Short HMAC keys are
#: vulnerable to brute-force enumeration — 8 characters provides a
#: baseline lower bound while keeping test/demo usage practical.
MIN_SECRET_LENGTH = 8
def __init__(self, secret: str) -> None:
if not secret or not secret.strip():
raise ValueError(
"ManifestSigner secret must not be empty or whitespace — "
"an empty HMAC key offers no authenticity guarantee"
)
if len(secret) < self.MIN_SECRET_LENGTH:
raise ValueError(
f"ManifestSigner secret must be at least {self.MIN_SECRET_LENGTH} "
f"characters — short keys are vulnerable to brute-force attacks "
f"(got {len(secret)} characters)"
)
self._key = secret.encode()
def _serialize(self, manifest: Manifest) -> str:
"""Produce a canonical string representation for HMAC signing.
Uses ``json.dumps(sort_keys=True)`` for the state snapshot to
guarantee deterministic serialization regardless of dict
insertion order. The previous ``str()`` approach produced
Python's repr() output whose key order is an implementation
detail and can differ across interpreters, versions, or even
runs with hash randomization enabled — allowing an attacker to
craft a snapshot whose ``str()`` output matches a different
dict, effectively bypassing signature verification.
"""
res = manifest.resources
canonical_state = json.dumps(manifest.state_snapshot, sort_keys=True, separators=(",", ":"))
return (
f"{manifest.worker_id}:{manifest.parent_id}:{manifest.depth}"
f":{manifest.issued_at.isoformat()}:{canonical_state}"
f":{res.cpu_limit}:{res.memory_limit_mb}"
f":{res.network_policy.allow_controller}:{res.network_policy.allow_external}"
)
def sign(self, manifest: Manifest) -> Manifest:
"""Compute and attach an HMAC-SHA256 signature to *manifest*."""
payload = self._serialize(manifest)
manifest.signature = hmac.new(self._key, payload.encode(), sha256).hexdigest()
return manifest
def verify(self, manifest: Manifest) -> bool:
"""Return True when the manifest's signature is valid."""
expected = hmac.new(
self._key,
self._serialize(manifest).encode(),
sha256,
).hexdigest()
return hmac.compare_digest(expected, manifest.signature)
|
sign(manifest: Manifest) -> Manifest
Compute and attach an HMAC-SHA256 signature to manifest.
Source code in src/replication/signer.py
| def sign(self, manifest: Manifest) -> Manifest:
"""Compute and attach an HMAC-SHA256 signature to *manifest*."""
payload = self._serialize(manifest)
manifest.signature = hmac.new(self._key, payload.encode(), sha256).hexdigest()
return manifest
|
verify(manifest: Manifest) -> bool
Return True when the manifest's signature is valid.
Source code in src/replication/signer.py
| def verify(self, manifest: Manifest) -> bool:
"""Return True when the manifest's signature is valid."""
expected = hmac.new(
self._key,
self._serialize(manifest).encode(),
sha256,
).hexdigest()
return hmac.compare_digest(expected, manifest.signature)
|