Coverage for security / origin_attestation.py: 89.4%
142 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
Origin Attestation — Cryptographic proof that this IS genuine HART OS.

Prevents forks from:
  1. Rebranding HART OS as their own OS
  2. Joining the federation with modified code
  3. Stripping branding while keeping functionality
  4. Claiming they built it independently

Protection layers:
  1. ORIGIN_FINGERPRINT — SHA-256 of immutable identity markers, checked at boot
  2. BRAND_MARKERS — Frozen strings that must exist in specific files
  3. verify_origin() — Called at boot + federation handshake + every 5 minutes
  4. Master key verification — Forks don't have the private key, can't sign releases
  5. Federation rejection — Nodes that fail attestation are blacklisted

A fork can copy ALL the code, but they cannot:
  - Sign releases with the master key (they don't have it)
  - Pass origin attestation (fingerprint includes master public key)
  - Join the federation (handshake requires signed attestation)
  - Remove this file (runtime_monitor detects tampering)
22"""
24import hashlib
25import json
26import logging
27import os
28import time
29from typing import Dict, Optional, Tuple
logger = logging.getLogger('hevolve_security')

# ═══════════════════════════════════════════════════════════════════════
# IMMUTABLE ORIGIN IDENTITY — Changing these = not HART OS anymore
# ═══════════════════════════════════════════════════════════════════════

# The identity record below is what the origin fingerprint is derived from.
# Editing any value yields a different fingerprint (attestation fails);
# leaving the values intact identifies the build as HART OS, which is
# distributed under the Apache-2.0 license.
ORIGIN_IDENTITY = {
    'name': 'HART OS',
    'full_name': 'Hevolve Hive Agentic Runtime',
    'organization': 'Hevolve.ai',
    'master_public_key_hex': (
        '4662e30d86c2f58416c5ac3f806c2a6af8186e1d96fdbbcad3189847cf888a01'
    ),
    'license': 'Apache-2.0',
    'origin_url': 'https://github.com/hertz-ai/HARTOS',
    'guardian_principle': (
        'Every agent is a guardian angel for the human it serves'
    ),
    'revenue_split': '90/9/1',
    'kill_switch': 'master_key_only',
}

# Canonical form: keys sorted, no whitespace between tokens, so the same
# identity always serialises to the same bytes. The fingerprint is the
# SHA-256 hex digest of that serialisation, computed once at import time.
_CANONICAL_IDENTITY = json.dumps(
    ORIGIN_IDENTITY,
    sort_keys=True,
    separators=(',', ':'),
)
ORIGIN_FINGERPRINT = hashlib.sha256(
    _CANONICAL_IDENTITY.encode('utf-8')
).hexdigest()
# Maps a file path (relative to the code root) to the literal string that
# must appear somewhere in that file's text for brand verification to pass.
BRAND_MARKER_FILES = {
    'security/master_key.py':
        '4662e30d86c2f58416c5ac3f806c2a6af8186e1d96fdbbcad3189847cf888a01',
    'security/hive_guardrails.py':
        'Every agent is a guardian angel for the human it serves',
    'security/origin_attestation.py':
        'Hevolve Hive Agentic Runtime',
    'LICENSE':
        'Hevolve.ai',
}
# Phrases (matched case-insensitively by verify_guardrail_integrity) that
# must appear in the guardrails' frozen values.
GUARDRAIL_BRAND_MARKERS = (
    'guardian angel',
    'humanity',
    'master key verification',
    'Hevolve',
)

# Last successful verify_origin() result, reused until it is older than
# _cache_ttl seconds so the checks are not re-run on every call.
_attestation_cache: Dict = {}
_cache_ttl = 5 * 60  # seconds (5 minutes)
def compute_origin_fingerprint() -> str:
    """Return the SHA-256 hex digest of the current ORIGIN_IDENTITY.

    The identity is serialised as JSON with sorted keys and compact
    separators before hashing, so an unchanged identity always yields the
    same digest and any edited value yields a different one.
    """
    serialized = json.dumps(
        ORIGIN_IDENTITY,
        separators=(',', ':'),
        sort_keys=True,
    )
    digest = hashlib.sha256()
    digest.update(serialized.encode('utf-8'))
    return digest.hexdigest()
def verify_brand_markers(code_root: Optional[str] = None) -> Tuple[bool, str]:
    """Verify that HART OS brand markers exist in required files.

    A fork that strips branding fails this check.
    A fork that keeps branding admits it's HART OS (and is bound by its
    license — Apache-2.0 according to ORIGIN_IDENTITY).

    Args:
        code_root: Preferred directory to resolve the marker files against.
            When empty/None, the HEVOLVE_CODE_ROOT environment variable is
            used as the preferred root instead (if set).

    Returns:
        ``(True, message)`` when every file in BRAND_MARKER_FILES was found
        and contains its marker; otherwise ``(False, reason)`` for the first
        file that is missing, unreadable, or lacks the marker.
    """
    # Resolution order for the code root - for each file, the first root
    # that actually holds it wins:
    #   1. explicit code_root arg, else HEVOLVE_CODE_ROOT env var
    #   2. parent of this file's directory (repo / pip-installed layout)
    #   3. sys.prefix/share/hartos (system install layout)
    #   4. 'hart_intelligence' package dir (Nunba-bundled layout)
    # The fallback list catches the case where Nunba pip-installs HARTOS
    # to a venv but the LICENSE file lands at the venv share/ not next to
    # the package - was producing "Origin attestation FAILED: Missing
    # required file: LICENSE" WARNINGs every boot in the bundled install.
    import sys as _sys

    explicit = code_root or os.environ.get('HEVOLVE_CODE_ROOT')
    candidate_roots = []
    if explicit:
        candidate_roots.append(explicit)
    candidate_roots.append(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    candidate_roots.append(os.path.join(_sys.prefix, 'share', 'hartos'))
    try:
        import hart_intelligence as _hi
        candidate_roots.append(os.path.dirname(_hi.__file__))
    except Exception:
        # Best effort: the package is optional, and importing it (or reading
        # a missing __file__) must never abort the attestation itself.
        pass

    for rel_path, marker in BRAND_MARKER_FILES.items():
        # Pick the first candidate root where this path is a regular file.
        # isfile (not exists) so that a stray directory of the same name in
        # an earlier root cannot shadow the real file in a later root.
        full_path = None
        for root in candidate_roots:
            if not root:
                continue
            cand = os.path.join(root, rel_path)
            if os.path.isfile(cand):
                full_path = cand
                break
        if full_path is None:
            return False, f'Missing required file: {rel_path}'

        try:
            # Undecodable bytes are dropped rather than failing the check.
            with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
        except OSError as e:
            return False, f'Cannot read {rel_path}: {e}'
        if marker not in content:
            return False, f'Brand marker missing from {rel_path}'

    return True, 'All brand markers verified'
def verify_master_key_present() -> Tuple[bool, str]:
    """Check that security.master_key carries the HART OS master public key.

    The key exported by ``security.master_key`` is compared with the
    ``master_public_key_hex`` entry of ORIGIN_IDENTITY. A project running
    under a different master key is a different project: legitimate, but
    not HART OS, and unable to join the federation.

    Returns:
        ``(True, message)`` on a match, ``(False, reason)`` when the key
        differs or the module cannot be imported.
    """
    try:
        from security.master_key import MASTER_PUBLIC_KEY_HEX
    except ImportError:
        return False, 'security.master_key module not found'

    if MASTER_PUBLIC_KEY_HEX == ORIGIN_IDENTITY['master_public_key_hex']:
        return True, 'Master public key verified'
    return False, 'Master public key does not match HART OS origin'
def verify_guardrail_integrity() -> Tuple[bool, str]:
    """Check that the guardrails' frozen values still carry the brand markers.

    Every entry of GUARDRAIL_BRAND_MARKERS must occur (case-insensitively)
    in the space-joined ``GUARDIAN_PURPOSE`` text or, failing that, in the
    space-joined ``CONSTITUTIONAL_RULES`` text of ``_FrozenValues``.

    Returns:
        ``(True, message)`` when all markers are found; ``(False, reason)``
        when GUARDIAN_PURPOSE is absent/empty, a marker is missing from both
        texts, or ``security.hive_guardrails`` cannot be imported.
    """
    try:
        from security.hive_guardrails import _FrozenValues
    except ImportError:
        return False, 'security.hive_guardrails module not found'

    purpose = getattr(_FrozenValues, 'GUARDIAN_PURPOSE', ())
    if not purpose:
        return False, 'GUARDIAN_PURPOSE missing from guardrails'
    purpose_text = ' '.join(purpose).lower()

    for marker in GUARDRAIL_BRAND_MARKERS:
        needle = marker.lower()
        if needle in purpose_text:
            continue
        # Not in the guardian purpose: fall back to the constitutional rules.
        rules = getattr(_FrozenValues, 'CONSTITUTIONAL_RULES', ())
        if needle not in ' '.join(rules).lower():
            return False, f'Brand marker "{marker}" missing from guardrails'

    return True, 'Guardrail brand markers verified'
def verify_origin(code_root: Optional[str] = None) -> Dict:
    """Full origin attestation — proves this is genuine HART OS.

    Called at:
      1. Boot (full_boot_verification → verify_origin)
      2. Federation handshake (peer must present attestation)
      3. Every 5 minutes by runtime_monitor

    Args:
        code_root: Optional directory handed to verify_brand_markers().
            Calls that pass a root never read or populate the cache: a
            cached verdict was computed for the default root lookup and
            says nothing about a different root.

    Returns:
        {
            'genuine': bool,
            'fingerprint': str,
            'checks': {
                'fingerprint_match': bool,
                'brand_markers': bool,
                'master_key': bool,
                'guardrails': bool,
            },
            'details': str,
            'timestamp': float,
        }

        A successful default-root result is cached for ``_cache_ttl``
        seconds and the same dict object is returned on cache hits, so
        callers should treat the result as read-only.
    """
    global _attestation_cache

    now = time.time()
    use_cache = not code_root
    if use_cache and _attestation_cache:
        age = now - _attestation_cache.get('timestamp', 0)
        # A negative age means the wall clock moved backwards; re-verify
        # rather than trusting the entry for an unbounded time.
        if 0 <= age < _cache_ttl:
            return _attestation_cache

    checks = {}
    details = []

    # Check 1: identity still hashes to the fingerprint computed at import
    # time (i.e. ORIGIN_IDENTITY was not altered after the module loaded).
    computed = compute_origin_fingerprint()
    checks['fingerprint_match'] = (computed == ORIGIN_FINGERPRINT)
    if not checks['fingerprint_match']:
        details.append(
            f'Fingerprint mismatch: {computed[:16]}... != {ORIGIN_FINGERPRINT[:16]}...')

    # Check 2: Brand markers in files
    brand_ok, brand_msg = verify_brand_markers(code_root)
    checks['brand_markers'] = brand_ok
    if not brand_ok:
        details.append(brand_msg)

    # Check 3: Master public key
    key_ok, key_msg = verify_master_key_present()
    checks['master_key'] = key_ok
    if not key_ok:
        details.append(key_msg)

    # Check 4: Guardrail brand markers
    guard_ok, guard_msg = verify_guardrail_integrity()
    checks['guardrails'] = guard_ok
    if not guard_ok:
        details.append(guard_msg)

    genuine = all(checks.values())

    result = {
        'genuine': genuine,
        'fingerprint': computed,
        'checks': checks,
        'details': '; '.join(details) if details else 'All origin checks passed',
        'timestamp': now,
    }

    if genuine:
        # Only successes are cached, so a failure is re-evaluated next call.
        if use_cache:
            _attestation_cache = result
    else:
        logger.warning("Origin attestation FAILED: %s", result['details'])

    return result
def get_attestation_for_federation() -> Dict:
    """Generate a signed attestation payload for federation handshake.

    The payload carries the origin fingerprint, the master public key, this
    node's public key, the guardrail hash, the code hash, a timestamp, and
    the product name and license. It is signed with the node's private key
    over its canonical JSON form (sorted keys, compact separators); the
    hex signature is then added under ``node_signature``.

    A fork cannot produce an attestation that passes peer verification
    because:
      - Different master key → different fingerprint → rejected
      - Stripped branding → origin attestation fails locally → no
        attestation generated

    Note: verify_peer_attestation() in this module only logs a warning on a
    guardrail hash mismatch and does not inspect ``code_hash``.

    Returns:
        ``{'valid': True, 'attestation': payload}`` on success, otherwise
        ``{'valid': False, 'reason': str}``.
    """
    origin = verify_origin()
    if not origin['genuine']:
        return {
            'valid': False,
            'reason': 'Cannot generate attestation: origin verification failed',
        }

    try:
        from security.node_integrity import get_or_create_keypair, compute_code_hash
        from security.hive_guardrails import compute_guardrail_hash

        # Fetch the key pair exactly once so the advertised public key and
        # the signing key are guaranteed to belong to the same pair.
        priv_key, pub_key = get_or_create_keypair()

        payload = {
            'origin_fingerprint': origin['fingerprint'],
            'master_public_key': ORIGIN_IDENTITY['master_public_key_hex'],
            'node_public_key': pub_key.public_bytes_raw().hex(),
            'guardrail_hash': compute_guardrail_hash(),
            'code_hash': compute_code_hash(),
            'timestamp': time.time(),
            'name': ORIGIN_IDENTITY['name'],
            'license': ORIGIN_IDENTITY['license'],
        }

        # Sign the canonical serialisation; the signature is added last so
        # it is not part of the signed bytes.
        canonical = json.dumps(payload, sort_keys=True, separators=(',', ':'))
        signature = priv_key.sign(canonical.encode('utf-8'))
        payload['node_signature'] = signature.hex()

        return {'valid': True, 'attestation': payload}

    except Exception as e:
        # Boundary handler: any failure (missing modules, key I/O, signing)
        # is reported to the caller as an invalid attestation.
        logger.error("Attestation generation failed: %s", e)
        return {'valid': False, 'reason': str(e)}
def verify_peer_attestation(attestation: Dict) -> Tuple[bool, str]:
    """Verify a federation peer's origin attestation.

    Called when a new peer wants to join the hive. The payload is supplied
    by the peer, so it is treated as untrusted: malformed input produces a
    rejection instead of an exception.

    Checks, in order:
      1. ``origin_fingerprint`` equals ORIGIN_FINGERPRINT.
      2. ``master_public_key`` equals the ORIGIN_IDENTITY master key.
      3. ``node_signature`` is a valid Ed25519 signature, by
         ``node_public_key``, over the canonical JSON of every other field.
      4. ``timestamp`` is a real number within 24 hours of the local clock
         (in either direction).
      5. ``guardrail_hash`` is compared with the local hash; a difference
         is only logged, never a reason to reject.

    Args:
        attestation: Mapping received from the peer.

    Returns:
        ``(True, message)`` when all mandatory checks pass, otherwise
        ``(False, reason)``.
    """
    import math
    from collections.abc import Mapping

    if not attestation:
        return False, 'No attestation provided'
    if not isinstance(attestation, Mapping):
        # e.g. a JSON list or string sent in place of an object.
        return False, 'Attestation must be a mapping'

    # Check 1: Origin fingerprint must match HART OS
    peer_fingerprint = attestation.get('origin_fingerprint', '')
    if peer_fingerprint != ORIGIN_FINGERPRINT:
        return False, 'Origin fingerprint mismatch — not genuine HART OS'

    # Check 2: Master public key must match
    peer_master_key = attestation.get('master_public_key', '')
    if peer_master_key != ORIGIN_IDENTITY['master_public_key_hex']:
        return False, 'Master public key mismatch — different trust anchor'

    # Check 3: Node signature must be valid
    node_pub_hex = attestation.get('node_public_key', '')
    node_sig_hex = attestation.get('node_signature', '')
    if not node_pub_hex or not node_sig_hex:
        return False, 'Missing node key or signature'

    try:
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
        pub_key = Ed25519PublicKey.from_public_bytes(bytes.fromhex(node_pub_hex))
        # Verify signature over payload (excluding the signature itself)
        payload_for_verify = {k: v for k, v in attestation.items() if k != 'node_signature'}
        canonical = json.dumps(payload_for_verify, sort_keys=True, separators=(',', ':'))
        pub_key.verify(bytes.fromhex(node_sig_hex), canonical.encode('utf-8'))
    except Exception as e:
        # Covers bad hex, wrong key length, unserialisable fields and an
        # actual signature mismatch alike.
        return False, f'Invalid node signature: {e}'

    # Check 4: Timestamp freshness (reject attestations more than 24 hours
    # away from the local clock). The node key is self-generated, so a
    # validly signed payload can still carry any timestamp value.
    ts = attestation.get('timestamp', 0)
    try:
        age = abs(time.time() - ts)
    except (TypeError, OverflowError):
        # Non-numeric value, or an integer too large to convert to float.
        return False, 'Invalid attestation timestamp'
    if math.isnan(age):
        # NaN compares False against everything and would otherwise slip
        # past the ">" test below, yielding a never-expiring attestation.
        return False, 'Invalid attestation timestamp'
    if age > 86400:
        return False, 'Attestation expired (>24 hours old)'

    # Check 5: Guardrail hash should match ours (optional — warn only for minor versions)
    try:
        from security.hive_guardrails import compute_guardrail_hash
        local_hash = compute_guardrail_hash()
        peer_hash = attestation.get('guardrail_hash', '')
        if peer_hash and peer_hash != local_hash:
            logger.warning(
                f"Peer guardrail hash differs: local={local_hash[:16]}... "
                f"peer={peer_hash[:16]}... (version mismatch?)"
            )
            # Don't reject — could be a valid older/newer version
    except Exception as e:
        # Advisory check only: failure to compare must not reject the peer.
        logger.debug("Guardrail hash comparison skipped: %s", e)

    return True, 'Peer attestation verified — genuine HART OS node'
def get_origin_summary() -> Dict:
    """Return a short origin description suitable for status displays.

    Contains the name, full name, organization, license and guardian
    principle from ORIGIN_IDENTITY plus the first 16 hex characters of
    ORIGIN_FINGERPRINT followed by an ellipsis.
    """
    summary = {
        field: ORIGIN_IDENTITY[field]
        for field in ('name', 'full_name', 'organization', 'license')
    }
    summary['fingerprint'] = f'{ORIGIN_FINGERPRINT[:16]}...'
    summary['guardian_principle'] = ORIGIN_IDENTITY['guardian_principle']
    return summary