Coverage for integrations / agent_engine / reasoning_trace.py: 80.8%

52 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2ReasoningTrace — append-only decision log for hive_consensus. 

3 

4The ml_intern brief §5 package C calls for a 

5`monitoring/reasoning_trace.py` so every promotion (or rejection) is 

6"auditable forever". This is the implementation. 

7 

8Storage layout: 

9 agent_data/reasoning_traces/{YYYY-MM-DD}.jsonl 

10 

11One line per decision. JSONL so a tail -f can stream decisions live 

12to an operator. No deletion API — audit log is append-only by 

13contract. Rotation is by day; the file grows for a calendar day then 

14a new file is started. 

15 

16Related: 

17- security/immutable_audit_log.py already exists for security-sensitive 

18 events; we do NOT reuse it here because consensus decisions are 

19 operational audit, not security audit. Mixing the two would 

20 confuse the hash-chain analysis tool. Single-responsibility: this 

21 module records consensus decisions and ONLY consensus decisions. 

22""" 

23from __future__ import annotations 

24 

25import json 

26import logging 

27import os 

28import threading 

29import time 

30from datetime import datetime 

31from typing import Any, Dict, Optional 

32 

33logger = logging.getLogger('hevolve_social') 

34 

35_TRACE_LOCK = threading.Lock() 

36 

37 

38def _resolve_trace_dir() -> str: 

39 try: 

40 from core.platform_paths import get_agent_data_dir 

41 return os.path.join(get_agent_data_dir(), 'reasoning_traces') 

42 except Exception: 

43 pass 

44 if os.name == 'nt': 

45 return os.path.join( 

46 os.path.expanduser('~'), 

47 'Documents', 'Nunba', 'data', 'agent_data', 'reasoning_traces', 

48 ) 

49 return os.path.join('agent_data', 'reasoning_traces') 

50 

51 

52def _file_for(day: Optional[str] = None) -> str: 

53 root = _resolve_trace_dir() 

54 if day is None: 

55 day = datetime.utcnow().strftime('%Y-%m-%d') 

56 return os.path.join(root, f'{day}.jsonl') 

57 

58 

59def record_decision( 

60 action: str, 

61 approved: bool, 

62 votes: Dict[str, Any], 

63 subject: Dict[str, Any], 

64 reason: str = '', 

65 event_bus_emit: bool = True, 

66) -> bool: 

67 """Record one consensus decision. 

68 

69 Args: 

70 action: short verb ("upgrade_proposal", "rollback", "halt", etc.) 

71 approved: final verdict 

72 votes: mapping of voter_name → {passed: bool, reason: str} 

73 subject: what the decision was about (agent_id, new_prompt, etc.) 

74 reason: free-text explanation surfaced to the dashboard 

75 event_bus_emit: also fire a `learning.federation_update` 

76 EventBus event so peers can observe the decision 

77 (consent-gated upstream by ScopeGuard) 

78 """ 

79 entry = { 

80 'timestamp': time.time(), 

81 'datetime_utc': datetime.utcnow().isoformat() + 'Z', 

82 'action': action, 

83 'approved': bool(approved), 

84 'votes': votes, 

85 'subject': subject, 

86 'reason': reason, 

87 } 

88 path = _file_for() 

89 try: 

90 os.makedirs(os.path.dirname(path), exist_ok=True) 

91 line = json.dumps(entry, sort_keys=True) 

92 with _TRACE_LOCK: 

93 with open(path, 'a', encoding='utf-8') as fh: 

94 fh.write(line + '\n') 

95 except Exception as exc: 

96 logger.warning(f'[reasoning_trace] persist failed: {exc}') 

97 return False 

98 

99 if event_bus_emit: 

100 try: 

101 from core.platform.events import emit_event 

102 emit_event('learning.federation_update', { 

103 'kind': 'consensus_decision', 

104 'action': action, 

105 'approved': approved, 

106 'subject_summary': { 

107 k: v for k, v in subject.items() 

108 if k in ('agent_id', 'goal_type', 'version') 

109 }, 

110 }) 

111 except Exception as exc: 

112 logger.debug(f'[reasoning_trace] event emit failed: {exc}') 

113 return True 

114 

115 

116def read_recent(limit: int = 200) -> list: 

117 """Read the most recent decisions for dashboard / audit display.""" 

118 path = _file_for() 

119 if not os.path.exists(path): 

120 return [] 

121 rows = [] 

122 try: 

123 with open(path, 'r', encoding='utf-8') as fh: 

124 for line in fh: 

125 line = line.strip() 

126 if not line: 

127 continue 

128 try: 

129 rows.append(json.loads(line)) 

130 except json.JSONDecodeError: 

131 continue 

132 except Exception as exc: 

133 logger.debug(f'[reasoning_trace] read failed: {exc}') 

134 return [] 

135 return rows[-limit:]