Coverage for core / platform / agent_environment.py: 99.0%

104 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Agent Environments — Logical scopes for AI workloads in HART OS. 

3 

4Like Android's Context but for agents. NOT containers — lightweight logical 

5boundaries with tool access control, model policy, budget limits, and 

6scoped event emission. 

7 

8Usage: 

9 from core.platform.agent_environment import EnvironmentManager 

10 

11 mgr = EnvironmentManager(service_registry=registry, event_emitter=bus.emit) 

12 env = mgr.create('research-task', model_policy='local_preferred', 

13 allowed_tools=['web_search', 'read_file'], 

14 max_cost_spark=50.0) 

15 

16 # Tool gating 

17 env.check_tool('web_search') # -> True 

18 env.check_tool('write_file') # -> False (not in allowed_tools) 

19 

20 # Scoped inference 

21 result = env.infer('Summarize this paper', model_type='llm') 

22 

23 # Scoped events 

24 env.emit('task.completed', {'result': 'done'}) 

25 # -> publishes 'env.research-task-abc.task.completed' 

26 

27 mgr.destroy(env.env_id) 

28""" 

29 

30import logging 

31import threading 

32import time 

33import uuid 

34from dataclasses import dataclass, field 

35from typing import Any, Callable, Dict, List, Optional 

36 

37logger = logging.getLogger('hevolve.platform') 

38 

39 

40# ─── Environment Configuration ──────────────────────────────────── 

41 

@dataclass
class EnvironmentConfig:
    """Boundaries, permissions, and resource limits of one agent environment.

    Every field is optional; a field left at its default imposes no
    constraint.
    """
    working_dir: str = ''
    allowed_tools: List[str] = field(default_factory=list)
    denied_tools: List[str] = field(default_factory=list)
    model_policy: str = 'local_preferred'
    max_cost_spark: float = 0.0  # 0 = unlimited
    ai_capabilities: List[Dict[str, Any]] = field(default_factory=list)
    event_scope: str = ''  # EventBus topic prefix
    timeout_seconds: float = 0.0  # 0 = no timeout
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the configuration as a plain dict for API responses.

        Values are handed over as-is: the lists and dicts in the result are
        the same objects held by this config, not copies.
        """
        exported_keys = (
            'working_dir',
            'allowed_tools',
            'denied_tools',
            'model_policy',
            'max_cost_spark',
            'ai_capabilities',
            'event_scope',
            'timeout_seconds',
            'metadata',
        )
        return {key: getattr(self, key) for key in exported_keys}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'EnvironmentConfig':
        """Build a config from a dict, ignoring keys that are not fields."""
        known_fields = cls.__dataclass_fields__
        accepted = {}
        for key, value in data.items():
            if key in known_fields:
                accepted[key] = value
        return cls(**accepted)

78 

79 

80# ─── Agent Environment ──────────────────────────────────────────── 

81 

@dataclass
class AgentEnvironment:
    """A single agent execution environment.

    Provides tool gating, budget bookkeeping, scoped inference, and scoped
    event emission. Lightweight — just data + methods, no OS-level isolation.
    """
    env_id: str
    name: str
    # Forward reference: quoted so the class can be created without the
    # config class having to be resolvable at class-creation time.
    config: 'EnvironmentConfig'
    created_at: float = field(default_factory=time.time)
    _active: bool = field(default=True, repr=False)
    _cost_spent: float = field(default=0.0, repr=False)

    @property
    def active(self) -> bool:
        """Whether this environment is still active."""
        return self._active

    def check_tool(self, tool_name: str) -> bool:
        """Check if a tool is allowed in this environment.

        Rules (same precedence as tool_allowlist.py):
            1. If denied_tools is set and the tool is in it -> denied
            2. If allowed_tools is set and the tool is NOT in it -> denied
            3. Otherwise -> allowed

        Empty lists impose no constraints.

        Args:
            tool_name: Name of the tool to check.

        Returns:
            True if the tool may be used, False otherwise.
        """
        denied = self.config.denied_tools
        if denied and tool_name in denied:
            return False
        allowed = self.config.allowed_tools
        if allowed and tool_name not in allowed:
            return False
        return True

    def check_budget(self, cost: float) -> bool:
        """Check whether spending `cost` would stay within the budget.

        Args:
            cost: Prospective additional cost.

        Returns:
            True if `max_cost_spark` is <= 0 (no budget constraint) or if the
            cost already recorded plus `cost` does not exceed it.
        """
        if self.config.max_cost_spark <= 0:
            return True
        return (self._cost_spent + cost) <= self.config.max_cost_spark

    def record_cost(self, cost: float) -> None:
        """Add `cost` to the running total spent in this environment.

        No budget check is performed here; callers that want enforcement
        call `check_budget` first.
        """
        self._cost_spent += cost

    def infer(self, prompt: str, model_type: str = 'llm',
              options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Dispatch inference through ModelBusService for this environment.

        The environment's `model_policy` is forwarded as `options['policy']`
        and overrides any 'policy' key supplied by the caller. Budget is NOT
        checked or recorded here; use `check_budget` / `record_cost` around
        the call when a budget must be enforced.

        Args:
            prompt: Prompt passed to the model bus.
            model_type: Model type passed to the model bus.
            options: Extra options merged into the request.

        Returns:
            The bus result if it is a dict, otherwise `{'result': result}`.
            On any failure (inactive environment, bus missing/unavailable,
            or an exception from the bus) a dict with an 'error' key is
            returned instead of raising.
        """
        if not self._active:
            return {'error': 'environment is inactive', 'env_id': self.env_id}

        try:
            # Imported lazily so this module works without the agent engine.
            from integrations.agent_engine.model_bus_service import (
                get_model_bus_service,
            )
            bus = get_model_bus_service()
            if bus is None:
                return {'error': 'model bus service not available'}

            result = bus.infer(
                prompt=prompt,
                model_type=model_type,
                options={
                    **(options or {}),
                    'policy': self.config.model_policy,
                },
            )
            return result if isinstance(result, dict) else {'result': result}
        except ImportError:
            return {'error': 'model bus service not installed'}
        except Exception as e:
            return {'error': str(e)}

    def emit(self, topic: str, data: Optional[Dict[str, Any]] = None) -> None:
        """Emit a scoped event (best-effort).

        The topic is prefixed with the environment's `event_scope`, falling
        back to `env.<env_id>` if no scope is configured. The payload is
        `data` plus an '_env_id' key (which overrides a caller-supplied one).

        Emission never raises: if the events module is missing or delivery
        fails, the failure is logged at DEBUG level and otherwise ignored.

        Args:
            topic: Unscoped topic name, e.g. 'task.completed'.
            data: Optional event payload.
        """
        scope = self.config.event_scope or f'env.{self.env_id}'
        scoped_topic = f'{scope}.{topic}'
        try:
            # Imported lazily so a missing event bus does not break the caller.
            from core.platform.events import emit_event
            emit_event(scoped_topic, {
                **(data or {}),
                '_env_id': self.env_id,
            })
        except Exception as exc:
            # Deliberately non-fatal, but leave a trace instead of hiding it.
            # Same logger name as the module-level `logger`.
            logging.getLogger('hevolve.platform').debug(
                "Failed to emit event %s from environment %s: %s",
                scoped_topic, self.env_id, exc,
            )

    def deactivate(self) -> None:
        """Mark this environment as inactive; `infer` then returns an error."""
        self._active = False

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for API responses."""
        return {
            'env_id': self.env_id,
            'name': self.name,
            'config': self.config.to_dict(),
            'created_at': self.created_at,
            'active': self._active,
            'cost_spent': self._cost_spent,
        }

193 

194 

195# ─── Environment Manager ───────────────────────────────────────── 

196 

class EnvironmentManager:
    """Manages agent environment lifecycle.

    Registered in ServiceRegistry as 'environments'. Provides CRUD
    operations for agent environments with thread-safe access: every read
    or write of the internal environment table happens under one lock.
    """

    def __init__(self, service_registry=None, event_emitter: Optional[Callable] = None):
        """Create an empty manager.

        Args:
            service_registry: Stored for later use; not accessed by this class.
            event_emitter: Optional callable invoked as
                `event_emitter(topic, payload)` on create/destroy.
        """
        self._registry = service_registry
        self._emit = event_emitter
        self._environments: Dict[str, 'AgentEnvironment'] = {}
        # Plain (non-reentrant) lock: never call back into this manager
        # while holding it.
        self._lock = threading.Lock()

    def create(self, name: str, config: Optional['EnvironmentConfig'] = None,
               **kwargs) -> 'AgentEnvironment':
        """Create a new agent environment.

        Args:
            name: Human-readable name for this environment. Lower-cased,
                with spaces replaced by '-', it prefixes the generated id.
            config: Full EnvironmentConfig, or pass kwargs for shorthand.
            **kwargs: EnvironmentConfig field values, used only when
                `config` is None. Unknown keys are silently dropped.

        Returns:
            The newly created AgentEnvironment.
        """
        if config is None:
            config = EnvironmentConfig(**{k: v for k, v in kwargs.items()
                                          if k in EnvironmentConfig.__dataclass_fields__})

        # Short random suffix keeps ids unique for environments sharing a name.
        env_id = f'{name.lower().replace(" ", "-")}-{uuid.uuid4().hex[:8]}'

        # Default event scope from the generated id.
        # NOTE(review): this mutates a caller-supplied config; a config object
        # reused for a second create() keeps the first environment's scope.
        if not config.event_scope:
            config.event_scope = f'env.{env_id}'

        env = AgentEnvironment(env_id=env_id, name=name, config=config)

        with self._lock:
            self._environments[env_id] = env

        # Emitted outside the lock so a slow or re-entrant emitter cannot
        # block or deadlock other manager calls.
        if self._emit:
            self._emit('environment.created', {
                'env_id': env_id,
                'name': name,
                'model_policy': config.model_policy,
            })

        logger.debug("Created environment: %s (%s)", name, env_id)
        return env

    def get(self, env_id: str) -> Optional['AgentEnvironment']:
        """Get an environment by ID, or None if it is not managed."""
        with self._lock:
            return self._environments.get(env_id)

    def destroy(self, env_id: str) -> bool:
        """Deactivate and remove an environment.

        Does NOT delete working_dir or other external resources.

        Returns:
            True if destroyed, False if not found.
        """
        with self._lock:
            env = self._environments.pop(env_id, None)

        if env is None:
            return False

        env.deactivate()

        if self._emit:
            self._emit('environment.destroyed', {
                'env_id': env_id,
                'name': env.name,
            })

        logger.debug("Destroyed environment: %s (%s)", env.name, env_id)
        return True

    def list_environments(self) -> List[Dict[str, Any]]:
        """List all managed environments (active or not) as dicts."""
        # Snapshot under the lock: iterating the live dict while another
        # thread creates/destroys an environment can raise RuntimeError.
        with self._lock:
            snapshot = list(self._environments.values())
        return [env.to_dict() for env in snapshot]

    def count(self) -> int:
        """Return the number of managed environments."""
        with self._lock:
            return len(self._environments)

    # ── Lifecycle (for ServiceRegistry) ──────────────────────────

    def health(self) -> dict:
        """Health report.

        Returns:
            Dict with 'status' (always 'ok'), 'total_environments', and
            'active' (number of managed environments still active), all
            computed from one consistent snapshot.
        """
        with self._lock:
            snapshot = list(self._environments.values())
        return {
            'status': 'ok',
            'total_environments': len(snapshot),
            'active': sum(1 for env in snapshot if env.active),
        }