Coverage for integrations / agent_engine / reasoning_trace.py: 80.8%
52 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2ReasoningTrace — append-only decision log for hive_consensus.
4The ml_intern brief §5 package C calls for a
5`monitoring/reasoning_trace.py` so every promotion (or rejection) is
6"auditable forever". This is the implementation.
8Storage layout:
9 agent_data/reasoning_traces/{YYYY-MM-DD}.jsonl
11One line per decision. JSONL so a tail -f can stream decisions live
12to an operator. No deletion API — audit log is append-only by
13contract. Rotation is by day; the file grows for a calendar day then
14a new file is started.
16Related:
17- security/immutable_audit_log.py already exists for security-sensitive
18 events; we do NOT reuse it here because consensus decisions are
19 operational audit, not security audit. Mixing the two would
20 confuse the hash-chain analysis tool. Single-responsibility: this
21 module records consensus decisions and ONLY consensus decisions.
22"""
23from __future__ import annotations
25import json
26import logging
27import os
28import threading
29import time
30from datetime import datetime
31from typing import Any, Dict, Optional
33logger = logging.getLogger('hevolve_social')
35_TRACE_LOCK = threading.Lock()
38def _resolve_trace_dir() -> str:
39 try:
40 from core.platform_paths import get_agent_data_dir
41 return os.path.join(get_agent_data_dir(), 'reasoning_traces')
42 except Exception:
43 pass
44 if os.name == 'nt':
45 return os.path.join(
46 os.path.expanduser('~'),
47 'Documents', 'Nunba', 'data', 'agent_data', 'reasoning_traces',
48 )
49 return os.path.join('agent_data', 'reasoning_traces')
52def _file_for(day: Optional[str] = None) -> str:
53 root = _resolve_trace_dir()
54 if day is None:
55 day = datetime.utcnow().strftime('%Y-%m-%d')
56 return os.path.join(root, f'{day}.jsonl')
59def record_decision(
60 action: str,
61 approved: bool,
62 votes: Dict[str, Any],
63 subject: Dict[str, Any],
64 reason: str = '',
65 event_bus_emit: bool = True,
66) -> bool:
67 """Record one consensus decision.
69 Args:
70 action: short verb ("upgrade_proposal", "rollback", "halt", etc.)
71 approved: final verdict
72 votes: mapping of voter_name → {passed: bool, reason: str}
73 subject: what the decision was about (agent_id, new_prompt, etc.)
74 reason: free-text explanation surfaced to the dashboard
75 event_bus_emit: also fire a `learning.federation_update`
76 EventBus event so peers can observe the decision
77 (consent-gated upstream by ScopeGuard)
78 """
79 entry = {
80 'timestamp': time.time(),
81 'datetime_utc': datetime.utcnow().isoformat() + 'Z',
82 'action': action,
83 'approved': bool(approved),
84 'votes': votes,
85 'subject': subject,
86 'reason': reason,
87 }
88 path = _file_for()
89 try:
90 os.makedirs(os.path.dirname(path), exist_ok=True)
91 line = json.dumps(entry, sort_keys=True)
92 with _TRACE_LOCK:
93 with open(path, 'a', encoding='utf-8') as fh:
94 fh.write(line + '\n')
95 except Exception as exc:
96 logger.warning(f'[reasoning_trace] persist failed: {exc}')
97 return False
99 if event_bus_emit:
100 try:
101 from core.platform.events import emit_event
102 emit_event('learning.federation_update', {
103 'kind': 'consensus_decision',
104 'action': action,
105 'approved': approved,
106 'subject_summary': {
107 k: v for k, v in subject.items()
108 if k in ('agent_id', 'goal_type', 'version')
109 },
110 })
111 except Exception as exc:
112 logger.debug(f'[reasoning_trace] event emit failed: {exc}')
113 return True
116def read_recent(limit: int = 200) -> list:
117 """Read the most recent decisions for dashboard / audit display."""
118 path = _file_for()
119 if not os.path.exists(path):
120 return []
121 rows = []
122 try:
123 with open(path, 'r', encoding='utf-8') as fh:
124 for line in fh:
125 line = line.strip()
126 if not line:
127 continue
128 try:
129 rows.append(json.loads(line))
130 except json.JSONDecodeError:
131 continue
132 except Exception as exc:
133 logger.debug(f'[reasoning_trace] read failed: {exc}')
134 return []
135 return rows[-limit:]