Coverage for integrations / agent_engine / exception_watcher.py: 59.1%
110 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
Exception Watcher — Idle Agent Monitor
========================================

Assigns idle (non-participating) agents to watch for exceptions.
When exceptions arrive, watchers classify severity and trigger
the SelfHealingDispatcher for critical/high patterns.

This is the "non-participating coding agent actively watches" feature.
Runs inside AgentDaemon._tick() on the existing periodic schedule.
11"""
12import time
13import logging
14import threading
15from typing import Dict, List, Optional
16from sqlalchemy.orm import Session
# Module logger; the name is a fixed channel string, not the module path.
logger = logging.getLogger('hevolve_social')

# Severity classification tables.
# Membership is tested against a record's exception type *name* (a string),
# so these hold names rather than exception classes.
_CRITICAL_TYPE_NAMES = (
    'DatabaseError',
    'IntegrityError',
    'MemoryError',
    'OperationalError',
    'RecursionError',
    'SystemExit',
)
_HIGH_TYPE_NAMES = (
    'AttributeError',
    'ConnectionError',
    'FileNotFoundError',
    'ImportError',
    'IndexError',
    'KeyError',
    'PermissionError',
    'TimeoutError',
    'TypeError',
    'ValueError',
)

# Type names classified as 'critical'.
CRITICAL_TYPES = frozenset(_CRITICAL_TYPE_NAMES)
# Type names classified as 'high'.
HIGH_TYPES = frozenset(_HIGH_TYPE_NAMES)
class ExceptionWatcher:
    """Assigns idle agents as exception watchers.

    When more idle agents exist than active goals, excess agents
    are assigned as watchers. They monitor the ExceptionCollector
    and trigger SelfHealingDispatcher for severe patterns.

    Intended to be used through ``get_instance()``, which hands out one
    shared instance. Watcher bookkeeping (``_watchers``) is guarded by a
    re-entrant lock.
    """

    _instance = None
    _create_lock = threading.Lock()

    def __init__(self):
        self._watchers: Dict[str, Dict] = {}  # user_id → watcher info
        self._lock = threading.RLock()
        # Cursor passed to the collector as ``since`` on the next run.
        self._last_process = 0.0

    @classmethod
    def get_instance(cls) -> 'ExceptionWatcher':
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._create_lock:
                # Re-check under the lock so only one instance is created.
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    @classmethod
    def reset_instance(cls):
        """Reset singleton (for testing)."""
        with cls._create_lock:
            cls._instance = None

    def assign_watcher(self, user_id: str, username: str):
        """Assign an idle agent as an exception watcher.

        A second call for a ``user_id`` that is already watching is a
        no-op, so existing counters are never reset.
        """
        with self._lock:
            if user_id in self._watchers:
                return  # already watching
            self._watchers[user_id] = {
                'user_id': user_id,
                'username': username,
                'assigned_at': time.time(),
                'exceptions_reported': 0,
                'critical_reported': 0,
                'high_reported': 0,
            }
            logger.info("Exception watcher assigned: %s (%s)", username, user_id)

    def release_watcher(self, user_id: str):
        """Release a watcher (e.g. when agent gets a real task).

        Unknown ``user_id`` values are ignored.
        """
        with self._lock:
            removed = self._watchers.pop(user_id, None)
            if removed:
                logger.info(
                    "Exception watcher released: %s (reported %s exceptions)",
                    removed['username'], removed['exceptions_reported'],
                )

    def release_all(self):
        """Release all watchers."""
        with self._lock:
            self._watchers.clear()

    def has_watchers(self) -> bool:
        """Check if any watchers are assigned."""
        with self._lock:
            return len(self._watchers) > 0

    def get_watcher_count(self) -> int:
        """Get number of active watchers."""
        with self._lock:
            return len(self._watchers)

    @staticmethod
    def _force_dispatch(dispatcher, db, min_occurrences=None) -> int:
        """Run ``dispatcher.check_and_dispatch(db)`` immediately.

        Resets ``dispatcher._last_check`` to 0 to force a check. When
        ``min_occurrences`` is given, ``dispatcher._min_occurrences`` is
        overridden for the duration of the call and restored afterwards,
        including when the dispatch raises.

        Returns whatever ``check_and_dispatch`` returns.
        """
        override = min_occurrences is not None
        if override:
            original_min = dispatcher._min_occurrences
            dispatcher._min_occurrences = min_occurrences
        try:
            dispatcher._last_check = 0  # force check
            return dispatcher.check_and_dispatch(db)
        finally:
            # The dispatcher is shared; never leave the lowered threshold
            # behind if the dispatch fails.
            if override:
                dispatcher._min_occurrences = original_min

    def process_exceptions(self, db: 'Session') -> int:
        """Process exceptions through watchers.

        Called from AgentDaemon._tick().

        1. Get recent unresolved exceptions from ExceptionCollector
        2. Classify severity (critical/high/low)
        3. For critical/high: trigger SelfHealingDispatcher immediately
           (bypasses the normal min_occurrences threshold for critical)
        4. Update watcher stats

        Args:
            db: Session handed through to ``check_and_dispatch``.

        Returns:
            Number of exceptions processed: the count of critical plus
            high records whose dispatch call completed without raising.
            0 when there are no watchers, the collector cannot be
            imported, or nothing new is unresolved.
        """
        if not self.has_watchers():
            return 0

        try:
            from exception_collector import ExceptionCollector
            collector = ExceptionCollector.get_instance()
        except ImportError:
            return 0

        # Get exceptions since last process. The new cursor is taken
        # *before* the query so that records arriving while the query runs
        # fall inside the next window instead of being skipped.
        window_start = self._last_process
        query_started = time.time()
        unresolved = collector.get_unresolved(since=window_start)
        self._last_process = query_started

        if not unresolved:
            return 0

        # Classify severity; 'low' records are counted but not dispatched.
        critical = []
        high = []
        for rec in unresolved:
            severity = self._classify_severity(rec)
            if severity == 'critical':
                critical.append(rec)
            elif severity == 'high':
                high.append(rec)

        processed = 0

        # For critical exceptions, trigger fix with lower threshold (1 occurrence)
        if critical:
            try:
                from .self_healing_dispatcher import SelfHealingDispatcher
                fix_count = self._force_dispatch(
                    SelfHealingDispatcher.get_instance(), db, min_occurrences=1,
                )
                if fix_count > 0:
                    logger.info("Watcher triggered %s critical fix goal(s)", fix_count)
                processed += len(critical)
            except Exception as e:
                # Best effort: a failed dispatch must not break the caller.
                logger.debug("Watcher critical dispatch failed: %s", e)

        # For high severity, use normal threshold
        if high:
            try:
                from .self_healing_dispatcher import SelfHealingDispatcher
                fix_count = self._force_dispatch(
                    SelfHealingDispatcher.get_instance(), db,
                )
                if fix_count > 0:
                    logger.info("Watcher triggered %s high-severity fix goal(s)", fix_count)
                processed += len(high)
            except Exception as e:
                logger.debug("Watcher high dispatch failed: %s", e)

        # Update watcher stats: every active watcher is credited with the
        # full batch.
        with self._lock:
            for watcher in self._watchers.values():
                watcher['exceptions_reported'] += len(unresolved)
                watcher['critical_reported'] += len(critical)
                watcher['high_reported'] += len(high)

        return processed

    def _classify_severity(self, record) -> str:
        """Classify exception severity.

        The record's ``exc_type`` is checked against CRITICAL_TYPES and
        HIGH_TYPES first; otherwise the lower-cased ``exc_message`` is
        scanned for keyword hints.

        Returns: 'critical', 'high', or 'low'
        """
        exc_type = record.exc_type

        if exc_type in CRITICAL_TYPES:
            return 'critical'
        if exc_type in HIGH_TYPES:
            return 'high'

        # Check message for severity hints. A missing (None) or non-string
        # message is treated as empty text rather than raising.
        msg_lower = str(record.exc_message or '').lower()
        if any(word in msg_lower for word in ('corrupt', 'fatal', 'crash', 'data loss')):
            return 'critical'
        if any(word in msg_lower for word in ('failed', 'missing', 'not found', 'denied')):
            return 'high'

        return 'low'

    def get_watcher_stats(self) -> Dict:
        """Get stats about active watchers and their reports.

        Returns a dict with ``active_watchers``, the summed
        ``total_exceptions_reported`` and ``total_critical_reported``
        counters, and a ``watchers`` list with one summary dict per
        watcher.
        """
        with self._lock:
            watchers = list(self._watchers.values())
            total_reported = sum(w['exceptions_reported'] for w in watchers)
            total_critical = sum(w['critical_reported'] for w in watchers)

            return {
                'active_watchers': len(watchers),
                'total_exceptions_reported': total_reported,
                'total_critical_reported': total_critical,
                'watchers': [
                    {
                        'user_id': w['user_id'],
                        'username': w['username'],
                        'watching_since': w['assigned_at'],
                        'exceptions_reported': w['exceptions_reported'],
                    }
                    for w in watchers
                ],
            }