Coverage for integrations / agent_engine / agent_attribution.py: 89.2%
212 statements
1"""
2Agent Attribution — Dense feedback with long-horizon credit assignment.
4THE SINGLE CONVERGING PATH for agent action attribution.
6All agents (daemon, benchmark_prover, compute_optimizer, hive_session,
7task_protocol) call THIS module to record their actions. This module
8routes ALL attribution to the existing WorldModelBridge — no parallel
9systems, no new storage, no duplicate APIs.
11How it works:
12 1. Agent calls begin_action(goal_id, agent_id, action_type, expected_outcome)
13 → returns action_id (correlation ID)
14 2. Agent calls record_step(action_id, step_description, state, decision)
15 at each intermediate state (every tick, every sub-decision)
16 3. Agent calls complete_action(action_id, outcome)
17 → computes attribution chain, submits to WorldModelBridge,
18 compares vs expected_outcome for credit assignment
20The WorldModelBridge.record_interaction() already accepts goal_id and
21queues experiences for HevolveAI learning. We just wire the attribution
22chain in the 'prompt' field as structured JSON.
24For long-horizon goals (hours/days), the action stays open with step
25history in memory (bounded by ACTION_TTL_SECONDS). On complete_action,
26the full chain is submitted in a single experience with credit weights.
28NO PARALLEL PATHS:
29 - Storage: WorldModelBridge._experience_queue (existing)
30 - Events: core.platform.events.emit_event (existing)
31 - Privacy: security.secret_redactor (existing, applied by bridge)
32 - Guardrails: security.hive_guardrails (existing, applied by bridge)
33"""

import json
import logging
import threading
import time
import uuid
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, List, Optional

logger = logging.getLogger('hevolve.agent_attribution')


# ─── Config ─────────────────────────────────────────────────────────────

# How long an action stays open before auto-completion with outcome='timeout'.
# Set long enough for benchmark runs (hours) but bounded.
ACTION_TTL_SECONDS = 6 * 3600  # 6 hours

# Max open actions before the oldest is force-completed (prevents unbounded growth).
MAX_OPEN_ACTIONS = 500

# Max steps per action (prevents DoS from tick storms).
MAX_STEPS_PER_ACTION = 1000


# ─── Dataclasses ────────────────────────────────────────────────────────

@dataclass
class ActionStep:
    """A single intermediate state in an action chain."""
    timestamp: float
    description: str
    state: Dict[str, Any] = field(default_factory=dict)
    decision: str = ''       # What the agent decided at this step
    confidence: float = 0.5  # Agent's confidence in the decision

    def to_dict(self) -> Dict:
        return asdict(self)


@dataclass
class Observation:
    """An external observation that grounds the action in reality.

    Emitted by sensors / world-state probes / user feedback (NOT by the
    agent itself). Stored alongside steps so WMB can distinguish
    agent-stated state (ActionStep.state) from observed state (this).
    """
    timestamp: float
    observation_type: str  # 'sensor', 'world_state', 'user_feedback', ...
    data: Dict[str, Any] = field(default_factory=dict)
    source: str = ''         # who reported it
    confidence: float = 1.0  # observation reliability

    def to_dict(self) -> Dict:
        return asdict(self)


@dataclass
class AgentAction:
    """A long-horizon action with intermediate steps and expected outcome."""
    action_id: str
    goal_id: Optional[str]
    agent_id: str
    action_type: str  # e.g., 'benchmark_run', 'task_dispatch', 'optimization_cycle'
    started_at: float
    # parent_action_id enables causal chains across the hive: a
    # benchmark_run action begun inside a bootstrap_upgrade goal carries
    # parent=<goal's action id>, so WMB can trace multi-hop attribution
    # without reconstructing the graph from timestamps.
    parent_action_id: Optional[str] = None
    expected_outcome: Dict[str, Any] = field(default_factory=dict)
    acceptance_criteria: List[str] = field(default_factory=list)
    steps: List[ActionStep] = field(default_factory=list)
    observations: List[Observation] = field(default_factory=list)
    completed: bool = False
    outcome: Optional[Dict[str, Any]] = None
    completed_at: Optional[float] = None

    def to_dict(self) -> Dict:
        return {
            **asdict(self),
            'steps': [s.to_dict() for s in self.steps],
            'observations': [o.to_dict() for o in self.observations],
        }


# ─── Orchestrator ───────────────────────────────────────────────────────

class AgentAttributionOrchestrator:
    """Single source of truth for agent action attribution.

    Agents call begin/record/complete — this class routes everything
    through the existing WorldModelBridge. No parallel storage.
    """

    def __init__(self):
        self._lock = threading.RLock()
        self._open_actions: Dict[str, AgentAction] = {}
        self._stats = {
            'total_begun': 0,
            'total_completed': 0,
            'total_timed_out': 0,
            'total_steps_recorded': 0,
        }

    # ─── Public API ─────────────────────────────────────────────────

    def begin_action(self, agent_id: str, action_type: str,
                     goal_id: Optional[str] = None,
                     expected_outcome: Optional[Dict] = None,
                     acceptance_criteria: Optional[List[str]] = None,
                     parent_action_id: Optional[str] = None) -> str:
        """Start tracking a new action. Returns correlation action_id.

        Args:
            agent_id: Which agent is acting (e.g., 'agent_daemon', 'benchmark_prover')
            action_type: Category (e.g., 'benchmark_run', 'optimization_cycle')
            goal_id: Optional goal this action serves (for attribution)
            expected_outcome: Dict of {metric: expected_value} for comparison
            acceptance_criteria: List of string criteria for success
            parent_action_id: When this action is spawned by another
                action (sub-task, retry, cascade), pass the parent's
                action_id so WMB can reconstruct the causal tree.

        Returns:
            action_id (UUID) — pass this to record_step and complete_action.
        """
        action_id = str(uuid.uuid4())
        action = AgentAction(
            action_id=action_id,
            goal_id=goal_id,
            agent_id=agent_id,
            action_type=action_type,
            started_at=time.time(),
            parent_action_id=parent_action_id,
            expected_outcome=expected_outcome or {},
            acceptance_criteria=acceptance_criteria or [],
        )

        evicted_action = None
        with self._lock:
            # Evict the oldest if at cap (force-completed with an 'evicted' outcome)
            if len(self._open_actions) >= MAX_OPEN_ACTIONS:
                evicted_action = self._force_timeout_oldest()
            self._open_actions[action_id] = action
            self._stats['total_begun'] += 1

        # Submit the evicted action to WMB OUTSIDE the lock (nested-lock risk)
        if evicted_action is not None:
            try:
                self._submit_to_world_model(evicted_action)
                self._emit_completion_event(evicted_action)
            except Exception as exc:
                logger.debug("Evicted action submit failed: %s", exc)

        logger.debug(
            "Attribution: begin_action %s agent=%s type=%s goal=%s",
            action_id[:8], agent_id, action_type, goal_id,
        )
        return action_id
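
    # A sketch of parent chaining (ids and variable names illustrative): a
    # benchmark spawned inside a bootstrap-upgrade action carries the
    # parent's id so WMB can rebuild the causal tree:
    #
    #     parent_id = orch.begin_action('agent_daemon', 'bootstrap_upgrade')
    #     child_id = orch.begin_action('benchmark_prover', 'benchmark_run',
    #                                  parent_action_id=parent_id)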

    def record_step(self, action_id: str, description: str,
                    state: Optional[Dict] = None,
                    decision: str = '',
                    confidence: float = 0.5) -> bool:
        """Record an intermediate state in the action chain.

        Called at every tick, every sub-decision, every state change.
        Dense recording = better attribution on completion.

        Returns:
            True if recorded, False if action unknown/completed/full.
        """
        with self._lock:
            action = self._open_actions.get(action_id)
            if action is None or action.completed:
                return False
            if len(action.steps) >= MAX_STEPS_PER_ACTION:
                return False

            step = ActionStep(
                timestamp=time.time(),
                description=description[:500],
                state=self._truncate_dict(state or {}),
                decision=decision[:200],
                confidence=max(0.0, min(1.0, confidence)),
            )
            action.steps.append(step)
            self._stats['total_steps_recorded'] += 1

        return True

    def record_observation(self, action_id: str, observation_type: str,
                           data: Optional[Dict] = None,
                           source: str = '',
                           confidence: float = 1.0) -> bool:
        """Attach an EXTERNAL observation to an open action.

        Distinguished from record_step() — steps are what the AGENT
        decided/saw/computed; observations are what the WORLD reports
        back (sensor reading, user correction, world-state probe).
        Stored in a separate list so WMB can contrast intent vs
        grounded reality when scoring credit.

        Args:
            action_id: correlation id from begin_action
            observation_type: 'sensor' | 'world_state' |
                'user_feedback' | 'external' | ...
            data: payload (values truncated via _truncate_dict)
            source: which subsystem reported it
            confidence: [0.0, 1.0]

        Returns:
            True if attached, False if action unknown/completed/full.
        """
        with self._lock:
            action = self._open_actions.get(action_id)
            if action is None or action.completed:
                return False
            # Reuse MAX_STEPS_PER_ACTION as a combined ceiling — both
            # steps and observations count against the same DoS bound.
            if (len(action.observations) + len(action.steps)
                    >= MAX_STEPS_PER_ACTION):
                return False

            obs = Observation(
                timestamp=time.time(),
                observation_type=observation_type[:100],
                data=self._truncate_dict(data or {}),
                source=source[:100],
                confidence=max(0.0, min(1.0, confidence)),
            )
            action.observations.append(obs)
            return True
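
    # A sketch of the step-vs-observation contrast (names and payloads
    # illustrative): the agent's own view goes through record_step, the
    # world's answer through record_observation:
    #
    #     orch.record_step(aid, 'dispatched task', decision='use gpu-2',
    #                      confidence=0.7)
    #     orch.record_observation(aid, 'world_state',
    #                             data={'gpu2_util': 0.95}, source='telemetry')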

    def complete_action(self, action_id: str,
                        outcome: Optional[Dict] = None) -> bool:
        """Complete an action. Submits the full chain to WorldModelBridge.

        This is the long-horizon attribution moment:
          1. Compute credit assignment across steps (later steps, closer
             to the outcome, get higher weight — exponential decay)
          2. Compare outcome vs expected_outcome for a success/failure signal
          3. Submit chain + attribution + credit to WorldModelBridge for learning

        Returns:
            True if completed, False if action unknown.
        """
        with self._lock:
            action = self._open_actions.pop(action_id, None)
            if action is None:
                return False

            action.completed = True
            action.outcome = outcome or {}
            action.completed_at = time.time()
            self._stats['total_completed'] += 1

        # Submit OUTSIDE the lock: WMB takes its own internal lock.
        self._submit_to_world_model(action)
        self._emit_completion_event(action)
        return True

    def get_action(self, action_id: str) -> Optional[AgentAction]:
        """Query an open (not yet completed) action.

        Completed actions are popped from the registry and are no longer
        queryable here; their chain lives on in WorldModelBridge.
        """
        with self._lock:
            return self._open_actions.get(action_id)

    def get_stats(self) -> Dict:
        """Stats for observability/health checks."""
        with self._lock:
            return {
                **self._stats,
                'open_count': len(self._open_actions),
            }

    def cleanup_expired(self) -> int:
        """Force-complete actions older than TTL. Returns the count timed out.

        Called periodically by the agent daemon tick.
        complete_action handles submit + event emission + total_completed.
        We additionally tag these as timed_out for stats distinction.
        """
        now = time.time()
        expired_ids = []
        with self._lock:
            for aid, action in self._open_actions.items():
                if now - action.started_at > ACTION_TTL_SECONDS:
                    expired_ids.append(aid)

        timed_out = 0
        for aid in expired_ids:
            if self.complete_action(aid, outcome={'status': 'timeout'}):
                timed_out += 1

        if timed_out:
            with self._lock:
                self._stats['total_timed_out'] += timed_out

        return timed_out
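
    # A sketch of the periodic wiring (the tick method and its interval are
    # illustrative; this module only exposes cleanup_expired itself):
    #
    #     def _on_tick(self):  # inside the agent daemon
    #         expired = get_attribution().cleanup_expired()
    #         if expired:
    #             logger.info("Attribution: timed out %d stale actions", expired)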

    # ─── Internal: attribution + WorldModelBridge routing ──────────

    def _submit_to_world_model(self, action: AgentAction) -> None:
        """Submit the completed action chain to WorldModelBridge.

        Uses the EXISTING record_interaction API — the goal_id field is
        first-class. The attribution chain rides along as structured JSON
        (attribution_chain kwarg, with a prompt-field fallback for older
        bridges) so HevolveAI can parse credit assignments.
        """
        try:
            from integrations.agent_engine.world_model_bridge import (
                get_world_model_bridge,
            )
            bridge = get_world_model_bridge()
        except ImportError:
            logger.debug("WorldModelBridge unavailable, skipping attribution submit")
            return
        except Exception as exc:
            logger.debug("WorldModelBridge init failed: %s", exc)
            return

        # Compute credit assignment: exponential decay from the outcome backward.
        # Later steps (closer to the outcome) get higher weight.
        credits = self._compute_credit_assignment(action)

        # Compare outcome vs expected for the success signal
        success_score = self._compute_success_score(action)

        # Build the structured experience — uses the existing record_interaction schema
        chain_summary = {
            'action_id': action.action_id,
            'parent_action_id': action.parent_action_id,
            'agent_id': action.agent_id,
            'action_type': action.action_type,
            'goal_id': action.goal_id,
            'expected_outcome': action.expected_outcome,
            'acceptance_criteria': action.acceptance_criteria,
            'duration_seconds': round(
                (action.completed_at or time.time()) - action.started_at, 2),
            'step_count': len(action.steps),
            'observation_count': len(action.observations),
            'outcome': action.outcome,
            'success_score': success_score,
            'step_credits': credits,
            'steps_summary': [
                {
                    'desc': s.description,
                    'decision': s.decision,
                    'confidence': s.confidence,
                    # credits are keyed by ORIGINAL step index, so offset
                    # the enumeration when the list is truncated to 50.
                    'credit': credits.get(max(0, len(action.steps) - 50) + i, 0.0),
                }
                for i, s in enumerate(action.steps[-50:])  # last 50 steps
            ],
            # External observations separated from agent-stated steps —
            # WMB uses these as the grounding signal vs the agent's
            # self-reported state.
            'observations': [o.to_dict() for o in action.observations[-50:]],
        }

        # Compact human-readable summary for the 'prompt' field — the
        # structured chain goes in attribution_chain below. This keeps
        # HevolveAI's text-distillation path happy (it now sees a
        # narrative sentence, not a JSON blob).
        summary_text = (
            f"{action.agent_id} ran {action.action_type} "
            f"(goal={action.goal_id}, parent={action.parent_action_id}, "
            f"steps={len(action.steps)}, observations={len(action.observations)}, "
            f"success={round(success_score, 2)})"
        )

        try:
            bridge.record_interaction(
                user_id=action.agent_id,
                prompt_id=action.action_id,
                prompt=summary_text[:2000],
                response=json.dumps(action.outcome or {}, default=str)[:5000],
                model_id=f'{action.agent_id}:{action.action_type}',
                latency_ms=(action.completed_at - action.started_at) * 1000 if action.completed_at else 0,
                goal_id=action.goal_id,
                attribution_chain=chain_summary,
            )
        except TypeError:
            # Older WMB without the attribution_chain kwarg — fall back to
            # packing JSON into the prompt as before.
            try:
                bridge.record_interaction(
                    user_id=action.agent_id,
                    prompt_id=action.action_id,
                    prompt=json.dumps(chain_summary, default=str)[:2000],
                    response=json.dumps(action.outcome or {}, default=str)[:5000],
                    model_id=f'{action.agent_id}:{action.action_type}',
                    latency_ms=(action.completed_at - action.started_at) * 1000 if action.completed_at else 0,
                    goal_id=action.goal_id,
                )
            except Exception as exc:
                logger.debug("record_interaction fallback failed: %s", exc)
        except Exception as exc:
            logger.debug("record_interaction failed: %s", exc)

    def _compute_credit_assignment(self, action: AgentAction) -> Dict[int, float]:
        """Assign credit to each step for the final outcome.

        Combines two signals — temporal decay is a useful prior, but
        pure temporal weighting gives late-but-low-confidence filler
        steps equal credit with late-and-decisive ones. Multiplying
        by per-step confidence softens that, letting agents signal
        'I wasn't sure here' vs 'this was the decisive moment'.

        Formula per step i of n steps:
            weight_raw(i) = 0.9 ^ (n - 1 - i)         # temporal decay
                          * (0.5 + 0.5 * confidence)  # confidence mix
            credit(i) = weight_raw(i) / sum(weight_raw)

        - A fully unconfident step (0.0) still gets 0.5x the temporal
          weight (not zero — we don't want to reward agents who
          under-report confidence to game the metric).
        - A fully confident step (1.0) gets the full temporal weight.
        - The sum still normalizes to 1.0.

        Returns:
            Dict mapping step_index → credit weight (sum ≈ 1.0).
        """
        n = len(action.steps)
        if n == 0:
            return {}

        decay = 0.9
        raw_weights = {}
        for i, step in enumerate(action.steps):
            temporal = decay ** (n - 1 - i)
            confidence_boost = 0.5 + 0.5 * max(0.0, min(1.0, step.confidence))
            raw_weights[i] = temporal * confidence_boost

        total = sum(raw_weights.values())
        if total <= 0:
            return {i: 1.0 / n for i in range(n)}
        return {i: round(w / total, 4) for i, w in raw_weights.items()}
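
    # Worked example for the formula above: n=3 steps with confidences
    # [1.0, 0.5, 1.0] give raw weights [0.81*1.0, 0.9*0.75, 1.0*1.0]
    # = [0.81, 0.675, 1.0], total 2.485, so credits ≈
    # {0: 0.3260, 1: 0.2716, 2: 0.4024}: the decisive final step earns
    # the most credit, the hesitant middle step the least.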

    def _compute_success_score(self, action: AgentAction) -> float:
        """Compare outcome vs expected_outcome. Returns [0.0, 1.0].

        - If outcome has 'error' / 'status' == 'error': return 0.0
        - If expected_outcome is empty: return 0.5 (neutral),
          or 0.2 for a timeout
        - Otherwise: fraction of expected keys matched within tolerance.
        """
        outcome = action.outcome or {}
        if not action.expected_outcome:
            # No expectation set — neutral score, keyed off outcome status
            if outcome.get('status') == 'error' or outcome.get('error'):
                return 0.0
            if outcome.get('status') == 'timeout':
                return 0.2
            return 0.5

        if outcome.get('status') == 'error' or outcome.get('error'):
            return 0.0

        # Check expected keys
        matched = 0
        total = 0
        for key, expected in action.expected_outcome.items():
            total += 1
            actual = outcome.get(key)
            if actual is None:
                continue  # a missing key counts against the score
            if isinstance(expected, (int, float)) and isinstance(actual, (int, float)):
                # Numeric: within 10% relative tolerance OR 0.05 absolute for
                # small values. The absolute band catches cases like
                # expected=0 + actual=0.03 (delta metrics), where pure
                # relative tolerance would fail every near-zero case.
                if abs(actual - expected) <= 0.05:
                    matched += 1
                elif expected != 0 and abs(actual - expected) / abs(expected) <= 0.1:
                    matched += 1
            elif actual == expected:
                matched += 1

        return matched / total if total > 0 else 0.5
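
    # Worked example: expected={'latency_ms': 100, 'pass_rate': 0.9} vs
    # outcome={'latency_ms': 105, 'pass_rate': 0.7} → latency matches
    # (5% relative error ≤ 10%), pass_rate misses (0.2 off, outside both
    # bands), so the score is 1/2 = 0.5.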

    def _emit_completion_event(self, action: AgentAction) -> None:
        """Emit EventBus event for real-time dashboards."""
        try:
            from core.platform.events import emit_event
            emit_event('agent.action.completed', {
                'action_id': action.action_id,
                'agent_id': action.agent_id,
                'action_type': action.action_type,
                'goal_id': action.goal_id,
                'duration_seconds': round(
                    (action.completed_at or time.time()) - action.started_at, 2),
                'step_count': len(action.steps),
                'success_score': self._compute_success_score(action),
            })
        except Exception:
            pass  # Best effort — never crash on observability

    def _force_timeout_oldest(self) -> Optional[AgentAction]:
        """Force-complete the oldest open action when at cap.

        Caller MUST hold _lock. Returns the evicted action so the caller
        can submit it to WorldModelBridge OUTSIDE the lock (avoids nested
        lock acquisition on WMB's internal lock).
        """
        if not self._open_actions:
            return None
        oldest_id = min(
            self._open_actions.keys(),
            key=lambda k: self._open_actions[k].started_at,
        )
        oldest = self._open_actions.pop(oldest_id)
        oldest.completed = True
        oldest.outcome = {'status': 'evicted', 'reason': 'max_open_actions'}
        oldest.completed_at = time.time()
        self._stats['total_timed_out'] += 1
        return oldest

    @staticmethod
    def _truncate_dict(d: Dict) -> Dict:
        """Truncate string values in a dict to prevent unbounded state growth."""
        out = {}
        for k, v in d.items():
            if isinstance(v, str) and len(v) > 500:
                out[k] = v[:500] + '...'
            elif isinstance(v, (dict, list)) and len(str(v)) > 1000:
                out[k] = str(v)[:1000] + '...'
            else:
                out[k] = v
        return out
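
    # E.g. _truncate_dict({'log': 'x' * 600, 'n': 3}) keeps 'n' intact and
    # clips 'log' to its first 500 chars plus an ellipsis marker.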


# ─── Singleton ──────────────────────────────────────────────────────────

_orchestrator: Optional[AgentAttributionOrchestrator] = None
_orchestrator_lock = threading.Lock()


def get_attribution() -> AgentAttributionOrchestrator:
    """Get the singleton orchestrator. Thread-safe."""
    global _orchestrator
    if _orchestrator is None:
        with _orchestrator_lock:
            if _orchestrator is None:
                _orchestrator = AgentAttributionOrchestrator()
    return _orchestrator


# ─── Convenience functions (thin wrappers for call-site brevity) ───────

def begin_action(agent_id: str, action_type: str,
                 goal_id: Optional[str] = None,
                 expected_outcome: Optional[Dict] = None,
                 acceptance_criteria: Optional[List[str]] = None,
                 parent_action_id: Optional[str] = None) -> str:
    """Convenience: start tracking. See AgentAttributionOrchestrator.begin_action."""
    return get_attribution().begin_action(
        agent_id, action_type, goal_id, expected_outcome,
        acceptance_criteria, parent_action_id,
    )


def record_observation(action_id: str, observation_type: str,
                       data: Optional[Dict] = None,
                       source: str = '',
                       confidence: float = 1.0) -> bool:
    """Convenience: attach an external-world observation to an open action."""
    return get_attribution().record_observation(
        action_id, observation_type, data, source, confidence,
    )


def record_step(action_id: str, description: str,
                state: Optional[Dict] = None,
                decision: str = '',
                confidence: float = 0.5) -> bool:
    """Convenience: record an intermediate state."""
    return get_attribution().record_step(
        action_id, description, state, decision, confidence,
    )


def complete_action(action_id: str, outcome: Optional[Dict] = None) -> bool:
    """Convenience: complete an action and submit attribution."""
    return get_attribution().complete_action(action_id, outcome)