Coverage for integrations / channels / admin / dashboard.py: 47.2%
267 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Admin Dashboard - Server-side Dashboard Data Provider
4Provides aggregated statistics and real-time data for admin dashboards.
5Designed for Docker environments with persistent stats storage.
7Features:
8- Dashboard statistics aggregation
9- Active session monitoring
10- Channel status tracking
11- Queue statistics
12- Error logging with persistence
13- Docker volume support for data persistence
14"""
16from __future__ import annotations
18import json
19import logging
20import os
21import time
22from dataclasses import dataclass, field, asdict
23from datetime import datetime, timedelta
24from typing import Any, Dict, List, Optional
25from collections import deque
26from enum import Enum
28logger = logging.getLogger(__name__)
class ChannelStatus(str, Enum):
    """Channel connection status.

    Inherits from ``str`` so members compare equal to their string values
    and serialize directly to JSON.
    """
    CONNECTED = "connected"        # channel is live and exchanging messages
    DISCONNECTED = "disconnected"  # channel is not connected
    CONNECTING = "connecting"      # connection attempt in progress
    ERROR = "error"                # channel failed with an error
    PAUSED = "paused"              # channel intentionally paused
    RATE_LIMITED = "rate_limited"  # channel throttled by the remote service
class ErrorSeverity(str, Enum):
    """Error severity levels, ordered from least to most severe.

    Inherits from ``str`` so values serialize directly to JSON.
    """
    DEBUG = "debug"
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"
@dataclass
class DashboardStats:
    """Aggregated dashboard statistics.

    A flat snapshot of system metrics at one moment in time, produced by
    ``AdminDashboard.get_stats()`` and serialized via :meth:`to_dict`.
    """
    timestamp: str = ""           # ISO-8601 time the snapshot was taken
    uptime_seconds: float = 0.0   # seconds since the dashboard instance started
    # Message stats
    total_messages_processed: int = 0  # lifetime message counter (persisted)
    messages_today: int = 0
    messages_this_hour: int = 0
    messages_per_minute: float = 0.0   # averaged over the elapsed part of this hour
    # Session stats
    active_sessions: int = 0
    total_sessions_today: int = 0
    # Channel stats
    active_channels: int = 0      # channels whose status is "connected"
    total_channels: int = 0
    # Queue stats
    queue_depth: int = 0
    queue_processing_rate: float = 0.0
    # Performance stats
    avg_response_time_ms: float = 0.0
    p99_response_time_ms: float = 0.0  # approximate 99th percentile
    # Error stats
    error_count_today: int = 0
    error_rate_percent: float = 0.0    # errors today as a % of messages today
    # Resource stats
    memory_usage_mb: float = 0.0       # process RSS; 0.0 when psutil is unavailable
    cpu_usage_percent: float = 0.0     # currently always 0.0 (not yet collected)

    def to_dict(self) -> Dict[str, Any]:
        """Return the stats as a plain JSON-serializable dict."""
        return asdict(self)
@dataclass
class SessionInfo:
    """Information about an active session."""
    session_id: str     # unique identifier, used as the registry key
    channel: str        # channel type the session belongs to
    user_id: str        # id of the user driving the session
    chat_id: str        # id of the chat/conversation
    started_at: str     # ISO-8601 timestamp when the session was registered
    last_activity: str  # ISO-8601 timestamp of the most recent activity
    message_count: int = 0  # messages observed during this session
    state: str = "active"   # free-form session state label
    metadata: Dict[str, Any] = field(default_factory=dict)  # arbitrary extra data

    def to_dict(self) -> Dict[str, Any]:
        """Return the session info as a plain JSON-serializable dict."""
        return asdict(self)
@dataclass
class ChannelStatusInfo:
    """Status information for a channel."""
    channel_type: str  # channel identifier, used as the registry key
    name: str          # human-readable channel name
    status: str        # one of the ChannelStatus values
    connected_at: Optional[str] = None   # ISO-8601 time of last successful connect
    last_activity: Optional[str] = None  # ISO-8601 time of last message on this channel
    message_count: int = 0  # messages processed on this channel
    error_count: int = 0    # errors attributed to this channel
    # NOTE: despite the name, update_channel_status() overwrites this with
    # the latest latency sample rather than maintaining a running average.
    avg_latency_ms: float = 0.0
    rate_limit_remaining: Optional[int] = None  # remaining quota, when known
    metadata: Dict[str, Any] = field(default_factory=dict)  # arbitrary extra data

    def to_dict(self) -> Dict[str, Any]:
        """Return the channel status as a plain JSON-serializable dict."""
        return asdict(self)
@dataclass
class QueueStats:
    """Queue statistics.

    Currently returned with all-zero defaults by
    ``AdminDashboard.get_queue_stats()``; intended to be populated by an
    integration with the actual queue system.
    """
    total_queues: int = 0
    total_messages: int = 0
    pending_messages: int = 0
    processing_messages: int = 0
    completed_today: int = 0
    failed_today: int = 0
    avg_wait_time_ms: float = 0.0
    avg_processing_time_ms: float = 0.0
    throughput_per_second: float = 0.0
    by_channel: Dict[str, int] = field(default_factory=dict)   # message counts per channel
    by_priority: Dict[str, int] = field(default_factory=dict)  # message counts per priority

    def to_dict(self) -> Dict[str, Any]:
        """Return the queue stats as a plain JSON-serializable dict."""
        return asdict(self)
@dataclass
class ErrorEntry:
    """Error log entry."""
    timestamp: str          # ISO-8601 time the error was recorded
    severity: str           # one of the ErrorSeverity values
    channel: Optional[str]  # channel the error occurred on, if any
    error_type: str         # short machine-readable error classification
    message: str            # human-readable description
    stack_trace: Optional[str] = None  # formatted traceback, if captured
    context: Dict[str, Any] = field(default_factory=dict)  # extra structured data

    def to_dict(self) -> Dict[str, Any]:
        """Return the entry as a plain JSON-serializable dict."""
        return asdict(self)
@dataclass
class DashboardConfig:
    """Configuration for the dashboard."""
    # Persistence path (should be volume-mounted in Docker)
    persistence_path: Optional[str] = None
    # Error log settings
    max_error_log_size: int = 10000
    error_log_retention_days: int = 7
    # Stats settings
    stats_update_interval_seconds: int = 60
    stats_history_hours: int = 24

    def get_persistence_path(self) -> str:
        """Resolve the directory used for persisted dashboard data.

        Precedence: an explicitly configured ``persistence_path`` wins;
        otherwise the ``DASHBOARD_DATA_PATH`` environment variable overrides
        a computed default (bundled/frozen app data dir, ``/app/data`` in
        Docker, or a local ``./agent_data`` directory).
        """
        # Explicit configuration always takes priority.
        if self.persistence_path:
            return self.persistence_path

        import sys as _sys

        bundled = bool(os.environ.get('NUNBA_BUNDLED')) or bool(getattr(_sys, 'frozen', False))
        if bundled:
            # Packaged/frozen build: prefer the platform-specific data dir.
            try:
                from core.platform_paths import get_agent_data_dir
                fallback = os.path.join(get_agent_data_dir(), 'dashboard')
            except ImportError:
                fallback = os.path.join(
                    os.path.expanduser('~'),
                    'Documents', 'Nunba', 'data', 'agent_data', 'dashboard',
                )
        elif os.path.exists("/app"):
            # Docker convention: /app exists inside the container image.
            fallback = "/app/data/dashboard"
        else:
            # Plain local run.
            fallback = "./agent_data/dashboard"

        # Environment variable overrides the computed default.
        return os.environ.get("DASHBOARD_DATA_PATH", fallback)
class AdminDashboard:
    """
    Server-side dashboard data provider.

    Aggregates statistics from sessions, channels and the error log, and
    persists counters to disk so they survive restarts. Designed for Docker
    environments with a volume-mounted data directory.

    Usage:
        dashboard = AdminDashboard()

        # Get overall stats
        stats = dashboard.get_stats()

        # Get active sessions
        sessions = dashboard.get_active_sessions()

        # Get channel status
        channels = dashboard.get_channel_status()

        # Get recent errors
        errors = dashboard.get_error_log(limit=50)
    """

    def __init__(self, config: Optional[DashboardConfig] = None):
        """Create the dashboard, ensure the data dir exists and load state.

        Args:
            config: Optional configuration; defaults to ``DashboardConfig()``.
        """
        self.config = config or DashboardConfig()
        self._start_time = time.time()

        # In-memory data stores
        self._sessions: Dict[str, SessionInfo] = {}
        self._channels: Dict[str, ChannelStatusInfo] = {}
        self._error_log: deque = deque(maxlen=self.config.max_error_log_size)
        self._message_counts: Dict[str, int] = {}  # keyed by "YYYY-MM-DD-HH"
        self._response_times: List[float] = []
        self._stats_history: List[Dict[str, Any]] = []

        # Counters
        self._total_messages = 0
        self._total_errors = 0
        self._messages_today = 0
        self._errors_today = 0
        # Fix: sessions registered today were previously reported as the
        # number of *currently active* sessions; track them explicitly.
        self._sessions_today = 0
        self._last_day_reset = datetime.now().date()

        # Ensure persistence directory exists
        self._ensure_persistence_dir()

        # Load persisted state
        self._load_state()

    def _ensure_persistence_dir(self) -> None:
        """Create the persistence directory if missing (best-effort)."""
        path = self.config.get_persistence_path()
        try:
            os.makedirs(path, exist_ok=True)
        except Exception as e:
            # Non-fatal: the dashboard still works in-memory only.
            logger.warning(f"Could not create persistence directory {path}: {e}")

    def _get_state_file(self) -> str:
        """Return the path of the JSON file holding persisted counters."""
        return os.path.join(self.config.get_persistence_path(), "dashboard_state.json")

    def _get_errors_file(self) -> str:
        """Return the path of the JSON file holding the persisted error log."""
        return os.path.join(self.config.get_persistence_path(), "error_log.json")

    def _load_state(self) -> None:
        """Load persisted counters and the error log from disk (best-effort)."""
        state_file = self._get_state_file()
        try:
            if os.path.exists(state_file):
                with open(state_file, "r") as f:
                    data = json.load(f)
                self._total_messages = data.get("total_messages", 0)
                self._total_errors = data.get("total_errors", 0)
                self._message_counts = data.get("message_counts", {})
                # Daily counters are only valid if they were saved today;
                # otherwise they belong to a previous day and stay at 0.
                if data.get("counter_date") == datetime.now().date().isoformat():
                    self._messages_today = data.get("messages_today", 0)
                    self._errors_today = data.get("errors_today", 0)
                    self._sessions_today = data.get("sessions_today", 0)
                logger.info(f"Loaded dashboard state from {state_file}")
        except Exception as e:
            logger.warning(f"Could not load dashboard state: {e}")

        # Load error log
        errors_file = self._get_errors_file()
        try:
            if os.path.exists(errors_file):
                with open(errors_file, "r") as f:
                    errors = json.load(f)
                # Keep only the newest entries that fit the bounded deque.
                for err in errors[-self.config.max_error_log_size:]:
                    self._error_log.append(ErrorEntry(**err))
                logger.info(f"Loaded {len(self._error_log)} errors from {errors_file}")
        except Exception as e:
            logger.warning(f"Could not load error log: {e}")

    def _save_state(self) -> None:
        """Persist counters to disk (best-effort)."""
        state_file = self._get_state_file()
        try:
            data = {
                "total_messages": self._total_messages,
                "total_errors": self._total_errors,
                "message_counts": self._message_counts,
                # Daily counters plus the day they belong to, so a restart
                # on the same day does not lose them (see _load_state).
                "messages_today": self._messages_today,
                "errors_today": self._errors_today,
                "sessions_today": self._sessions_today,
                "counter_date": self._last_day_reset.isoformat(),
                "saved_at": datetime.now().isoformat(),
            }
            with open(state_file, "w") as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            logger.warning(f"Could not save dashboard state: {e}")

    def _save_errors(self) -> None:
        """Persist the error log to disk (best-effort)."""
        errors_file = self._get_errors_file()
        try:
            errors = [err.to_dict() for err in self._error_log]
            with open(errors_file, "w") as f:
                json.dump(errors, f, indent=2)
        except Exception as e:
            logger.warning(f"Could not save error log: {e}")

    def _check_day_reset(self) -> None:
        """Reset daily counters when the calendar day has changed."""
        today = datetime.now().date()
        if today > self._last_day_reset:
            self._messages_today = 0
            self._errors_today = 0
            self._sessions_today = 0
            self._last_day_reset = today

    def get_stats(self) -> DashboardStats:
        """
        Get aggregated dashboard statistics.

        Returns:
            DashboardStats with current system metrics
        """
        self._check_day_reset()
        now = datetime.now()

        # Messages per minute, averaged over the elapsed part of this hour.
        hour_key = now.strftime("%Y-%m-%d-%H")
        messages_this_hour = self._message_counts.get(hour_key, 0)
        minutes_in_hour = now.minute + 1
        messages_per_minute = messages_this_hour / max(1, minutes_in_hour)

        # Error rate as a percentage of today's messages.
        error_rate = 0.0
        if self._messages_today > 0:
            error_rate = (self._errors_today / self._messages_today) * 100

        # Response time average and approximate 99th percentile.
        avg_response = 0.0
        p99_response = 0.0
        if self._response_times:
            avg_response = sum(self._response_times) / len(self._response_times)
            sorted_times = sorted(self._response_times)
            p99_idx = int(len(sorted_times) * 0.99)
            p99_response = sorted_times[min(p99_idx, len(sorted_times) - 1)]

        # Resident memory of this process; psutil is optional and may also
        # fail in restricted environments, so swallow any failure here.
        memory_mb = 0.0
        try:
            import psutil
            process = psutil.Process()
            memory_mb = process.memory_info().rss / (1024 * 1024)
        except Exception:
            pass

        return DashboardStats(
            timestamp=now.isoformat(),
            uptime_seconds=time.time() - self._start_time,
            total_messages_processed=self._total_messages,
            messages_today=self._messages_today,
            messages_this_hour=messages_this_hour,
            messages_per_minute=messages_per_minute,
            active_sessions=len(self._sessions),
            # Fix: report sessions registered today, not the active count.
            total_sessions_today=self._sessions_today,
            active_channels=len([c for c in self._channels.values() if c.status == "connected"]),
            total_channels=len(self._channels),
            queue_depth=0,  # To be populated by queue integration
            queue_processing_rate=0.0,
            avg_response_time_ms=avg_response,
            p99_response_time_ms=p99_response,
            error_count_today=self._errors_today,
            error_rate_percent=error_rate,
            memory_usage_mb=memory_mb,
            cpu_usage_percent=0.0,
        )

    def get_active_sessions(self) -> List[SessionInfo]:
        """
        Get list of active sessions.

        Returns:
            List of SessionInfo for all active sessions
        """
        return list(self._sessions.values())

    def get_channel_status(self) -> Dict[str, ChannelStatusInfo]:
        """
        Get status of all channels.

        Returns:
            Dictionary mapping channel type to ChannelStatusInfo
        """
        # Shallow copy so callers cannot mutate the registry itself.
        return self._channels.copy()

    def get_queue_stats(self) -> QueueStats:
        """
        Get queue statistics.

        Returns:
            QueueStats with current queue metrics (all zeros until a queue
            system integration populates them)
        """
        return QueueStats(
            total_queues=0,
            total_messages=0,
            pending_messages=0,
            processing_messages=0,
            completed_today=0,
            failed_today=0,
            avg_wait_time_ms=0.0,
            avg_processing_time_ms=0.0,
            throughput_per_second=0.0,
        )

    def get_error_log(self, limit: int = 100) -> List[ErrorEntry]:
        """
        Get recent error log entries.

        Args:
            limit: Maximum number of entries to return

        Returns:
            List of ErrorEntry objects, most recent first
        """
        errors = list(self._error_log)
        errors.reverse()  # Most recent first
        return errors[:limit]

    # Data recording methods (called by other components)

    def record_message(self, channel: str) -> None:
        """Record a message being processed on the given channel."""
        self._check_day_reset()
        self._total_messages += 1
        self._messages_today += 1

        # Track by hour
        hour_key = datetime.now().strftime("%Y-%m-%d-%H")
        self._message_counts[hour_key] = self._message_counts.get(hour_key, 0) + 1

        # Clean old hour keys (keep last 48 hours); string comparison works
        # because the key format is lexicographically ordered by time.
        cutoff = datetime.now() - timedelta(hours=48)
        cutoff_key = cutoff.strftime("%Y-%m-%d-%H")
        self._message_counts = {
            k: v for k, v in self._message_counts.items() if k >= cutoff_key
        }

        # Update channel message count
        if channel in self._channels:
            self._channels[channel].message_count += 1
            self._channels[channel].last_activity = datetime.now().isoformat()

        # NOTE: writes the state file on every message; acceptable for
        # dashboard-scale traffic, revisit if message volume grows.
        self._save_state()

    def record_response_time(self, latency_ms: float) -> None:
        """Record a response time measurement (bounded to last 10000 samples)."""
        self._response_times.append(latency_ms)
        if len(self._response_times) > 10000:
            self._response_times = self._response_times[-10000:]

    def record_error(
        self,
        error_type: str,
        message: str,
        channel: Optional[str] = None,
        severity: ErrorSeverity = ErrorSeverity.ERROR,
        stack_trace: Optional[str] = None,
        context: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Record an error in the bounded log and persist it.

        Args:
            error_type: Short machine-readable classification.
            message: Human-readable description.
            channel: Optional channel the error is attributed to.
            severity: ErrorSeverity level (default ERROR).
            stack_trace: Optional formatted traceback.
            context: Optional extra structured data.
        """
        self._check_day_reset()
        self._total_errors += 1
        self._errors_today += 1

        if channel and channel in self._channels:
            self._channels[channel].error_count += 1

        entry = ErrorEntry(
            timestamp=datetime.now().isoformat(),
            severity=severity.value,
            channel=channel,
            error_type=error_type,
            message=message,
            stack_trace=stack_trace,
            context=context or {},
        )
        self._error_log.append(entry)
        self._save_errors()
        self._save_state()

    def register_session(
        self,
        session_id: str,
        channel: str,
        user_id: str,
        chat_id: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> SessionInfo:
        """Register a new active session and count it toward today's total.

        Returns:
            The newly created SessionInfo (also stored in the registry).
        """
        self._check_day_reset()
        self._sessions_today += 1
        now = datetime.now().isoformat()
        session = SessionInfo(
            session_id=session_id,
            channel=channel,
            user_id=user_id,
            chat_id=chat_id,
            started_at=now,
            last_activity=now,
            metadata=metadata or {},
        )
        self._sessions[session_id] = session
        return session

    def update_session(self, session_id: str, message_count_delta: int = 1) -> None:
        """Bump a session's activity timestamp and message count (no-op if unknown)."""
        if session_id in self._sessions:
            self._sessions[session_id].last_activity = datetime.now().isoformat()
            self._sessions[session_id].message_count += message_count_delta

    def unregister_session(self, session_id: str) -> None:
        """Remove a session from the active registry (no-op if unknown)."""
        if session_id in self._sessions:
            del self._sessions[session_id]

    def register_channel(
        self,
        channel_type: str,
        name: str,
        status: ChannelStatus = ChannelStatus.DISCONNECTED,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> ChannelStatusInfo:
        """Register (or replace) a channel entry in the status registry.

        Returns:
            The newly created ChannelStatusInfo.
        """
        channel_info = ChannelStatusInfo(
            channel_type=channel_type,
            name=name,
            status=status.value,
            metadata=metadata or {},
        )
        self._channels[channel_type] = channel_info
        return channel_info

    def update_channel_status(
        self,
        channel_type: str,
        status: ChannelStatus,
        latency_ms: Optional[float] = None,
    ) -> None:
        """Update a channel's status and optionally its latest latency sample.

        No-op when the channel was never registered.
        """
        if channel_type in self._channels:
            self._channels[channel_type].status = status.value
            if status == ChannelStatus.CONNECTED:
                self._channels[channel_type].connected_at = datetime.now().isoformat()
            if latency_ms is not None:
                # Overwrites with the latest sample (not a running average).
                self._channels[channel_type].avg_latency_ms = latency_ms
# Module-level singleton, created lazily by get_dashboard().
_dashboard: Optional[AdminDashboard] = None


def get_dashboard(config: Optional[DashboardConfig] = None) -> AdminDashboard:
    """Return the process-wide dashboard, creating it on first call.

    The ``config`` argument is only honored by the call that creates the
    instance; subsequent calls return the existing singleton unchanged.
    """
    global _dashboard
    if _dashboard is not None:
        return _dashboard
    _dashboard = AdminDashboard(config)
    return _dashboard