Coverage for integrations / agent_engine / exception_watcher.py: 59.1%

110 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Exception Watcher — Idle Agent Monitor 

3======================================== 

4 

5Assigns idle (non-participating) agents to watch for exceptions. 

6When exceptions arrive, watchers classify severity and trigger 

7the SelfHealingDispatcher for critical/high patterns. 

8 

9This is the "non-participating coding agent actively watches" feature. 

10Runs inside AgentDaemon._tick() on the existing periodic schedule. 

11""" 

12import time 

13import logging 

14import threading 

15from typing import Dict, List, Optional 

16from sqlalchemy.orm import Session 

17 

# All watcher log output goes to the fixed 'hevolve_social' channel
# (a hard-coded name, not derived from __name__).
logger = logging.getLogger('hevolve_social')

# Severity classification tables, keyed by exception *type name* (a string),
# not by exception class.

# Type names that are always classified as critical.
CRITICAL_TYPES = frozenset((
    'SystemExit',
    'MemoryError',
    'RecursionError',
    'DatabaseError',
    'OperationalError',
    'IntegrityError',
))

# Type names that are always classified as high severity.
HIGH_TYPES = frozenset((
    'KeyError',
    'AttributeError',
    'TypeError',
    'ValueError',
    'IndexError',
    'ImportError',
    'FileNotFoundError',
    'ConnectionError',
    'TimeoutError',
    'PermissionError',
))

30 

31 

class ExceptionWatcher:
    """Registry of idle agents that act as exception watchers.

    The class only tracks which agents are registered and keeps per-watcher
    counters; deciding *which* agents are idle is left to the caller.
    While at least one watcher is registered, ``process_exceptions`` polls
    the ExceptionCollector, classifies each record and asks the
    SelfHealingDispatcher to act on critical/high ones.

    Use ``get_instance()`` to obtain the shared singleton.
    """

    _instance = None                  # lazily created singleton
    _create_lock = threading.Lock()   # guards creation/reset of _instance

    def __init__(self):
        # user_id -> watcher info dict (identity, assignment time, counters)
        self._watchers: Dict[str, Dict] = {}
        self._lock = threading.RLock()
        # Wall-clock time of the previous poll; 0.0 means "never polled".
        self._last_process = 0.0

    @classmethod
    def get_instance(cls) -> 'ExceptionWatcher':
        """Return the shared instance, creating it on first use."""
        # Double-checked locking: the unlocked test keeps the common path
        # cheap, the locked re-test prevents two threads from both creating.
        if cls._instance is None:
            with cls._create_lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    @classmethod
    def reset_instance(cls):
        """Reset singleton (for testing)."""
        with cls._create_lock:
            cls._instance = None

    def assign_watcher(self, user_id: str, username: str):
        """Register an idle agent as an exception watcher.

        A ``user_id`` that is already registered is left untouched (its
        counters and assignment time are preserved) and nothing is logged.
        """
        with self._lock:
            if user_id in self._watchers:
                return  # already watching
            self._watchers[user_id] = {
                'user_id': user_id,
                'username': username,
                'assigned_at': time.time(),
                'exceptions_reported': 0,
                'critical_reported': 0,
                'high_reported': 0,
            }
        logger.info(f"Exception watcher assigned: {username} ({user_id})")

    def release_watcher(self, user_id: str):
        """Release a watcher (e.g. when agent gets a real task).

        Unknown ids are ignored silently.
        """
        with self._lock:
            removed = self._watchers.pop(user_id, None)
        if removed:
            logger.info(f"Exception watcher released: {removed['username']} "
                        f"(reported {removed['exceptions_reported']} exceptions)")

    def release_all(self):
        """Release all watchers."""
        with self._lock:
            self._watchers.clear()

    def has_watchers(self) -> bool:
        """Return True if at least one watcher is registered."""
        with self._lock:
            return bool(self._watchers)

    def get_watcher_count(self) -> int:
        """Return the number of registered watchers."""
        with self._lock:
            return len(self._watchers)

    def process_exceptions(self, db: 'Session') -> int:
        """Process exceptions through watchers.

        Called from AgentDaemon._tick().

        1. Fetch unresolved exceptions recorded since the previous poll.
        2. Classify each one as critical / high / low.
        3. For critical records, run the SelfHealingDispatcher with its
           occurrence threshold temporarily lowered to 1; for high records,
           run it with its normal threshold.
        4. Add the batch totals to every registered watcher's counters.

        Args:
            db: Database session handed through to the dispatcher.

        Returns:
            Number of critical + high records whose dispatch step completed
            without raising. Returns 0 when there are no watchers, the
            collector module cannot be imported, or nothing new was found.
        """
        if not self.has_watchers():
            return 0

        try:
            from exception_collector import ExceptionCollector
            collector = ExceptionCollector.get_instance()
        except ImportError:
            # Collector is optional; without it there is nothing to watch.
            return 0

        # Get exceptions since last process
        unresolved = collector.get_unresolved(since=self._last_process)
        self._last_process = time.time()

        if not unresolved:
            return 0

        # Classify severity; 'low' records are counted but never dispatched.
        critical = []
        high = []
        for rec in unresolved:
            severity = self._classify_severity(rec)
            if severity == 'critical':
                critical.append(rec)
            elif severity == 'high':
                high.append(rec)

        # Critical first, with the threshold lowered to a single occurrence;
        # high severity afterwards with the dispatcher's own threshold.
        processed = self._dispatch_batch(
            db, critical, info_label='critical', debug_label='critical',
            min_occurrences=1)
        processed += self._dispatch_batch(
            db, high, info_label='high-severity', debug_label='high')

        # Update watcher stats. Every watcher receives the full batch
        # totals, i.e. the counts are not split between watchers.
        with self._lock:
            for watcher in self._watchers.values():
                watcher['exceptions_reported'] += len(unresolved)
                watcher['critical_reported'] += len(critical)
                watcher['high_reported'] += len(high)

        return processed

    def _dispatch_batch(self, db, records, *, info_label: str,
                        debug_label: str, min_occurrences=None) -> int:
        """Run the dispatcher for one severity bucket (best effort).

        Returns ``len(records)`` when the dispatch step completed, or 0 when
        the bucket is empty or anything raised (the failure is logged at
        debug level and swallowed so the daemon tick keeps running).
        """
        if not records:
            return 0
        try:
            from .self_healing_dispatcher import SelfHealingDispatcher
            dispatcher = SelfHealingDispatcher.get_instance()
            fix_count = self._run_dispatcher(dispatcher, db, min_occurrences)
            if fix_count > 0:
                logger.info(f"Watcher triggered {fix_count} {info_label} fix goal(s)")
            return len(records)
        except Exception as e:
            logger.debug(f"Watcher {debug_label} dispatch failed: {e}")
            return 0

    @staticmethod
    def _run_dispatcher(dispatcher, db, min_occurrences=None) -> int:
        """Force an immediate dispatcher check and return its fix count.

        When ``min_occurrences`` is given, the dispatcher's threshold is
        overridden only for the duration of the call and is restored even
        if ``check_and_dispatch`` raises, so a failed run cannot leave the
        shared dispatcher stuck on the lowered threshold.

        NOTE(review): this pokes the dispatcher's private ``_last_check`` and
        ``_min_occurrences`` attributes, and the temporary override is
        visible to any other thread using the same dispatcher meanwhile.
        """
        override = min_occurrences is not None
        if override:
            original_min = dispatcher._min_occurrences
            dispatcher._min_occurrences = min_occurrences
        dispatcher._last_check = 0  # force check
        try:
            return dispatcher.check_and_dispatch(db)
        finally:
            if override:
                dispatcher._min_occurrences = original_min

    def _classify_severity(self, record) -> str:
        """Classify exception severity.

        The type name is checked against CRITICAL_TYPES / HIGH_TYPES first;
        otherwise the message is scanned for severity keywords.

        Args:
            record: Object exposing ``exc_type`` (type name string) and
                ``exc_message``; a missing/None message counts as empty.

        Returns: 'critical', 'high', or 'low'
        """
        exc_type = record.exc_type

        if exc_type in CRITICAL_TYPES:
            return 'critical'
        if exc_type in HIGH_TYPES:
            return 'high'

        # Check message for severity hints. A None message must not crash
        # classification, since that would abort the whole batch.
        msg_lower = str(record.exc_message or '').lower()
        if any(word in msg_lower for word in ('corrupt', 'fatal', 'crash', 'data loss')):
            return 'critical'
        if any(word in msg_lower for word in ('failed', 'missing', 'not found', 'denied')):
            return 'high'

        return 'low'

    def get_watcher_stats(self) -> Dict:
        """Get stats about active watchers and their reports.

        The totals are sums over the per-watcher counters; because every
        watcher is credited with each full batch, they grow with the number
        of registered watchers.
        """
        with self._lock:
            watchers = list(self._watchers.values())
            total_reported = sum(w['exceptions_reported'] for w in watchers)
            total_critical = sum(w['critical_reported'] for w in watchers)

            return {
                'active_watchers': len(watchers),
                'total_exceptions_reported': total_reported,
                'total_critical_reported': total_critical,
                'watchers': [
                    {
                        'user_id': w['user_id'],
                        'username': w['username'],
                        'watching_since': w['assigned_at'],
                        'exceptions_reported': w['exceptions_reported'],
                    }
                    for w in watchers
                ],
            }