Coverage for integrations / robotics / hevolveai_access_gate.py: 91.5%

59 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveAI Access Gate — Feature-level access control. 

3 

4Gates specific HevolveAI features by: 

5 1. Certificate tier (central, regional, local) 

6 2. CCT capability (e.g. 'embodied_ai', 'sensor_fusion') 

7 3. Installation integrity (verified manifest) 

8 

9Features that require gating: 

10 - 'in_process': Direct Python calls (requires integrity verification) 

11 - 'sensor_fusion': Native sensor fusion pipeline 

12 - 'navigation': Path planning + SLAM 

13 - 'manipulation': Arm control + IK 

14 - 'learning': Continuous learning pipeline 

15 - 'hivemind': Distributed intelligence 

16 

17Reuses the existing PrivateRepoAccessService pattern. 

18""" 

19import logging 

20import os 

21from typing import Dict, Optional 

22 

23logger = logging.getLogger('hevolve_security') 

24 

25 

26def check_hevolveai_access(feature: str) -> Dict: 

27 """Check if this node can use a specific HevolveAI feature. 

28 

29 Args: 

30 feature: Feature name (e.g. 'in_process', 'sensor_fusion') 

31 

32 Returns: 

33 { 

34 'allowed': bool, 

35 'reason': str, 

36 'tier': str, 

37 'integrity_verified': bool, 

38 } 

39 """ 

40 result: Dict = { 

41 'allowed': False, 

42 'reason': '', 

43 'tier': 'unknown', 

44 'integrity_verified': False, 

45 } 

46 

47 # 1. Check certificate tier 

48 tier = _get_node_tier() 

49 result['tier'] = tier 

50 

51 # 2. Check installation integrity 

52 integrity = _check_integrity() 

53 result['integrity_verified'] = integrity 

54 

55 # 3. Feature-specific rules 

56 feature_rules = _FEATURE_RULES.get(feature) 

57 if feature_rules is None: 

58 result['allowed'] = True 

59 result['reason'] = 'no restrictions on this feature' 

60 return result 

61 

62 # Tier check 

63 min_tier = feature_rules.get('min_tier', 'local') 

64 if not _tier_meets_minimum(tier, min_tier): 

65 result['reason'] = ( 

66 f"feature '{feature}' requires tier '{min_tier}' or above, " 

67 f"current tier: '{tier}'" 

68 ) 

69 return result 

70 

71 # Integrity check (some features require verified install) 

72 if feature_rules.get('requires_integrity') and not integrity: 

73 result['reason'] = ( 

74 f"feature '{feature}' requires verified HevolveAI installation" 

75 ) 

76 return result 

77 

78 # CCT capability check 

79 required_cct = feature_rules.get('required_cct') 

80 if required_cct and not _has_cct_capability(required_cct): 

81 result['reason'] = ( 

82 f"feature '{feature}' requires CCT capability: {required_cct}" 

83 ) 

84 return result 

85 

86 result['allowed'] = True 

87 result['reason'] = 'access granted' 

88 return result 

89 

90 

91# ── Feature Rules ──────────────────────────────────────────────── 

92 

93_FEATURE_RULES: Dict[str, Dict] = { 

94 'in_process': { 

95 'min_tier': 'local', 

96 'requires_integrity': True, 

97 'required_cct': None, 

98 }, 

99 'sensor_fusion': { 

100 'min_tier': 'local', 

101 'requires_integrity': False, 

102 'required_cct': 'embodied_ai', 

103 }, 

104 'navigation': { 

105 'min_tier': 'local', 

106 'requires_integrity': False, 

107 'required_cct': 'embodied_ai', 

108 }, 

109 'manipulation': { 

110 'min_tier': 'local', 

111 'requires_integrity': False, 

112 'required_cct': 'embodied_ai', 

113 }, 

114 'learning': { 

115 'min_tier': 'local', 

116 'requires_integrity': False, 

117 'required_cct': 'learning', 

118 }, 

119 'hivemind': { 

120 'min_tier': 'regional', 

121 'requires_integrity': True, 

122 'required_cct': 'hivemind', 

123 }, 

124} 

125 

126# Tier hierarchy (index = rank, higher = more privileged) 

127_TIER_RANK = ['observer', 'local', 'regional', 'central'] 

128 

129 

130def _tier_meets_minimum(current: str, minimum: str) -> bool: 

131 """Check if current tier meets minimum requirement.""" 

132 try: 

133 current_rank = _TIER_RANK.index(current) 

134 except ValueError: 

135 current_rank = 1 # Default to 'local' if unknown 

136 

137 try: 

138 min_rank = _TIER_RANK.index(minimum) 

139 except ValueError: 

140 min_rank = 1 

141 

142 return current_rank >= min_rank 

143 

144 

145def _get_node_tier() -> str: 

146 """Get this node's certificate tier.""" 

147 try: 

148 from security.key_delegation import get_node_tier 

149 return get_node_tier() 

150 except Exception: 

151 return 'local' 

152 

153 

154def _check_integrity() -> bool: 

155 """Check if HevolveAI installation passes integrity verification.""" 

156 try: 

157 from security.source_protection import SourceProtectionService 

158 result = SourceProtectionService.verify_hevolveai_integrity() 

159 return result.get('verified', False) 

160 except Exception: 

161 return False 

162 

163 

164def _has_cct_capability(capability: str) -> bool: 

165 """Check if this node has a valid CCT with the required capability. 

166 

167 Fail-closed in production. Dev-mode escape: HEVOLVE_DEV_MODE=true. 

168 """ 

169 try: 

170 from integrations.agent_engine.continual_learner_gate import ( 

171 ContinualLearnerGate, 

172 ) 

173 gate = ContinualLearnerGate() 

174 status = gate.check_access() 

175 if not status.get('has_valid_cct'): 

176 return False 

177 capabilities = status.get('capabilities', []) 

178 return capability in capabilities 

179 except Exception: 

180 # Fail-closed in production; dev mode bypasses for test environments 

181 if os.environ.get('HEVOLVE_DEV_MODE', '').lower() == 'true': 

182 return True 

183 return False