Coverage for integrations / channels / admin / dashboard.py: 47.2%

267 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Admin Dashboard - Server-side Dashboard Data Provider 

3 

4Provides aggregated statistics and real-time data for admin dashboards. 

5Designed for Docker environments with persistent stats storage. 

6 

7Features: 

8- Dashboard statistics aggregation 

9- Active session monitoring 

10- Channel status tracking 

11- Queue statistics 

12- Error logging with persistence 

13- Docker volume support for data persistence 

14""" 

15 

16from __future__ import annotations 

17 

18import json 

19import logging 

20import os 

21import time 

22from dataclasses import dataclass, field, asdict 

23from datetime import datetime, timedelta 

24from typing import Any, Dict, List, Optional 

25from collections import deque 

26from enum import Enum 

27 

28logger = logging.getLogger(__name__) 

29 

30 

31class ChannelStatus(str, Enum): 

32 """Channel connection status.""" 

33 CONNECTED = "connected" 

34 DISCONNECTED = "disconnected" 

35 CONNECTING = "connecting" 

36 ERROR = "error" 

37 PAUSED = "paused" 

38 RATE_LIMITED = "rate_limited" 

39 

40 

41class ErrorSeverity(str, Enum): 

42 """Error severity levels.""" 

43 DEBUG = "debug" 

44 INFO = "info" 

45 WARNING = "warning" 

46 ERROR = "error" 

47 CRITICAL = "critical" 

48 

49 

50@dataclass 

51class DashboardStats: 

52 """Aggregated dashboard statistics.""" 

53 timestamp: str = "" 

54 uptime_seconds: float = 0.0 

55 # Message stats 

56 total_messages_processed: int = 0 

57 messages_today: int = 0 

58 messages_this_hour: int = 0 

59 messages_per_minute: float = 0.0 

60 # Session stats 

61 active_sessions: int = 0 

62 total_sessions_today: int = 0 

63 # Channel stats 

64 active_channels: int = 0 

65 total_channels: int = 0 

66 # Queue stats 

67 queue_depth: int = 0 

68 queue_processing_rate: float = 0.0 

69 # Performance stats 

70 avg_response_time_ms: float = 0.0 

71 p99_response_time_ms: float = 0.0 

72 # Error stats 

73 error_count_today: int = 0 

74 error_rate_percent: float = 0.0 

75 # Resource stats 

76 memory_usage_mb: float = 0.0 

77 cpu_usage_percent: float = 0.0 

78 

79 def to_dict(self) -> Dict[str, Any]: 

80 return asdict(self) 

81 

82 

83@dataclass 

84class SessionInfo: 

85 """Information about an active session.""" 

86 session_id: str 

87 channel: str 

88 user_id: str 

89 chat_id: str 

90 started_at: str 

91 last_activity: str 

92 message_count: int = 0 

93 state: str = "active" 

94 metadata: Dict[str, Any] = field(default_factory=dict) 

95 

96 def to_dict(self) -> Dict[str, Any]: 

97 return asdict(self) 

98 

99 

100@dataclass 

101class ChannelStatusInfo: 

102 """Status information for a channel.""" 

103 channel_type: str 

104 name: str 

105 status: str 

106 connected_at: Optional[str] = None 

107 last_activity: Optional[str] = None 

108 message_count: int = 0 

109 error_count: int = 0 

110 avg_latency_ms: float = 0.0 

111 rate_limit_remaining: Optional[int] = None 

112 metadata: Dict[str, Any] = field(default_factory=dict) 

113 

114 def to_dict(self) -> Dict[str, Any]: 

115 return asdict(self) 

116 

117 

118@dataclass 

119class QueueStats: 

120 """Queue statistics.""" 

121 total_queues: int = 0 

122 total_messages: int = 0 

123 pending_messages: int = 0 

124 processing_messages: int = 0 

125 completed_today: int = 0 

126 failed_today: int = 0 

127 avg_wait_time_ms: float = 0.0 

128 avg_processing_time_ms: float = 0.0 

129 throughput_per_second: float = 0.0 

130 by_channel: Dict[str, int] = field(default_factory=dict) 

131 by_priority: Dict[str, int] = field(default_factory=dict) 

132 

133 def to_dict(self) -> Dict[str, Any]: 

134 return asdict(self) 

135 

136 

137@dataclass 

138class ErrorEntry: 

139 """Error log entry.""" 

140 timestamp: str 

141 severity: str 

142 channel: Optional[str] 

143 error_type: str 

144 message: str 

145 stack_trace: Optional[str] = None 

146 context: Dict[str, Any] = field(default_factory=dict) 

147 

148 def to_dict(self) -> Dict[str, Any]: 

149 return asdict(self) 

150 

151 

152@dataclass 

153class DashboardConfig: 

154 """Configuration for the dashboard.""" 

155 # Persistence path (should be volume-mounted in Docker) 

156 persistence_path: Optional[str] = None 

157 # Error log settings 

158 max_error_log_size: int = 10000 

159 error_log_retention_days: int = 7 

160 # Stats settings 

161 stats_update_interval_seconds: int = 60 

162 stats_history_hours: int = 24 

163 

164 def get_persistence_path(self) -> str: 

165 """Get persistence path, defaulting to Docker-friendly location.""" 

166 if self.persistence_path: 

167 return self.persistence_path 

168 import sys as _sys 

169 if os.environ.get('NUNBA_BUNDLED') or getattr(_sys, 'frozen', False): 

170 try: 

171 from core.platform_paths import get_agent_data_dir 

172 _default = os.path.join(get_agent_data_dir(), 'dashboard') 

173 except ImportError: 

174 _default = os.path.join(os.path.expanduser('~'), 'Documents', 'Nunba', 'data', 'agent_data', 'dashboard') 

175 elif os.path.exists("/app"): 

176 _default = "/app/data/dashboard" 

177 else: 

178 _default = "./agent_data/dashboard" 

179 return os.environ.get("DASHBOARD_DATA_PATH", _default) 

180 

181 

182class AdminDashboard: 

183 """ 

184 Server-side dashboard data provider. 

185 

186 Aggregates statistics from various sources and provides data for 

187 admin dashboards. Designed for Docker environments with persistent storage. 

188 

189 Usage: 

190 dashboard = AdminDashboard() 

191 

192 # Get overall stats 

193 stats = dashboard.get_stats() 

194 

195 # Get active sessions 

196 sessions = dashboard.get_active_sessions() 

197 

198 # Get channel status 

199 channels = dashboard.get_channel_status() 

200 

201 # Get recent errors 

202 errors = dashboard.get_error_log(limit=50) 

203 """ 

204 

205 def __init__(self, config: Optional[DashboardConfig] = None): 

206 self.config = config or DashboardConfig() 

207 self._start_time = time.time() 

208 

209 # In-memory data stores 

210 self._sessions: Dict[str, SessionInfo] = {} 

211 self._channels: Dict[str, ChannelStatusInfo] = {} 

212 self._error_log: deque = deque(maxlen=self.config.max_error_log_size) 

213 self._message_counts: Dict[str, int] = {} # By hour 

214 self._response_times: List[float] = [] 

215 self._stats_history: List[Dict[str, Any]] = [] 

216 

217 # Counters 

218 self._total_messages = 0 

219 self._total_errors = 0 

220 self._messages_today = 0 

221 self._errors_today = 0 

222 self._last_day_reset = datetime.now().date() 

223 

224 # Ensure persistence directory exists 

225 self._ensure_persistence_dir() 

226 

227 # Load persisted state 

228 self._load_state() 

229 

230 def _ensure_persistence_dir(self) -> None: 

231 """Ensure persistence directory exists.""" 

232 path = self.config.get_persistence_path() 

233 try: 

234 os.makedirs(path, exist_ok=True) 

235 except Exception as e: 

236 logger.warning(f"Could not create persistence directory {path}: {e}") 

237 

238 def _get_state_file(self) -> str: 

239 """Get path to state file.""" 

240 return os.path.join(self.config.get_persistence_path(), "dashboard_state.json") 

241 

242 def _get_errors_file(self) -> str: 

243 """Get path to errors file.""" 

244 return os.path.join(self.config.get_persistence_path(), "error_log.json") 

245 

246 def _load_state(self) -> None: 

247 """Load persisted state.""" 

248 state_file = self._get_state_file() 

249 try: 

250 if os.path.exists(state_file): 

251 with open(state_file, "r") as f: 

252 data = json.load(f) 

253 self._total_messages = data.get("total_messages", 0) 

254 self._total_errors = data.get("total_errors", 0) 

255 self._message_counts = data.get("message_counts", {}) 

256 logger.info(f"Loaded dashboard state from {state_file}") 

257 except Exception as e: 

258 logger.warning(f"Could not load dashboard state: {e}") 

259 

260 # Load error log 

261 errors_file = self._get_errors_file() 

262 try: 

263 if os.path.exists(errors_file): 

264 with open(errors_file, "r") as f: 

265 errors = json.load(f) 

266 for err in errors[-self.config.max_error_log_size:]: 

267 self._error_log.append(ErrorEntry(**err)) 

268 logger.info(f"Loaded {len(self._error_log)} errors from {errors_file}") 

269 except Exception as e: 

270 logger.warning(f"Could not load error log: {e}") 

271 

272 def _save_state(self) -> None: 

273 """Persist state to disk.""" 

274 state_file = self._get_state_file() 

275 try: 

276 data = { 

277 "total_messages": self._total_messages, 

278 "total_errors": self._total_errors, 

279 "message_counts": self._message_counts, 

280 "saved_at": datetime.now().isoformat(), 

281 } 

282 with open(state_file, "w") as f: 

283 json.dump(data, f, indent=2) 

284 except Exception as e: 

285 logger.warning(f"Could not save dashboard state: {e}") 

286 

287 def _save_errors(self) -> None: 

288 """Persist error log to disk.""" 

289 errors_file = self._get_errors_file() 

290 try: 

291 errors = [err.to_dict() for err in self._error_log] 

292 with open(errors_file, "w") as f: 

293 json.dump(errors, f, indent=2) 

294 except Exception as e: 

295 logger.warning(f"Could not save error log: {e}") 

296 

297 def _check_day_reset(self) -> None: 

298 """Reset daily counters if day changed.""" 

299 today = datetime.now().date() 

300 if today > self._last_day_reset: 

301 self._messages_today = 0 

302 self._errors_today = 0 

303 self._last_day_reset = today 

304 

305 def get_stats(self) -> DashboardStats: 

306 """ 

307 Get aggregated dashboard statistics. 

308 

309 Returns: 

310 DashboardStats with current system metrics 

311 """ 

312 self._check_day_reset() 

313 now = datetime.now() 

314 

315 # Calculate messages per minute 

316 hour_key = now.strftime("%Y-%m-%d-%H") 

317 messages_this_hour = self._message_counts.get(hour_key, 0) 

318 minutes_in_hour = now.minute + 1 

319 messages_per_minute = messages_this_hour / max(1, minutes_in_hour) 

320 

321 # Calculate error rate 

322 error_rate = 0.0 

323 if self._messages_today > 0: 

324 error_rate = (self._errors_today / self._messages_today) * 100 

325 

326 # Calculate response time percentiles 

327 avg_response = 0.0 

328 p99_response = 0.0 

329 if self._response_times: 

330 avg_response = sum(self._response_times) / len(self._response_times) 

331 sorted_times = sorted(self._response_times) 

332 p99_idx = int(len(sorted_times) * 0.99) 

333 p99_response = sorted_times[min(p99_idx, len(sorted_times) - 1)] 

334 

335 # Get memory usage (basic) 

336 memory_mb = 0.0 

337 try: 

338 import psutil 

339 process = psutil.Process() 

340 memory_mb = process.memory_info().rss / (1024 * 1024) 

341 except ImportError: 

342 pass 

343 

344 return DashboardStats( 

345 timestamp=now.isoformat(), 

346 uptime_seconds=time.time() - self._start_time, 

347 total_messages_processed=self._total_messages, 

348 messages_today=self._messages_today, 

349 messages_this_hour=messages_this_hour, 

350 messages_per_minute=messages_per_minute, 

351 active_sessions=len(self._sessions), 

352 total_sessions_today=len(self._sessions), 

353 active_channels=len([c for c in self._channels.values() if c.status == "connected"]), 

354 total_channels=len(self._channels), 

355 queue_depth=0, # To be populated by queue integration 

356 queue_processing_rate=0.0, 

357 avg_response_time_ms=avg_response, 

358 p99_response_time_ms=p99_response, 

359 error_count_today=self._errors_today, 

360 error_rate_percent=error_rate, 

361 memory_usage_mb=memory_mb, 

362 cpu_usage_percent=0.0, 

363 ) 

364 

365 def get_active_sessions(self) -> List[SessionInfo]: 

366 """ 

367 Get list of active sessions. 

368 

369 Returns: 

370 List of SessionInfo for all active sessions 

371 """ 

372 return list(self._sessions.values()) 

373 

374 def get_channel_status(self) -> Dict[str, ChannelStatusInfo]: 

375 """ 

376 Get status of all channels. 

377 

378 Returns: 

379 Dictionary mapping channel type to ChannelStatusInfo 

380 """ 

381 return self._channels.copy() 

382 

383 def get_queue_stats(self) -> QueueStats: 

384 """ 

385 Get queue statistics. 

386 

387 Returns: 

388 QueueStats with current queue metrics 

389 """ 

390 # This would be populated by integrating with the actual queue system 

391 return QueueStats( 

392 total_queues=0, 

393 total_messages=0, 

394 pending_messages=0, 

395 processing_messages=0, 

396 completed_today=0, 

397 failed_today=0, 

398 avg_wait_time_ms=0.0, 

399 avg_processing_time_ms=0.0, 

400 throughput_per_second=0.0, 

401 ) 

402 

403 def get_error_log(self, limit: int = 100) -> List[ErrorEntry]: 

404 """ 

405 Get recent error log entries. 

406 

407 Args: 

408 limit: Maximum number of entries to return 

409 

410 Returns: 

411 List of ErrorEntry objects, most recent first 

412 """ 

413 errors = list(self._error_log) 

414 errors.reverse() # Most recent first 

415 return errors[:limit] 

416 

417 # Data recording methods (called by other components) 

418 

419 def record_message(self, channel: str) -> None: 

420 """Record a message being processed.""" 

421 self._check_day_reset() 

422 self._total_messages += 1 

423 self._messages_today += 1 

424 

425 # Track by hour 

426 hour_key = datetime.now().strftime("%Y-%m-%d-%H") 

427 self._message_counts[hour_key] = self._message_counts.get(hour_key, 0) + 1 

428 

429 # Clean old hour keys (keep last 48 hours) 

430 cutoff = datetime.now() - timedelta(hours=48) 

431 cutoff_key = cutoff.strftime("%Y-%m-%d-%H") 

432 self._message_counts = { 

433 k: v for k, v in self._message_counts.items() if k >= cutoff_key 

434 } 

435 

436 # Update channel message count 

437 if channel in self._channels: 

438 self._channels[channel].message_count += 1 

439 self._channels[channel].last_activity = datetime.now().isoformat() 

440 

441 self._save_state() 

442 

443 def record_response_time(self, latency_ms: float) -> None: 

444 """Record a response time measurement.""" 

445 self._response_times.append(latency_ms) 

446 # Keep only last 10000 measurements 

447 if len(self._response_times) > 10000: 

448 self._response_times = self._response_times[-10000:] 

449 

450 def record_error( 

451 self, 

452 error_type: str, 

453 message: str, 

454 channel: Optional[str] = None, 

455 severity: ErrorSeverity = ErrorSeverity.ERROR, 

456 stack_trace: Optional[str] = None, 

457 context: Optional[Dict[str, Any]] = None, 

458 ) -> None: 

459 """Record an error.""" 

460 self._check_day_reset() 

461 self._total_errors += 1 

462 self._errors_today += 1 

463 

464 if channel and channel in self._channels: 

465 self._channels[channel].error_count += 1 

466 

467 entry = ErrorEntry( 

468 timestamp=datetime.now().isoformat(), 

469 severity=severity.value, 

470 channel=channel, 

471 error_type=error_type, 

472 message=message, 

473 stack_trace=stack_trace, 

474 context=context or {}, 

475 ) 

476 self._error_log.append(entry) 

477 self._save_errors() 

478 self._save_state() 

479 

480 def register_session( 

481 self, 

482 session_id: str, 

483 channel: str, 

484 user_id: str, 

485 chat_id: str, 

486 metadata: Optional[Dict[str, Any]] = None, 

487 ) -> SessionInfo: 

488 """Register a new active session.""" 

489 now = datetime.now().isoformat() 

490 session = SessionInfo( 

491 session_id=session_id, 

492 channel=channel, 

493 user_id=user_id, 

494 chat_id=chat_id, 

495 started_at=now, 

496 last_activity=now, 

497 metadata=metadata or {}, 

498 ) 

499 self._sessions[session_id] = session 

500 return session 

501 

502 def update_session(self, session_id: str, message_count_delta: int = 1) -> None: 

503 """Update session activity.""" 

504 if session_id in self._sessions: 

505 self._sessions[session_id].last_activity = datetime.now().isoformat() 

506 self._sessions[session_id].message_count += message_count_delta 

507 

508 def unregister_session(self, session_id: str) -> None: 

509 """Remove a session.""" 

510 if session_id in self._sessions: 

511 del self._sessions[session_id] 

512 

513 def register_channel( 

514 self, 

515 channel_type: str, 

516 name: str, 

517 status: ChannelStatus = ChannelStatus.DISCONNECTED, 

518 metadata: Optional[Dict[str, Any]] = None, 

519 ) -> ChannelStatusInfo: 

520 """Register a channel.""" 

521 channel_info = ChannelStatusInfo( 

522 channel_type=channel_type, 

523 name=name, 

524 status=status.value, 

525 metadata=metadata or {}, 

526 ) 

527 self._channels[channel_type] = channel_info 

528 return channel_info 

529 

530 def update_channel_status( 

531 self, 

532 channel_type: str, 

533 status: ChannelStatus, 

534 latency_ms: Optional[float] = None, 

535 ) -> None: 

536 """Update channel status.""" 

537 if channel_type in self._channels: 

538 self._channels[channel_type].status = status.value 

539 if status == ChannelStatus.CONNECTED: 

540 self._channels[channel_type].connected_at = datetime.now().isoformat() 

541 if latency_ms is not None: 

542 self._channels[channel_type].avg_latency_ms = latency_ms 

543 

544 

545# Singleton instance 

546_dashboard: Optional[AdminDashboard] = None 

547 

548 

549def get_dashboard(config: Optional[DashboardConfig] = None) -> AdminDashboard: 

550 """Get or create the global dashboard instance.""" 

551 global _dashboard 

552 if _dashboard is None: 

553 _dashboard = AdminDashboard(config) 

554 return _dashboard