Coverage for integrations / robotics / hevolveai_access_gate.py: 91.5%
59 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveAI Access Gate — Feature-level access control.
4Gates specific HevolveAI features by:
5 1. Certificate tier (central, regional, local)
6 2. CCT capability (e.g. 'embodied_ai', 'sensor_fusion')
7 3. Installation integrity (verified manifest)
9Features that require gating:
10 - 'in_process': Direct Python calls (requires integrity verification)
11 - 'sensor_fusion': Native sensor fusion pipeline
12 - 'navigation': Path planning + SLAM
13 - 'manipulation': Arm control + IK
14 - 'learning': Continuous learning pipeline
15 - 'hivemind': Distributed intelligence
17Reuses the existing PrivateRepoAccessService pattern.
18"""
19import logging
20import os
21from typing import Dict, Optional
23logger = logging.getLogger('hevolve_security')
def check_hevolveai_access(feature: str) -> Dict:
    """Decide whether this node may use the given HevolveAI feature.

    The node tier and installation integrity are always evaluated and
    reported, even when the feature turns out to be unrestricted. A feature
    with no entry in ``_FEATURE_RULES`` is allowed unconditionally. For a
    listed feature the checks run in a fixed order -- tier, then integrity,
    then CCT capability -- and the first failing check determines the
    reason.

    Args:
        feature: Feature name (e.g. 'in_process', 'sensor_fusion').

    Returns:
        A dict with the keys:
            'allowed': bool, True when access is granted.
            'reason': str, human-readable explanation of the decision.
            'tier': str, the tier reported by ``_get_node_tier()``.
            'integrity_verified': the value from ``_check_integrity()``.
    """
    node_tier = _get_node_tier()
    install_ok = _check_integrity()

    # Start from "denied"; only the success paths flip 'allowed'.
    verdict: Dict = {
        'allowed': False,
        'reason': '',
        'tier': node_tier,
        'integrity_verified': install_ok,
    }

    rules = _FEATURE_RULES.get(feature)
    if rules is None:
        # Features without a rule entry are not gated at all.
        verdict.update(allowed=True, reason='no restrictions on this feature')
        return verdict

    required_tier = rules.get('min_tier', 'local')
    needed_cct = rules.get('required_cct')

    if not _tier_meets_minimum(node_tier, required_tier):
        verdict['reason'] = (
            f"feature '{feature}' requires tier '{required_tier}' or above, "
            f"current tier: '{node_tier}'"
        )
    elif rules.get('requires_integrity') and not install_ok:
        # Some features are only available on a verified installation.
        verdict['reason'] = (
            f"feature '{feature}' requires verified HevolveAI installation"
        )
    elif needed_cct and not _has_cct_capability(needed_cct):
        verdict['reason'] = (
            f"feature '{feature}' requires CCT capability: {needed_cct}"
        )
    else:
        verdict['allowed'] = True
        verdict['reason'] = 'access granted'

    return verdict
91# ── Feature Rules ────────────────────────────────────────────────
93_FEATURE_RULES: Dict[str, Dict] = {
94 'in_process': {
95 'min_tier': 'local',
96 'requires_integrity': True,
97 'required_cct': None,
98 },
99 'sensor_fusion': {
100 'min_tier': 'local',
101 'requires_integrity': False,
102 'required_cct': 'embodied_ai',
103 },
104 'navigation': {
105 'min_tier': 'local',
106 'requires_integrity': False,
107 'required_cct': 'embodied_ai',
108 },
109 'manipulation': {
110 'min_tier': 'local',
111 'requires_integrity': False,
112 'required_cct': 'embodied_ai',
113 },
114 'learning': {
115 'min_tier': 'local',
116 'requires_integrity': False,
117 'required_cct': 'learning',
118 },
119 'hivemind': {
120 'min_tier': 'regional',
121 'requires_integrity': True,
122 'required_cct': 'hivemind',
123 },
124}
126# Tier hierarchy (index = rank, higher = more privileged)
127_TIER_RANK = ['observer', 'local', 'regional', 'central']
130def _tier_meets_minimum(current: str, minimum: str) -> bool:
131 """Check if current tier meets minimum requirement."""
132 try:
133 current_rank = _TIER_RANK.index(current)
134 except ValueError:
135 current_rank = 1 # Default to 'local' if unknown
137 try:
138 min_rank = _TIER_RANK.index(minimum)
139 except ValueError:
140 min_rank = 1
142 return current_rank >= min_rank
145def _get_node_tier() -> str:
146 """Get this node's certificate tier."""
147 try:
148 from security.key_delegation import get_node_tier
149 return get_node_tier()
150 except Exception:
151 return 'local'
154def _check_integrity() -> bool:
155 """Check if HevolveAI installation passes integrity verification."""
156 try:
157 from security.source_protection import SourceProtectionService
158 result = SourceProtectionService.verify_hevolveai_integrity()
159 return result.get('verified', False)
160 except Exception:
161 return False
164def _has_cct_capability(capability: str) -> bool:
165 """Check if this node has a valid CCT with the required capability.
167 Fail-closed in production. Dev-mode escape: HEVOLVE_DEV_MODE=true.
168 """
169 try:
170 from integrations.agent_engine.continual_learner_gate import (
171 ContinualLearnerGate,
172 )
173 gate = ContinualLearnerGate()
174 status = gate.check_access()
175 if not status.get('has_valid_cct'):
176 return False
177 capabilities = status.get('capabilities', [])
178 return capability in capabilities
179 except Exception:
180 # Fail-closed in production; dev mode bypasses for test environments
181 if os.environ.get('HEVOLVE_DEV_MODE', '').lower() == 'true':
182 return True
183 return False