Coverage for core / platform / agent_environment.py: 99.0%
104 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Agent Environments — Logical scopes for AI workloads in HART OS.
4Like Android's Context but for agents. NOT containers — lightweight logical
5boundaries with tool access control, model policy, budget limits, and
6scoped event emission.
8Usage:
9 from core.platform.agent_environment import EnvironmentManager
11 mgr = EnvironmentManager(service_registry=registry, event_emitter=bus.emit)
12 env = mgr.create('research-task', model_policy='local_preferred',
13 allowed_tools=['web_search', 'read_file'],
14 max_cost_spark=50.0)
16 # Tool gating
17 env.check_tool('web_search') # -> True
18 env.check_tool('write_file') # -> False (not in allowed_tools)
20 # Scoped inference
21 result = env.infer('Summarize this paper', model_type='llm')
23 # Scoped events
24 env.emit('task.completed', {'result': 'done'})
25 # -> publishes 'env.research-task-abc.task.completed'
27 mgr.destroy(env.env_id)
28"""
30import logging
31import threading
32import time
33import uuid
34from dataclasses import dataclass, field
35from typing import Any, Callable, Dict, List, Optional
37logger = logging.getLogger('hevolve.platform')
40# ─── Environment Configuration ────────────────────────────────────
42@dataclass
43class EnvironmentConfig:
44 """Configuration for an agent environment.
46 Defines the boundaries, permissions, and resource limits.
47 All fields are optional — unconfigured fields impose no constraints.
48 """
49 working_dir: str = ''
50 allowed_tools: List[str] = field(default_factory=list)
51 denied_tools: List[str] = field(default_factory=list)
52 model_policy: str = 'local_preferred'
53 max_cost_spark: float = 0.0 # 0 = unlimited
54 ai_capabilities: List[Dict[str, Any]] = field(default_factory=list)
55 event_scope: str = '' # EventBus topic prefix
56 timeout_seconds: float = 0.0 # 0 = no timeout
57 metadata: Dict[str, Any] = field(default_factory=dict)
59 def to_dict(self) -> Dict[str, Any]:
60 """Serialize for API responses."""
61 return {
62 'working_dir': self.working_dir,
63 'allowed_tools': self.allowed_tools,
64 'denied_tools': self.denied_tools,
65 'model_policy': self.model_policy,
66 'max_cost_spark': self.max_cost_spark,
67 'ai_capabilities': self.ai_capabilities,
68 'event_scope': self.event_scope,
69 'timeout_seconds': self.timeout_seconds,
70 'metadata': self.metadata,
71 }
73 @classmethod
74 def from_dict(cls, data: Dict[str, Any]) -> 'EnvironmentConfig':
75 """Deserialize from dict."""
76 return cls(**{k: v for k, v in data.items()
77 if k in cls.__dataclass_fields__})
80# ─── Agent Environment ────────────────────────────────────────────
82@dataclass
83class AgentEnvironment:
84 """A single agent execution environment.
86 Provides tool gating, scoped inference, and scoped event emission.
87 Lightweight — just data + methods, no OS-level isolation.
88 """
89 env_id: str
90 name: str
91 config: EnvironmentConfig
92 created_at: float = field(default_factory=time.time)
93 _active: bool = field(default=True, repr=False)
94 _cost_spent: float = field(default=0.0, repr=False)
96 @property
97 def active(self) -> bool:
98 """Whether this environment is still active."""
99 return self._active
101 def check_tool(self, tool_name: str) -> bool:
102 """Check if a tool is allowed in this environment.
104 Rules (same precedence as tool_allowlist.py):
105 1. If denied_tools set and tool in it -> denied
106 2. If allowed_tools set and tool NOT in it -> denied
107 3. Otherwise -> allowed
109 Empty lists impose no constraints.
110 """
111 if self.config.denied_tools and tool_name in self.config.denied_tools:
112 return False
113 if self.config.allowed_tools and tool_name not in self.config.allowed_tools:
114 return False
115 return True
117 def check_budget(self, cost: float) -> bool:
118 """Check if spending `cost` would exceed the budget.
120 Returns True if within budget or no budget constraint.
121 """
122 if self.config.max_cost_spark <= 0:
123 return True
124 return (self._cost_spent + cost) <= self.config.max_cost_spark
126 def record_cost(self, cost: float) -> None:
127 """Record a cost expenditure."""
128 self._cost_spent += cost
130 def infer(self, prompt: str, model_type: str = 'llm',
131 options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
132 """Dispatch inference through ModelBusService with environment constraints.
134 Respects model_policy and budget. Returns the inference result dict
135 or an error dict if unavailable.
136 """
137 if not self._active:
138 return {'error': 'environment is inactive', 'env_id': self.env_id}
140 try:
141 from integrations.agent_engine.model_bus_service import (
142 get_model_bus_service,
143 )
144 bus = get_model_bus_service()
145 if bus is None:
146 return {'error': 'model bus service not available'}
148 result = bus.infer(
149 prompt=prompt,
150 model_type=model_type,
151 options={
152 **(options or {}),
153 'policy': self.config.model_policy,
154 },
155 )
156 return result if isinstance(result, dict) else {'result': result}
157 except ImportError:
158 return {'error': 'model bus service not installed'}
159 except Exception as e:
160 return {'error': str(e)}
162 def emit(self, topic: str, data: Optional[Dict[str, Any]] = None) -> None:
163 """Emit a scoped event.
165 Prefixes the topic with the environment's event_scope.
166 Falls back to env_id if no scope configured.
167 """
168 scope = self.config.event_scope or f'env.{self.env_id}'
169 scoped_topic = f'{scope}.{topic}'
170 try:
171 from core.platform.events import emit_event
172 emit_event(scoped_topic, {
173 **(data or {}),
174 '_env_id': self.env_id,
175 })
176 except Exception:
177 pass
179 def deactivate(self) -> None:
180 """Mark this environment as inactive."""
181 self._active = False
183 def to_dict(self) -> Dict[str, Any]:
184 """Serialize for API responses."""
185 return {
186 'env_id': self.env_id,
187 'name': self.name,
188 'config': self.config.to_dict(),
189 'created_at': self.created_at,
190 'active': self._active,
191 'cost_spent': self._cost_spent,
192 }
195# ─── Environment Manager ─────────────────────────────────────────
197class EnvironmentManager:
198 """Manages agent environment lifecycle.
200 Registered in ServiceRegistry as 'environments'. Provides CRUD
201 operations for agent environments with thread-safe access.
202 """
204 def __init__(self, service_registry=None, event_emitter: Optional[Callable] = None):
205 self._registry = service_registry
206 self._emit = event_emitter
207 self._environments: Dict[str, AgentEnvironment] = {}
208 self._lock = threading.Lock()
210 def create(self, name: str, config: Optional[EnvironmentConfig] = None,
211 **kwargs) -> AgentEnvironment:
212 """Create a new agent environment.
214 Args:
215 name: Human-readable name for this environment.
216 config: Full EnvironmentConfig, or pass kwargs for shorthand.
218 Returns:
219 The newly created AgentEnvironment.
220 """
221 if config is None:
222 config = EnvironmentConfig(**{k: v for k, v in kwargs.items()
223 if k in EnvironmentConfig.__dataclass_fields__})
225 env_id = f'{name.lower().replace(" ", "-")}-{uuid.uuid4().hex[:8]}'
227 # Default event scope from name
228 if not config.event_scope:
229 config.event_scope = f'env.{env_id}'
231 env = AgentEnvironment(env_id=env_id, name=name, config=config)
233 with self._lock:
234 self._environments[env_id] = env
236 if self._emit:
237 self._emit('environment.created', {
238 'env_id': env_id,
239 'name': name,
240 'model_policy': config.model_policy,
241 })
243 logger.debug("Created environment: %s (%s)", name, env_id)
244 return env
246 def get(self, env_id: str) -> Optional[AgentEnvironment]:
247 """Get an environment by ID."""
248 return self._environments.get(env_id)
250 def destroy(self, env_id: str) -> bool:
251 """Deactivate and remove an environment.
253 Does NOT delete working_dir or other external resources.
255 Returns:
256 True if destroyed, False if not found.
257 """
258 with self._lock:
259 env = self._environments.pop(env_id, None)
261 if env is None:
262 return False
264 env.deactivate()
266 if self._emit:
267 self._emit('environment.destroyed', {
268 'env_id': env_id,
269 'name': env.name,
270 })
272 logger.debug("Destroyed environment: %s (%s)", env.name, env_id)
273 return True
275 def list_environments(self) -> List[Dict[str, Any]]:
276 """List all environments (both active and inactive in manager)."""
277 return [env.to_dict() for env in self._environments.values()]
279 def count(self) -> int:
280 """Return the number of managed environments."""
281 return len(self._environments)
283 # ── Lifecycle (for ServiceRegistry) ──────────────────────────
285 def health(self) -> dict:
286 """Health report."""
287 active = sum(1 for e in self._environments.values() if e.active)
288 return {
289 'status': 'ok',
290 'total_environments': len(self._environments),
291 'active': active,
292 }