Coverage for integrations / agent_engine / agent_attribution.py: 89.2%

212 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Agent Attribution — Dense feedback with long-horizon credit assignment. 

3 

4THE SINGLE CONVERGING PATH for agent action attribution. 

5 

6All agents (daemon, benchmark_prover, compute_optimizer, hive_session, 

7task_protocol) call THIS module to record their actions. This module 

8routes ALL attribution to the existing WorldModelBridge — no parallel 

9systems, no new storage, no duplicate APIs. 

10 

11How it works: 

12 1. Agent calls begin_action(goal_id, agent_id, action_type, expected_outcome) 

13 → returns action_id (correlation ID) 

14 2. Agent calls record_step(action_id, step_description, state, decision) 

15 at each intermediate state (every tick, every sub-decision) 

16 3. Agent calls complete_action(action_id, outcome) 

17 → computes attribution chain, submits to WorldModelBridge, 

18 compares vs expected_outcome for credit assignment 

19 

20The WorldModelBridge.record_interaction() already accepts goal_id and 

21queues experiences for HevolveAI learning. We just wire the attribution 

22chain in the 'prompt' field as structured JSON. 

23 

24For long-horizon goals (hours/days), the action stays open with step 

25history in memory (bounded by ACTION_TTL_SECONDS). On complete_action, 

26the full chain is submitted in a single experience with credit weights. 

27 

28NO PARALLEL PATHS: 

29 - Storage: WorldModelBridge._experience_queue (existing) 

30 - Events: core.platform.events.emit_event (existing) 

31 - Privacy: security.secret_redactor (existing, applied by bridge) 

32 - Guardrails: security.hive_guardrails (existing, applied by bridge) 

33""" 

34 

35import json 

36import logging 

37import threading 

38import time 

39import uuid 

40from dataclasses import dataclass, field, asdict 

41from typing import Any, Dict, List, Optional 

42 

# Module logger — child of the 'hevolve' hierarchy so host applications
# control verbosity centrally; this module only emits debug-level noise.
logger = logging.getLogger('hevolve.agent_attribution')

# ─── Config ─────────────────────────────────────────────────────────────

# How long an action stays open before auto-completion with outcome='timeout'.
# Set long enough for benchmark runs (hours) but bounded.
ACTION_TTL_SECONDS = 6 * 3600  # 6 hours

# Max open actions before oldest is force-completed (prevents unbounded growth).
MAX_OPEN_ACTIONS = 500

# Max steps per action (prevents DoS from tick storms).
# NOTE: record_observation counts steps + observations against this same bound.
MAX_STEPS_PER_ACTION = 1000

56 

57 

58# ─── Dataclasses ──────────────────────────────────────────────────────── 

59 

@dataclass
class ActionStep:
    """One intermediate state recorded along an action chain.

    Steps capture what the AGENT itself saw/decided at a point in time;
    external reality is recorded separately as Observation instances.
    """

    timestamp: float          # wall-clock time the step was recorded
    description: str          # short human-readable summary of the step
    state: Dict[str, Any] = field(default_factory=dict)  # agent-reported state snapshot
    decision: str = ''        # what the agent decided at this step
    confidence: float = 0.5   # agent's confidence in the decision, [0, 1]

    def to_dict(self) -> Dict:
        """Return a plain-dict (JSON-friendly) deep copy of this step."""
        # asdict recursively copies nested containers, so callers may
        # mutate the result without aliasing this step's state dict.
        return asdict(self)

72 

@dataclass
class Observation:
    """An external observation that grounds the action in reality.

    Emitted by sensors / world-state probes / user feedback (NOT by the
    agent itself). Stored alongside steps so WMB can distinguish
    agent-stated-state (ActionStep.state) from observed-state (this).
    """

    timestamp: float             # wall-clock time the observation arrived
    observation_type: str        # 'sensor', 'world_state', 'user_feedback', ...
    data: Dict[str, Any] = field(default_factory=dict)  # observation payload
    source: str = ''             # which subsystem reported it
    confidence: float = 1.0      # observation reliability, [0, 1]

    def to_dict(self) -> Dict:
        """Return a plain-dict (JSON-friendly) deep copy of this observation."""
        # asdict deep-copies nested containers — safe for callers to mutate.
        return asdict(self)

90 

@dataclass
class AgentAction:
    """A long-horizon action with intermediate steps and expected outcome.

    Created by AgentAttributionOrchestrator.begin_action and kept in
    memory until complete_action (or TTL/cap eviction) submits it to
    the WorldModelBridge.
    """

    action_id: str                  # correlation UUID returned by begin_action
    goal_id: Optional[str]          # goal this action serves, if any
    agent_id: str                   # which agent is acting
    action_type: str                # e.g., 'benchmark_run', 'task_dispatch', 'optimization_cycle'
    started_at: float               # epoch seconds at begin_action
    # parent_action_id enables causal chains across the hive: a
    # benchmark_run action begun inside a bootstrap_upgrade goal carries
    # parent=<goal's action id>, so WMB can trace multi-hop attribution
    # without reconstructing the graph from timestamps.
    parent_action_id: Optional[str] = None
    expected_outcome: Dict[str, Any] = field(default_factory=dict)   # {metric: expected_value}
    acceptance_criteria: List[str] = field(default_factory=list)     # textual success criteria
    steps: List[ActionStep] = field(default_factory=list)            # agent-reported chain
    observations: List[Observation] = field(default_factory=list)    # external grounding
    completed: bool = False
    outcome: Optional[Dict[str, Any]] = None                         # set on completion
    completed_at: Optional[float] = None                             # epoch seconds at completion

    def to_dict(self) -> Dict:
        """Serialize the full action, steps and observations included.

        dataclasses.asdict already recurses into nested dataclasses, so
        the previous manual re-conversion of steps/observations produced
        identical values a second time — redundant work, removed.
        """
        return asdict(self)

118 

119 

120# ─── Orchestrator ─────────────────────────────────────────────────────── 

121 

class AgentAttributionOrchestrator:
    """Single source of truth for agent action attribution.

    Agents call begin/record/complete — this class routes everything
    through the existing WorldModelBridge. No parallel storage.

    Thread-safety: all state is guarded by a single RLock. WMB
    submission and event emission happen OUTSIDE the lock to avoid
    nesting on the bridge's internal lock.
    """

    def __init__(self):
        # Reentrant lock — public methods may be entered from multiple
        # agent threads concurrently.
        self._lock = threading.RLock()
        # action_id → open (not-yet-completed) action.
        self._open_actions: Dict[str, AgentAction] = {}
        # Monotonic counters surfaced by get_stats().
        self._stats = {
            'total_begun': 0,
            'total_completed': 0,
            'total_timed_out': 0,
            'total_steps_recorded': 0,
        }

    # ─── Public API ─────────────────────────────────────────────────

    def begin_action(self, agent_id: str, action_type: str,
                     goal_id: Optional[str] = None,
                     expected_outcome: Optional[Dict] = None,
                     acceptance_criteria: Optional[List[str]] = None,
                     parent_action_id: Optional[str] = None) -> str:
        """Start tracking a new action. Returns correlation action_id.

        Args:
            agent_id: Which agent is acting (e.g., 'agent_daemon', 'benchmark_prover')
            action_type: Category (e.g., 'benchmark_run', 'optimization_cycle')
            goal_id: Optional goal this action serves (for attribution)
            expected_outcome: Dict of {metric: expected_value} for comparison
            acceptance_criteria: List of string criteria for success
            parent_action_id: When this action is spawned by another
                action (sub-task, retry, cascade), pass the parent's
                action_id so WMB can reconstruct the causal tree.

        Returns:
            action_id (UUID) — pass this to record_step and complete_action.
        """
        action_id = str(uuid.uuid4())
        action = AgentAction(
            action_id=action_id,
            goal_id=goal_id,
            agent_id=agent_id,
            action_type=action_type,
            started_at=time.time(),
            parent_action_id=parent_action_id,
            expected_outcome=expected_outcome or {},
            acceptance_criteria=acceptance_criteria or [],
        )

        evicted_action = None
        with self._lock:
            # Evict oldest if at cap (force-complete with eviction outcome)
            if len(self._open_actions) >= MAX_OPEN_ACTIONS:
                evicted_action = self._force_timeout_oldest()

            self._open_actions[action_id] = action
            self._stats['total_begun'] += 1

        # Submit evicted action to WMB OUTSIDE the lock (nested locks risk)
        if evicted_action is not None:
            try:
                self._submit_to_world_model(evicted_action)
                self._emit_completion_event(evicted_action)
            except Exception as exc:
                logger.debug("Evicted action submit failed: %s", exc)

        logger.debug(
            "Attribution: begin_action %s agent=%s type=%s goal=%s",
            action_id[:8], agent_id, action_type, goal_id,
        )
        return action_id

    def record_step(self, action_id: str, description: str,
                    state: Optional[Dict] = None,
                    decision: str = '',
                    confidence: float = 0.5) -> bool:
        """Record an intermediate state in the action chain.

        Called at every tick, every sub-decision, every state change.
        Dense recording = better attribution on completion.

        Returns:
            True if recorded, False if action unknown/completed/full.
        """
        with self._lock:
            action = self._open_actions.get(action_id)
            if action is None or action.completed:
                return False
            if len(action.steps) >= MAX_STEPS_PER_ACTION:
                return False

            # Truncate inputs defensively — callers may pass arbitrary blobs.
            step = ActionStep(
                timestamp=time.time(),
                description=description[:500],
                state=self._truncate_dict(state or {}),
                decision=decision[:200],
                confidence=max(0.0, min(1.0, confidence)),
            )
            action.steps.append(step)
            self._stats['total_steps_recorded'] += 1

        return True

    def record_observation(self, action_id: str, observation_type: str,
                           data: Optional[Dict] = None,
                           source: str = '',
                           confidence: float = 1.0) -> bool:
        """Attach an EXTERNAL observation to an open action.

        Distinguished from record_step() — steps are what the AGENT
        decided/saw/computed; observations are what the WORLD reports
        back (sensor reading, user correction, world-state probe).
        Stored in a separate list so WMB can contrast intent vs
        grounded reality when scoring credit.

        Args:
            action_id: correlation id from begin_action
            observation_type: 'sensor' | 'world_state' |
                'user_feedback' | 'external' | ...
            data: payload (string values truncated by _truncate_dict)
            source: which subsystem reported it
            confidence: [0.0, 1.0]

        Returns:
            True if attached, False if action unknown/completed/full.
        """
        with self._lock:
            action = self._open_actions.get(action_id)
            if action is None or action.completed:
                return False
            # Reuse MAX_STEPS_PER_ACTION as a combined ceiling — both
            # steps and observations count against the same DoS bound.
            if (len(action.observations) + len(action.steps)
                    >= MAX_STEPS_PER_ACTION):
                return False

            obs = Observation(
                timestamp=time.time(),
                observation_type=observation_type[:100],
                data=self._truncate_dict(data or {}),
                source=source[:100],
                confidence=max(0.0, min(1.0, confidence)),
            )
            action.observations.append(obs)
            return True

    def complete_action(self, action_id: str,
                        outcome: Optional[Dict] = None) -> bool:
        """Complete an action. Submits full chain to WorldModelBridge.

        This is the long-horizon attribution moment:
        1. Compute credit assignment across steps (later steps closer to outcome
           get higher weight — exponential decay)
        2. Compare outcome vs expected_outcome for success/failure signal
        3. Submit chain + attribution + credit to WorldModelBridge for learning

        Returns:
            True if completed, False if action unknown.
        """
        with self._lock:
            action = self._open_actions.pop(action_id, None)
            if action is None:
                return False

            action.completed = True
            action.outcome = outcome or {}
            action.completed_at = time.time()
            self._stats['total_completed'] += 1

        # Outside the lock — WMB has its own lock; never nest.
        self._submit_to_world_model(action)
        self._emit_completion_event(action)
        return True

    def get_action(self, action_id: str) -> Optional[AgentAction]:
        """Query a currently-open action.

        Note: completed actions are popped from the registry on
        completion, so they are NOT queryable here — only open
        (in-progress) actions are returned.
        """
        with self._lock:
            return self._open_actions.get(action_id)

    def get_stats(self) -> Dict:
        """Stats for observability/health checks."""
        with self._lock:
            return {
                **self._stats,
                'open_count': len(self._open_actions),
            }

    def cleanup_expired(self) -> int:
        """Force-complete actions older than TTL. Returns count timed out.

        Called periodically by the agent daemon tick.
        complete_action handles submit + event emission + total_completed.
        We additionally tag these as timed_out for stats distinction.
        """
        now = time.time()
        expired_ids = []
        with self._lock:
            for aid, action in self._open_actions.items():
                if now - action.started_at > ACTION_TTL_SECONDS:
                    expired_ids.append(aid)

        # complete_action re-acquires the lock per action — done outside
        # the scan so WMB submission never runs under our lock.
        timed_out = 0
        for aid in expired_ids:
            if self.complete_action(aid, outcome={'status': 'timeout'}):
                timed_out += 1

        if timed_out:
            with self._lock:
                self._stats['total_timed_out'] += timed_out

        return timed_out

    # ─── Internal: attribution + WorldModelBridge routing ──────────

    def _submit_to_world_model(self, action: AgentAction) -> None:
        """Submit the completed action chain to WorldModelBridge.

        Uses the EXISTING record_interaction API — goal_id field is
        first-class. Attribution chain goes in the attribution_chain
        kwarg (or packed into prompt for older bridges) as structured
        JSON so HevolveAI can parse credit assignments.
        """
        try:
            from integrations.agent_engine.world_model_bridge import (
                get_world_model_bridge,
            )
            bridge = get_world_model_bridge()
        except ImportError:
            logger.debug("WorldModelBridge unavailable, skipping attribution submit")
            return
        except Exception as exc:
            logger.debug("WorldModelBridge init failed: %s", exc)
            return

        # Compute credit assignment: exponential decay from outcome backward.
        # Later steps (closer to outcome) get higher weight.
        credits = self._compute_credit_assignment(action)

        # Compare outcome vs expected for success signal
        success_score = self._compute_success_score(action)

        # Only the last 50 steps are summarized; credits is keyed by the
        # FULL step list's indices, so offset the enumeration — otherwise
        # a >50-step action would pair each summarized step with the
        # credit of one of the EARLIEST steps.
        step_tail = action.steps[-50:]
        tail_offset = len(action.steps) - len(step_tail)

        # Build structured experience — uses existing record_interaction schema
        chain_summary = {
            'action_id': action.action_id,
            'parent_action_id': action.parent_action_id,
            'agent_id': action.agent_id,
            'action_type': action.action_type,
            'goal_id': action.goal_id,
            'expected_outcome': action.expected_outcome,
            'acceptance_criteria': action.acceptance_criteria,
            'duration_seconds': round(
                (action.completed_at or time.time()) - action.started_at, 2),
            'step_count': len(action.steps),
            'observation_count': len(action.observations),
            'outcome': action.outcome,
            'success_score': success_score,
            'step_credits': credits,
            'steps_summary': [
                {
                    'desc': s.description,
                    'decision': s.decision,
                    'confidence': s.confidence,
                    'credit': credits.get(tail_offset + i, 0.0),
                }
                for i, s in enumerate(step_tail)  # last 50 steps
            ],
            # External observations separated from agent-stated steps —
            # WMB uses these as the grounding signal vs the agent's
            # self-reported state.
            'observations': [o.to_dict() for o in action.observations[-50:]],
        }

        # Compact human-readable summary for the 'prompt' field — the
        # structured chain goes in attribution_chain below. This keeps
        # HevolveAI's text-distillation path happy (it now sees a
        # narrative sentence, not a JSON blob).
        summary_text = (
            f"{action.agent_id} ran {action.action_type} "
            f"(goal={action.goal_id}, parent={action.parent_action_id}, "
            f"steps={len(action.steps)}, observations={len(action.observations)}, "
            f"success={round(success_score, 2)})"
        )

        try:
            bridge.record_interaction(
                user_id=action.agent_id,
                prompt_id=action.action_id,
                prompt=summary_text[:2000],
                response=json.dumps(action.outcome or {}, default=str)[:5000],
                model_id=f'{action.agent_id}:{action.action_type}',
                latency_ms=(action.completed_at - action.started_at) * 1000 if action.completed_at else 0,
                goal_id=action.goal_id,
                attribution_chain=chain_summary,
            )
        except TypeError:
            # Older WMB without attribution_chain kwarg — fall back to
            # packing JSON in prompt as before.
            try:
                bridge.record_interaction(
                    user_id=action.agent_id,
                    prompt_id=action.action_id,
                    prompt=json.dumps(chain_summary, default=str)[:2000],
                    response=json.dumps(action.outcome or {}, default=str)[:5000],
                    model_id=f'{action.agent_id}:{action.action_type}',
                    latency_ms=(action.completed_at - action.started_at) * 1000 if action.completed_at else 0,
                    goal_id=action.goal_id,
                )
            except Exception as exc:
                logger.debug("record_interaction fallback failed: %s", exc)
        except Exception as exc:
            logger.debug("record_interaction failed: %s", exc)

    def _compute_credit_assignment(self, action: AgentAction) -> Dict[int, float]:
        """Assign credit to each step for the final outcome.

        Combines two signals — temporal decay is a useful prior, but
        pure temporal weighting gives late-but-low-confidence filler
        steps equal credit with late-and-decisive ones. Multiplying
        by per-step confidence softens that, letting agents signal
        'I wasn't sure here' vs 'this was the decisive moment'.

        Formula per step i of n steps:
            weight_raw(i) = 0.9 ^ (n - 1 - i)        # temporal decay
                          * (0.5 + 0.5 * confidence)  # confidence mix
            credit(i) = weight_raw(i) / sum(weight_raw)

        - Pure unconfident step (0.0) still gets 0.5x the temporal
          weight (not zero — we don't want to reward agents who
          under-report confidence to game the metric).
        - Pure confident step (1.0) gets full temporal weight.
        - Sum still normalizes to 1.0.

        Returns:
            Dict mapping step_index → credit weight (sum ≈ 1.0).
        """
        n = len(action.steps)
        if n == 0:
            return {}

        decay = 0.9
        raw_weights = {}
        for i, step in enumerate(action.steps):
            temporal = decay ** (n - 1 - i)
            confidence_boost = 0.5 + 0.5 * max(0.0, min(1.0, step.confidence))
            raw_weights[i] = temporal * confidence_boost

        total = sum(raw_weights.values())
        if total <= 0:
            # Defensive: cannot happen with positive weights, but keep a
            # uniform fallback rather than dividing by zero.
            return {i: 1.0 / n for i in range(n)}
        return {i: round(w / total, 4) for i, w in raw_weights.items()}

    def _compute_success_score(self, action: AgentAction) -> float:
        """Compare outcome vs expected_outcome. Returns [0.0, 1.0].

        - If expected_outcome is empty: return 0.5 (neutral), with
          0.0 for errors and 0.2 for timeouts.
        - If outcome has 'error' / 'status' == 'error': return 0.0
        - Otherwise: fraction of expected keys matched within tolerance.
        """
        outcome = action.outcome or {}
        if not action.expected_outcome:
            # No expectation set — neutral score, use outcome status
            if outcome.get('status') == 'error' or outcome.get('error'):
                return 0.0
            if outcome.get('status') == 'timeout':
                return 0.2
            return 0.5

        if outcome.get('status') == 'error' or outcome.get('error'):
            return 0.0

        # Check expected keys
        matched = 0
        total = 0
        for key, expected in action.expected_outcome.items():
            total += 1
            actual = outcome.get(key)
            if actual is None:
                continue
            if isinstance(expected, (int, float)) and isinstance(actual, (int, float)):
                # Numeric: within 10% tolerance (relative) OR 0.05 absolute for small values.
                # The absolute band catches cases like expected=0 + actual=0.03 (delta
                # metrics), where pure relative tolerance would fail every near-zero case.
                if abs(actual - expected) <= 0.05:
                    matched += 1
                elif expected != 0 and abs(actual - expected) / abs(expected) <= 0.1:
                    matched += 1
            elif actual == expected:
                matched += 1

        return matched / total if total > 0 else 0.5

    def _emit_completion_event(self, action: AgentAction) -> None:
        """Emit EventBus event for real-time dashboards."""
        try:
            from core.platform.events import emit_event
            emit_event('agent.action.completed', {
                'action_id': action.action_id,
                'agent_id': action.agent_id,
                'action_type': action.action_type,
                'goal_id': action.goal_id,
                'duration_seconds': round(
                    (action.completed_at or time.time()) - action.started_at, 2),
                'step_count': len(action.steps),
                'success_score': self._compute_success_score(action),
            })
        except Exception:
            pass  # Best effort — never crash on observability

    def _force_timeout_oldest(self) -> Optional[AgentAction]:
        """Force-complete the oldest open action when at cap.

        Caller MUST hold _lock. Returns the evicted action so the caller
        can submit it to WorldModelBridge OUTSIDE the lock (avoids nested
        lock acquisition on WMB's internal lock).
        """
        if not self._open_actions:
            return None
        oldest_id = min(
            self._open_actions.keys(),
            key=lambda k: self._open_actions[k].started_at,
        )
        oldest = self._open_actions.pop(oldest_id)
        oldest.completed = True
        oldest.outcome = {'status': 'evicted', 'reason': 'max_open_actions'}
        oldest.completed_at = time.time()
        self._stats['total_timed_out'] += 1
        return oldest

    @staticmethod
    def _truncate_dict(d: Dict) -> Dict:
        """Truncate string values in dict to prevent unbounded state growth."""
        out = {}
        for k, v in d.items():
            if isinstance(v, str) and len(v) > 500:
                out[k] = v[:500] + '...'
            elif isinstance(v, (dict, list)) and len(str(v)) > 1000:
                # Containers are flattened to a truncated string repr —
                # structure is lost, but the bound is what matters here.
                out[k] = str(v)[:1000] + '...'
            else:
                out[k] = v
        return out

562 

563 

564# ─── Singleton ────────────────────────────────────────────────────────── 

565 

# Process-wide singleton, created lazily on first access.
_orchestrator: Optional[AgentAttributionOrchestrator] = None
_orchestrator_lock = threading.Lock()


def get_attribution() -> AgentAttributionOrchestrator:
    """Return the process-wide orchestrator, creating it on first use.

    Thread-safe: the unlocked fast-path read is safe because module
    globals assignments are atomic; creation is serialized under the
    lock with a re-check so exactly one instance is ever built.
    """
    global _orchestrator
    if _orchestrator is not None:
        return _orchestrator
    with _orchestrator_lock:
        if _orchestrator is None:
            _orchestrator = AgentAttributionOrchestrator()
    return _orchestrator

578 

579 

580# ─── Convenience functions (thin wrappers for call-site brevity) ─────── 

581 

def begin_action(agent_id: str, action_type: str,
                 goal_id: Optional[str] = None,
                 expected_outcome: Optional[Dict] = None,
                 acceptance_criteria: Optional[List[str]] = None,
                 parent_action_id: Optional[str] = None) -> str:
    """Convenience: start tracking. See AgentAttributionOrchestrator.begin_action."""
    orchestrator = get_attribution()
    return orchestrator.begin_action(
        agent_id=agent_id,
        action_type=action_type,
        goal_id=goal_id,
        expected_outcome=expected_outcome,
        acceptance_criteria=acceptance_criteria,
        parent_action_id=parent_action_id,
    )

592 

593 

def record_observation(action_id: str, observation_type: str,
                       data: Optional[Dict] = None,
                       source: str = '',
                       confidence: float = 1.0) -> bool:
    """Convenience: attach external-world observation to an open action."""
    orchestrator = get_attribution()
    return orchestrator.record_observation(
        action_id=action_id,
        observation_type=observation_type,
        data=data,
        source=source,
        confidence=confidence,
    )

602 

603 

def record_step(action_id: str, description: str,
                state: Optional[Dict] = None,
                decision: str = '',
                confidence: float = 0.5) -> bool:
    """Convenience: record intermediate state."""
    orchestrator = get_attribution()
    return orchestrator.record_step(
        action_id=action_id,
        description=description,
        state=state,
        decision=decision,
        confidence=confidence,
    )

612 

613 

def complete_action(action_id: str, outcome: Optional[Dict] = None) -> bool:
    """Convenience: complete action and submit attribution."""
    orchestrator = get_attribution()
    return orchestrator.complete_action(action_id=action_id, outcome=outcome)