Coverage for security / origin_attestation.py: 89.4%

142 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Origin Attestation — Cryptographic proof that this IS genuine HART OS. 

3 

4Prevents forks from: 

5 1. Rebranding HART OS as their own OS 

6 2. Joining the federation with modified code 

7 3. Stripping branding while keeping functionality 

8 4. Claiming they built it independently 

9 

10Protection layers: 

11 1. ORIGIN_FINGERPRINT — SHA-256 of immutable identity markers, checked at boot 

12 2. BRAND_MARKERS — Frozen strings that must exist in specific files 

13 3. verify_origin() — Called at boot + federation handshake + every 5 minutes 

14 4. Master key verification — Forks don't have the private key, can't sign releases 

15 5. Federation rejection — Nodes that fail attestation are blacklisted 

16 

17A fork can copy ALL the code, but they cannot: 

18 - Sign releases with the master key (they don't have it) 

19 - Pass origin attestation (fingerprint includes master public key) 

20 - Join the federation (handshake requires signed attestation) 

21 - Remove this file (runtime_monitor detects tampering) 

22""" 

23 

24import hashlib 

25import json 

26import logging 

27import os 

28import time 

29from typing import Dict, Optional, Tuple 

30 

31logger = logging.getLogger('hevolve_security') 

32 

33# ═══════════════════════════════════════════════════════════════════════ 

34# IMMUTABLE ORIGIN IDENTITY — Changing these = not HART OS anymore 

35# ═══════════════════════════════════════════════════════════════════════ 

36 

37# These values are the DNA of HART OS. A fork that changes them 

38# fails attestation. A fork that keeps them admits it's HART OS 

39# (and must comply with the Apache-2.0 license). 

40 

41ORIGIN_IDENTITY = { 

42 'name': 'HART OS', 

43 'full_name': 'Hevolve Hive Agentic Runtime', 

44 'organization': 'Hevolve.ai', 

45 'master_public_key_hex': '4662e30d86c2f58416c5ac3f806c2a6af8186e1d96fdbbcad3189847cf888a01', 

46 'license': 'Apache-2.0', 

47 'origin_url': 'https://github.com/hertz-ai/HARTOS', 

48 'guardian_principle': 'Every agent is a guardian angel for the human it serves', 

49 'revenue_split': '90/9/1', 

50 'kill_switch': 'master_key_only', 

51} 

52 

53# SHA-256 of the canonical identity — computed once, verified forever 

54_CANONICAL_IDENTITY = json.dumps(ORIGIN_IDENTITY, sort_keys=True, separators=(',', ':')) 

55ORIGIN_FINGERPRINT = hashlib.sha256(_CANONICAL_IDENTITY.encode('utf-8')).hexdigest() 

56 

57# Files that MUST contain HART OS markers (relative to code root) 

58BRAND_MARKER_FILES = { 

59 'security/master_key.py': '4662e30d86c2f58416c5ac3f806c2a6af8186e1d96fdbbcad3189847cf888a01', 

60 'security/hive_guardrails.py': 'Every agent is a guardian angel for the human it serves', 

61 'security/origin_attestation.py': 'Hevolve Hive Agentic Runtime', 

62 'LICENSE': 'Hevolve.ai', 

63} 

64 

65# Brand markers that must exist in the guardrails frozen values 

66GUARDRAIL_BRAND_MARKERS = ( 

67 'guardian angel', 

68 'humanity', 

69 'master key verification', 

70 'Hevolve', 

71) 

72 

73# Attestation cache (avoid re-computing every call) 

74_attestation_cache: Dict = {} 

75_cache_ttl = 300 # 5 minutes 

76 

77 

78def compute_origin_fingerprint() -> str: 

79 """Compute the origin fingerprint from current ORIGIN_IDENTITY. 

80 

81 This is deterministic — same identity always produces same fingerprint. 

82 A fork that modifies ORIGIN_IDENTITY gets a different fingerprint. 

83 """ 

84 canonical = json.dumps(ORIGIN_IDENTITY, sort_keys=True, separators=(',', ':')) 

85 return hashlib.sha256(canonical.encode('utf-8')).hexdigest() 

86 

87 

88def verify_brand_markers(code_root: str = None) -> Tuple[bool, str]: 

89 """Verify that HART OS brand markers exist in required files. 

90 

91 A fork that strips branding fails this check. 

92 A fork that keeps branding admits it's HART OS (BSL license applies). 

93 """ 

94 # Resolution order for code root - first existing wins: 

95 # 1. explicit code_root arg 

96 # 2. HEVOLVE_CODE_ROOT env var 

97 # 3. parent of this file (repo / pip-installed package layout) 

98 # 4. sys.prefix/share/hartos (system install layout) 

99 # 5. sibling 'hart_intelligence' package dir (Nunba-bundled layout) 

100 # The fallback list catches the case where Nunba pip-installs HARTOS 

101 # to a venv but the LICENSE file lands at the venv share/ not next to 

102 # the package - was producing "Origin attestation FAILED: Missing 

103 # required file: LICENSE" WARNINGs every boot in the bundled install. 

104 explicit = code_root or os.environ.get('HEVOLVE_CODE_ROOT') 

105 candidate_roots = [] 

106 if explicit: 

107 candidate_roots.append(explicit) 

108 candidate_roots.append( 

109 os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) 

110 try: 

111 import sys as _sys 

112 candidate_roots.append(os.path.join(_sys.prefix, 'share', 'hartos')) 

113 except Exception: 

114 pass 

115 try: 

116 import hart_intelligence as _hi 

117 candidate_roots.append(os.path.dirname(_hi.__file__)) 

118 except Exception: 

119 pass 

120 

121 for rel_path, marker in BRAND_MARKER_FILES.items(): 

122 # Pick the first candidate root where this file actually exists. 

123 full_path = None 

124 for r in candidate_roots: 

125 if not r: 

126 continue 

127 cand = os.path.join(r, rel_path) 

128 if os.path.exists(cand): 

129 full_path = cand 

130 break 

131 if full_path is None: 

132 return False, f'Missing required file: {rel_path}' 

133 try: 

134 with open(full_path, 'r', encoding='utf-8', errors='ignore') as f: 

135 content = f.read() 

136 if marker not in content: 

137 return False, f'Brand marker missing from {rel_path}' 

138 except (IOError, OSError) as e: 

139 return False, f'Cannot read {rel_path}: {e}' 

140 

141 return True, 'All brand markers verified' 

142 

143 

144def verify_master_key_present() -> Tuple[bool, str]: 

145 """Verify the master public key matches the known HART OS key. 

146 

147 A fork using a different master key is a different project (fine, but 

148 it's not HART OS and can't join the federation). 

149 """ 

150 try: 

151 from security.master_key import MASTER_PUBLIC_KEY_HEX 

152 expected = ORIGIN_IDENTITY['master_public_key_hex'] 

153 if MASTER_PUBLIC_KEY_HEX != expected: 

154 return False, 'Master public key does not match HART OS origin' 

155 return True, 'Master public key verified' 

156 except ImportError: 

157 return False, 'security.master_key module not found' 

158 

159 

160def verify_guardrail_integrity() -> Tuple[bool, str]: 

161 """Verify guardrail frozen values contain HART OS brand markers. 

162 

163 The guardrails are structurally immutable (_FrozenValues + module __setattr__). 

164 A fork that modifies them breaks the hash chain AND fails brand verification. 

165 """ 

166 try: 

167 from security.hive_guardrails import _FrozenValues 

168 # Check that guardian principle exists 

169 guardian = getattr(_FrozenValues, 'GUARDIAN_PURPOSE', ()) 

170 if not guardian: 

171 return False, 'GUARDIAN_PURPOSE missing from guardrails' 

172 

173 guardian_text = ' '.join(guardian).lower() 

174 for marker in GUARDRAIL_BRAND_MARKERS: 

175 if marker.lower() not in guardian_text: 

176 # Check constitutional rules too 

177 rules = getattr(_FrozenValues, 'CONSTITUTIONAL_RULES', ()) 

178 rules_text = ' '.join(rules).lower() 

179 if marker.lower() not in rules_text: 

180 return False, f'Brand marker "{marker}" missing from guardrails' 

181 

182 return True, 'Guardrail brand markers verified' 

183 except ImportError: 

184 return False, 'security.hive_guardrails module not found' 

185 

186 

187def verify_origin(code_root: str = None) -> Dict: 

188 """Full origin attestation — proves this is genuine HART OS. 

189 

190 Called at: 

191 1. Boot (full_boot_verification → verify_origin) 

192 2. Federation handshake (peer must present attestation) 

193 3. Every 5 minutes by runtime_monitor 

194 

195 Returns: 

196 { 

197 'genuine': bool, 

198 'fingerprint': str, 

199 'checks': { 

200 'fingerprint_match': bool, 

201 'brand_markers': bool, 

202 'master_key': bool, 

203 'guardrails': bool, 

204 }, 

205 'details': str, 

206 'timestamp': float, 

207 } 

208 """ 

209 global _attestation_cache 

210 

211 now = time.time() 

212 if _attestation_cache and (now - _attestation_cache.get('timestamp', 0)) < _cache_ttl: 

213 return _attestation_cache 

214 

215 checks = {} 

216 details = [] 

217 

218 # Check 1: Origin fingerprint matches compiled-in value 

219 computed = compute_origin_fingerprint() 

220 checks['fingerprint_match'] = (computed == ORIGIN_FINGERPRINT) 

221 if not checks['fingerprint_match']: 

222 details.append(f'Fingerprint mismatch: {computed[:16]}... != {ORIGIN_FINGERPRINT[:16]}...') 

223 

224 # Check 2: Brand markers in files 

225 brand_ok, brand_msg = verify_brand_markers(code_root) 

226 checks['brand_markers'] = brand_ok 

227 if not brand_ok: 

228 details.append(brand_msg) 

229 

230 # Check 3: Master public key 

231 key_ok, key_msg = verify_master_key_present() 

232 checks['master_key'] = key_ok 

233 if not key_ok: 

234 details.append(key_msg) 

235 

236 # Check 4: Guardrail brand markers 

237 guard_ok, guard_msg = verify_guardrail_integrity() 

238 checks['guardrails'] = guard_ok 

239 if not guard_ok: 

240 details.append(guard_msg) 

241 

242 genuine = all(checks.values()) 

243 

244 result = { 

245 'genuine': genuine, 

246 'fingerprint': computed, 

247 'checks': checks, 

248 'details': '; '.join(details) if details else 'All origin checks passed', 

249 'timestamp': now, 

250 } 

251 

252 if genuine: 

253 _attestation_cache = result 

254 else: 

255 logger.warning(f"Origin attestation FAILED: {result['details']}") 

256 

257 return result 

258 

259 

260def get_attestation_for_federation() -> Dict: 

261 """Generate a signed attestation payload for federation handshake. 

262 

263 The receiving peer verifies: 

264 1. The attestation is signed by a valid node key 

265 2. The origin fingerprint matches HART OS 

266 3. The master_public_key matches the known HART OS key 

267 4. The guardrail_hash matches the expected value 

268 

269 A fork cannot produce a valid attestation because: 

270 - Different master key → different fingerprint → rejected 

271 - Stripped branding → origin attestation fails locally → no attestation generated 

272 - Modified guardrails → hash mismatch → peer rejects 

273 """ 

274 origin = verify_origin() 

275 if not origin['genuine']: 

276 return { 

277 'valid': False, 

278 'reason': 'Cannot generate attestation: origin verification failed', 

279 } 

280 

281 try: 

282 from security.node_integrity import get_or_create_keypair, compute_code_hash 

283 from security.hive_guardrails import compute_guardrail_hash 

284 

285 _, pub_key = get_or_create_keypair() 

286 pub_hex = pub_key.public_bytes_raw().hex() 

287 

288 payload = { 

289 'origin_fingerprint': origin['fingerprint'], 

290 'master_public_key': ORIGIN_IDENTITY['master_public_key_hex'], 

291 'node_public_key': pub_hex, 

292 'guardrail_hash': compute_guardrail_hash(), 

293 'code_hash': compute_code_hash(), 

294 'timestamp': time.time(), 

295 'name': ORIGIN_IDENTITY['name'], 

296 'license': ORIGIN_IDENTITY['license'], 

297 } 

298 

299 # Sign with node key 

300 priv_key, _ = get_or_create_keypair() 

301 canonical = json.dumps(payload, sort_keys=True, separators=(',', ':')) 

302 signature = priv_key.sign(canonical.encode('utf-8')) 

303 payload['node_signature'] = signature.hex() 

304 

305 return {'valid': True, 'attestation': payload} 

306 

307 except Exception as e: 

308 logger.error(f"Attestation generation failed: {e}") 

309 return {'valid': False, 'reason': str(e)} 

310 

311 

312def verify_peer_attestation(attestation: Dict) -> Tuple[bool, str]: 

313 """Verify a federation peer's origin attestation. 

314 

315 Called when a new peer wants to join the hive. 

316 Rejects forks, modified builds, and impersonators. 

317 """ 

318 if not attestation: 

319 return False, 'No attestation provided' 

320 

321 # Check 1: Origin fingerprint must match HART OS 

322 peer_fingerprint = attestation.get('origin_fingerprint', '') 

323 if peer_fingerprint != ORIGIN_FINGERPRINT: 

324 return False, f'Origin fingerprint mismatch — not genuine HART OS' 

325 

326 # Check 2: Master public key must match 

327 peer_master_key = attestation.get('master_public_key', '') 

328 if peer_master_key != ORIGIN_IDENTITY['master_public_key_hex']: 

329 return False, 'Master public key mismatch — different trust anchor' 

330 

331 # Check 3: Node signature must be valid 

332 node_pub_hex = attestation.get('node_public_key', '') 

333 node_sig_hex = attestation.get('node_signature', '') 

334 if not node_pub_hex or not node_sig_hex: 

335 return False, 'Missing node key or signature' 

336 

337 try: 

338 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey 

339 pub_key = Ed25519PublicKey.from_public_bytes(bytes.fromhex(node_pub_hex)) 

340 # Verify signature over payload (excluding the signature itself) 

341 payload_for_verify = {k: v for k, v in attestation.items() if k != 'node_signature'} 

342 canonical = json.dumps(payload_for_verify, sort_keys=True, separators=(',', ':')) 

343 pub_key.verify(bytes.fromhex(node_sig_hex), canonical.encode('utf-8')) 

344 except Exception as e: 

345 return False, f'Invalid node signature: {e}' 

346 

347 # Check 4: Timestamp freshness (reject attestations older than 24 hours) 

348 ts = attestation.get('timestamp', 0) 

349 if abs(time.time() - ts) > 86400: 

350 return False, 'Attestation expired (>24 hours old)' 

351 

352 # Check 5: Guardrail hash should match ours (optional — warn only for minor versions) 

353 try: 

354 from security.hive_guardrails import compute_guardrail_hash 

355 local_hash = compute_guardrail_hash() 

356 peer_hash = attestation.get('guardrail_hash', '') 

357 if peer_hash and peer_hash != local_hash: 

358 logger.warning( 

359 f"Peer guardrail hash differs: local={local_hash[:16]}... " 

360 f"peer={peer_hash[:16]}... (version mismatch?)" 

361 ) 

362 # Don't reject — could be a valid older/newer version 

363 except Exception: 

364 pass 

365 

366 return True, 'Peer attestation verified — genuine HART OS node' 

367 

368 

369def get_origin_summary() -> Dict: 

370 """Human-readable origin summary for status displays.""" 

371 return { 

372 'name': ORIGIN_IDENTITY['name'], 

373 'full_name': ORIGIN_IDENTITY['full_name'], 

374 'organization': ORIGIN_IDENTITY['organization'], 

375 'license': ORIGIN_IDENTITY['license'], 

376 'fingerprint': ORIGIN_FINGERPRINT[:16] + '...', 

377 'guardian_principle': ORIGIN_IDENTITY['guardian_principle'], 

378 }