Coverage for integrations / channels / admin / metrics.py: 27.7%

278 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Metrics Collector - System Metrics Collection and Export 

3 

4Provides metrics collection for monitoring and observability. 

5Supports Prometheus export format for container monitoring. 

6 

7Features: 

8- Message metrics (count, direction, channel) 

9- Latency tracking with histograms 

10- Error tracking by type 

11- Prometheus text format export 

12- Container-network compatible endpoints 

13- Volume-mounted persistence for metrics history 

14""" 

15 

16from __future__ import annotations 

17 

18import json 

19import logging 

20import os 

21import time 

22from collections import defaultdict 

23from dataclasses import dataclass, field, asdict 

24from datetime import datetime, timedelta 

25from typing import Any, Dict, List, Optional, Tuple 

26from threading import Lock 

27 

28logger = logging.getLogger(__name__) 

29 

30 

@dataclass
class MetricValue:
    """One sampled metric: a name, a numeric value, optional labels, and a capture time."""
    name: str
    value: float
    # Prometheus-style label set; empty means an unlabeled sample.
    labels: Dict[str, str] = field(default_factory=dict)
    # Capture time (seconds since epoch), defaults to "now".
    timestamp: float = field(default_factory=time.time)

    def prometheus_format(self) -> str:
        """Render this sample as a single Prometheus text-format line."""
        if not self.labels:
            return f'{self.name} {self.value}'
        rendered = ",".join(f'{key}="{val}"' for key, val in self.labels.items())
        return f'{self.name}{{{rendered}}} {self.value}'

45 

46 

@dataclass
class HistogramBucket:
    """A single cumulative histogram bucket counting observations <= ``le``."""
    le: float  # Inclusive upper bound ("less than or equal to")
    count: int = 0


@dataclass
class Histogram:
    """Prometheus-style cumulative histogram for tracking value distributions."""
    name: str
    buckets: List[HistogramBucket] = field(default_factory=list)
    sum_value: float = 0.0  # Running sum of all observed values
    count: int = 0          # Total number of observations
    labels: Dict[str, str] = field(default_factory=dict)

    def __post_init__(self):
        # Supply default latency bounds (in ms) when none were given.
        if self.buckets:
            return
        bounds = (5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, float('inf'))
        self.buckets = [HistogramBucket(le=bound) for bound in bounds]

    def observe(self, value: float) -> None:
        """Fold one observation into the sum, the count, and every bucket it fits in."""
        self.sum_value += value
        self.count += 1
        # Cumulative semantics: every bucket whose bound covers the value is bumped.
        for bucket in (b for b in self.buckets if value <= b.le):
            bucket.count += 1

    def prometheus_format(self) -> List[str]:
        """Render _bucket/_sum/_count lines in Prometheus text format."""
        base = ",".join(f'{k}="{v}"' for k, v in self.labels.items())
        out: List[str] = []

        for bucket in self.buckets:
            le_str = "+Inf" if bucket.le == float('inf') else str(bucket.le)
            label_block = f'{base},le="{le_str}"' if base else f'le="{le_str}"'
            out.append(f'{self.name}_bucket{{{label_block}}} {bucket.count}')

        suffix = f'{{{base}}}' if base else ''
        out.append(f'{self.name}_sum{suffix} {self.sum_value}')
        out.append(f'{self.name}_count{suffix} {self.count}')
        return out

108 

109 

@dataclass
class Metrics:
    """Aggregated metrics snapshot for one reporting window."""
    timestamp: str = ""   # ISO-8601 time the snapshot was taken
    period: str = "1h"    # Window the aggregation covers (e.g. "1h", "24h")
    # --- Message volume ---
    messages_total: int = 0
    messages_by_channel: Dict[str, int] = field(default_factory=dict)
    messages_by_direction: Dict[str, int] = field(default_factory=dict)
    # --- Latency (milliseconds) ---
    latency_avg_ms: float = 0.0
    latency_p50_ms: float = 0.0
    latency_p90_ms: float = 0.0
    latency_p99_ms: float = 0.0
    latency_max_ms: float = 0.0
    latency_by_channel: Dict[str, float] = field(default_factory=dict)
    # --- Errors ---
    errors_total: int = 0
    errors_by_channel: Dict[str, int] = field(default_factory=dict)
    errors_by_type: Dict[str, int] = field(default_factory=dict)
    # --- Throughput rates ---
    messages_per_second: float = 0.0
    errors_per_second: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict snapshot suitable for JSON serialization."""
        return asdict(self)

136 

137 

@dataclass
class MetricsConfig:
    """Tunables for the metrics collector."""
    # Explicit storage override; should be volume-mounted when running in Docker.
    persistence_path: Optional[str] = None
    # How long raw time-series samples are retained.
    retention_hours: int = 24
    aggregation_interval_seconds: int = 60
    # Prefix applied to every exported Prometheus metric name.
    prometheus_prefix: str = "hevolvebot"
    include_hostname_label: bool = True

    def get_persistence_path(self) -> str:
        """Resolve the metrics directory: explicit config, then METRICS_DATA_PATH env, then a platform default."""
        if self.persistence_path:
            return self.persistence_path

        import sys as _sys
        bundled = os.environ.get('NUNBA_BUNDLED') or getattr(_sys, 'frozen', False)
        if bundled:
            # Packaged/frozen build: prefer the app's data dir helper if importable.
            try:
                from core.platform_paths import get_agent_data_dir
                fallback = os.path.join(get_agent_data_dir(), 'metrics')
            except ImportError:
                fallback = os.path.join(os.path.expanduser('~'), 'Documents', 'Nunba', 'data', 'agent_data', 'metrics')
        elif os.path.exists("/app"):
            # Conventional container layout.
            fallback = "/app/data/metrics"
        else:
            fallback = "./agent_data/metrics"
        return os.environ.get("METRICS_DATA_PATH", fallback)

166 

167 

class MetricsCollector:
    """
    Collects and exports system metrics.

    Designed for Docker/container environments with:
    - Prometheus export format
    - Container-network compatible addressing
    - Volume-mounted persistence

    Thread-safety: all mutable state (counters, histograms, raw time series)
    is guarded by ``self._lock``; public methods may be called from any thread.

    Usage:
        collector = MetricsCollector()

        # Record metrics
        collector.record_message("telegram", "inbound")
        collector.record_latency("telegram", 45.2)
        collector.record_error("telegram", "timeout")

        # Get aggregated metrics
        metrics = collector.get_metrics(period="1h")

        # Export for Prometheus
        prometheus_text = collector.export_prometheus()
    """

    def __init__(self, config: Optional[MetricsConfig] = None):
        """
        Initialize the collector, ensure the persistence directory exists,
        and restore any previously persisted counters.

        Args:
            config: Optional configuration; a default MetricsConfig is used
                when omitted.
        """
        self.config = config or MetricsConfig()
        self._start_time = time.time()
        self._lock = Lock()

        # Monotonic counters; persisted across restarts via _save_state().
        self._message_counter: Dict[Tuple[str, str], int] = defaultdict(int)  # (channel, direction)
        self._error_counter: Dict[Tuple[str, str], int] = defaultdict(int)  # (channel, error_type)

        # Per-channel latency histograms for Prometheus export.
        self._latency_histograms: Dict[str, Histogram] = {}

        # Raw time series used for windowed aggregation (not persisted).
        self._message_times: List[Tuple[float, str, str]] = []  # (timestamp, channel, direction)
        self._latency_values: List[Tuple[float, str, float]] = []  # (timestamp, channel, latency_ms)
        self._error_times: List[Tuple[float, str, str]] = []  # (timestamp, channel, error_type)

        # Hostname label for exports (COMPUTERNAME is the Windows equivalent).
        self._hostname = os.environ.get("HOSTNAME", os.environ.get("COMPUTERNAME", "unknown"))

        # Ensure persistence directory exists, then load persisted state.
        self._ensure_persistence_dir()
        self._load_state()

    def _ensure_persistence_dir(self) -> None:
        """Ensure the persistence directory exists (best-effort; logs on failure)."""
        path = self.config.get_persistence_path()
        try:
            os.makedirs(path, exist_ok=True)
        except Exception as e:
            logger.warning(f"Could not create persistence directory {path}: {e}")

    def _get_state_file(self) -> str:
        """Return the path of the JSON file used to persist counters."""
        return os.path.join(self.config.get_persistence_path(), "metrics_state.json")

    def _load_state(self) -> None:
        """Restore persisted counters from disk, tolerating missing or corrupt files."""
        state_file = self._get_state_file()
        try:
            if os.path.exists(state_file):
                with open(state_file, "r") as f:
                    data = json.load(f)
                # Keys were flattened to "channel|direction" / "channel|error_type"
                # by _save_state; anything that does not split cleanly is skipped.
                with self._lock:
                    for key_str, value in data.get("message_counter", {}).items():
                        parts = key_str.split("|")
                        if len(parts) == 2:
                            self._message_counter[(parts[0], parts[1])] = value
                    for key_str, value in data.get("error_counter", {}).items():
                        parts = key_str.split("|")
                        if len(parts) == 2:
                            self._error_counter[(parts[0], parts[1])] = value
                logger.info(f"Loaded metrics state from {state_file}")
        except Exception as e:
            logger.warning(f"Could not load metrics state: {e}")

    def _save_state(self) -> None:
        """Persist the monotonic counters to disk (best-effort; failures are logged)."""
        state_file = self._get_state_file()
        try:
            # Snapshot under the lock so concurrent record_* calls cannot
            # mutate the counters while we iterate them (was a data race).
            with self._lock:
                message_counter = {f"{k[0]}|{k[1]}": v for k, v in self._message_counter.items()}
                error_counter = {f"{k[0]}|{k[1]}": v for k, v in self._error_counter.items()}
            data = {
                "message_counter": message_counter,
                "error_counter": error_counter,
                "saved_at": datetime.now().isoformat(),
            }
            with open(state_file, "w") as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            logger.warning(f"Could not save metrics state: {e}")

    def _cleanup_old_data(self) -> None:
        """Drop raw time-series samples older than the configured retention period."""
        cutoff = time.time() - (self.config.retention_hours * 3600)

        with self._lock:
            self._message_times = [(t, c, d) for t, c, d in self._message_times if t > cutoff]
            self._latency_values = [(t, c, l) for t, c, l in self._latency_values if t > cutoff]
            self._error_times = [(t, c, e) for t, c, e in self._error_times if t > cutoff]

    def _get_or_create_histogram(self, channel: str) -> Histogram:
        """Get or create the latency histogram for a channel. Caller must hold self._lock."""
        if channel not in self._latency_histograms:
            self._latency_histograms[channel] = Histogram(
                name=f"{self.config.prometheus_prefix}_request_latency_ms",
                labels={"channel": channel},
            )
        return self._latency_histograms[channel]

    def record_message(self, channel: str, direction: str) -> None:
        """
        Record a message being processed.

        Args:
            channel: Channel name (e.g., "telegram", "discord")
            direction: Message direction ("inbound" or "outbound")
        """
        now = time.time()
        with self._lock:
            self._message_counter[(channel, direction)] += 1
            self._message_times.append((now, channel, direction))

        # NOTE(review): state is flushed to disk on every message; consider
        # throttling if message volume becomes high.
        self._save_state()

    def record_latency(self, channel: str, latency_ms: float) -> None:
        """
        Record a latency measurement.

        Args:
            channel: Channel name
            latency_ms: Latency in milliseconds
        """
        now = time.time()
        with self._lock:
            histogram = self._get_or_create_histogram(channel)
            histogram.observe(latency_ms)
            self._latency_values.append((now, channel, latency_ms))

    def record_error(self, channel: str, error_type: str) -> None:
        """
        Record an error.

        Args:
            channel: Channel name
            error_type: Type of error (e.g., "timeout", "rate_limit", "auth")
        """
        now = time.time()
        with self._lock:
            self._error_counter[(channel, error_type)] += 1
            self._error_times.append((now, channel, error_type))

        self._save_state()

    def _parse_period(self, period: str) -> float:
        """
        Parse a period string such as "30s", "5m", "1h", or "7d" into seconds.

        Malformed input (empty string, non-numeric value, unknown unit)
        falls back to one hour.
        """
        if not period:
            return 3600  # Guard: period[-1] on "" would raise IndexError
        unit = period[-1].lower()
        try:
            value = int(period[:-1])
        except ValueError:
            return 3600  # Default to 1 hour

        scale = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}.get(unit)
        if scale is None:
            return 3600
        return value * scale

    def get_metrics(self, period: str = "1h") -> Metrics:
        """
        Get aggregated metrics for a time period.

        Args:
            period: Time period (e.g., "1h", "24h", "7d")

        Returns:
            Metrics object with aggregated data
        """
        self._cleanup_old_data()
        period_seconds = self._parse_period(period)
        cutoff = time.time() - period_seconds

        # Snapshot the raw series under the lock; aggregate on the copies
        # so the lock is not held during the computation below.
        with self._lock:
            messages_in_period = [(t, c, d) for t, c, d in self._message_times if t > cutoff]
            latencies_in_period = [(t, c, l) for t, c, l in self._latency_values if t > cutoff]
            errors_in_period = [(t, c, e) for t, c, e in self._error_times if t > cutoff]

        # Aggregate message counts
        messages_by_channel: Dict[str, int] = defaultdict(int)
        messages_by_direction: Dict[str, int] = defaultdict(int)
        for _, channel, direction in messages_in_period:
            messages_by_channel[channel] += 1
            messages_by_direction[direction] += 1

        # Group latencies per channel
        latency_by_channel: Dict[str, List[float]] = defaultdict(list)
        for _, channel, latency in latencies_in_period:
            latency_by_channel[channel].append(latency)

        all_latencies = [l for _, _, l in latencies_in_period]

        # Latency percentiles over the whole window (nearest-rank indexing;
        # p99 is clamped so it never runs past the end of the list)
        latency_avg = 0.0
        latency_p50 = 0.0
        latency_p90 = 0.0
        latency_p99 = 0.0
        latency_max = 0.0

        if all_latencies:
            sorted_latencies = sorted(all_latencies)
            latency_avg = sum(all_latencies) / len(all_latencies)
            latency_p50 = sorted_latencies[int(len(sorted_latencies) * 0.5)]
            latency_p90 = sorted_latencies[int(len(sorted_latencies) * 0.9)]
            latency_p99 = sorted_latencies[min(int(len(sorted_latencies) * 0.99), len(sorted_latencies) - 1)]
            latency_max = sorted_latencies[-1]

        # Average latency by channel
        avg_latency_by_channel = {
            channel: sum(latencies) / len(latencies) if latencies else 0.0
            for channel, latencies in latency_by_channel.items()
        }

        # Aggregate errors
        errors_by_channel: Dict[str, int] = defaultdict(int)
        errors_by_type: Dict[str, int] = defaultdict(int)
        for _, channel, error_type in errors_in_period:
            errors_by_channel[channel] += 1
            errors_by_type[error_type] += 1

        # Throughput rates over the requested window
        messages_per_second = len(messages_in_period) / period_seconds if period_seconds > 0 else 0.0
        errors_per_second = len(errors_in_period) / period_seconds if period_seconds > 0 else 0.0

        return Metrics(
            timestamp=datetime.now().isoformat(),
            period=period,
            messages_total=len(messages_in_period),
            messages_by_channel=dict(messages_by_channel),
            messages_by_direction=dict(messages_by_direction),
            latency_avg_ms=latency_avg,
            latency_p50_ms=latency_p50,
            latency_p90_ms=latency_p90,
            latency_p99_ms=latency_p99,
            latency_max_ms=latency_max,
            latency_by_channel=avg_latency_by_channel,
            errors_total=len(errors_in_period),
            errors_by_channel=dict(errors_by_channel),
            errors_by_type=dict(errors_by_type),
            messages_per_second=messages_per_second,
            errors_per_second=errors_per_second,
        )

    def export_prometheus(self) -> str:
        """
        Export metrics in Prometheus text format.

        Returns:
            Prometheus-compatible metrics text
        """
        lines = []
        prefix = self.config.prometheus_prefix

        # Add hostname label if configured
        base_labels = {}
        if self.config.include_hostname_label:
            base_labels["hostname"] = self._hostname

        # Helper to render base + extra labels as a `{k="v",...}` block.
        def format_labels(extra_labels: Dict[str, str]) -> str:
            all_labels = {**base_labels, **extra_labels}
            if not all_labels:
                return ""
            return "{" + ",".join(f'{k}="{v}"' for k, v in all_labels.items()) + "}"

        # Uptime gauge
        uptime = time.time() - self._start_time
        lines.append(f"# HELP {prefix}_uptime_seconds Time since metrics collector started")
        lines.append(f"# TYPE {prefix}_uptime_seconds gauge")
        lines.append(f"{prefix}_uptime_seconds{format_labels({})} {uptime}")
        lines.append("")

        # Message counters
        lines.append(f"# HELP {prefix}_messages_total Total messages processed")
        lines.append(f"# TYPE {prefix}_messages_total counter")
        with self._lock:
            for (channel, direction), count in self._message_counter.items():
                labels = format_labels({"channel": channel, "direction": direction})
                lines.append(f"{prefix}_messages_total{labels} {count}")
        lines.append("")

        # Error counters
        lines.append(f"# HELP {prefix}_errors_total Total errors")
        lines.append(f"# TYPE {prefix}_errors_total counter")
        with self._lock:
            for (channel, error_type), count in self._error_counter.items():
                labels = format_labels({"channel": channel, "error_type": error_type})
                lines.append(f"{prefix}_errors_total{labels} {count}")
        lines.append("")

        # Latency histograms
        lines.append(f"# HELP {prefix}_request_latency_ms Request latency in milliseconds")
        lines.append(f"# TYPE {prefix}_request_latency_ms histogram")
        with self._lock:
            for channel, histogram in self._latency_histograms.items():
                for bucket in histogram.buckets:
                    le_str = "+Inf" if bucket.le == float('inf') else str(bucket.le)
                    labels = format_labels({"channel": channel, "le": le_str})
                    lines.append(f"{prefix}_request_latency_ms_bucket{labels} {bucket.count}")
                sum_labels = format_labels({"channel": channel})
                lines.append(f"{prefix}_request_latency_ms_sum{sum_labels} {histogram.sum_value}")
                lines.append(f"{prefix}_request_latency_ms_count{sum_labels} {histogram.count}")
        lines.append("")

        return "\n".join(lines)

    def reset(self) -> None:
        """Reset all metrics (for testing)."""
        with self._lock:
            self._message_counter.clear()
            self._error_counter.clear()
            self._latency_histograms.clear()
            self._message_times.clear()
            self._latency_values.clear()
            self._error_times.clear()

    def get_summary(self) -> Dict[str, Any]:
        """Get a quick summary of lifetime metrics (counters read under the lock)."""
        # Fix: previously read the counters without holding the lock,
        # racing against concurrent record_* calls.
        with self._lock:
            total_messages = sum(self._message_counter.values())
            total_errors = sum(self._error_counter.values())
            channels = set(k[0] for k in self._message_counter.keys())

        return {
            "total_messages": total_messages,
            "total_errors": total_errors,
            "channels_active": len(channels),
            "uptime_seconds": time.time() - self._start_time,
        }

518 

519 

# Singleton instance
_collector: Optional[MetricsCollector] = None


def get_metrics_collector(config: Optional[MetricsConfig] = None) -> MetricsCollector:
    """Get or create the global metrics collector instance."""
    global _collector
    # Lazily construct on first call; `config` is only honored on that
    # first call — later calls return the existing singleton unchanged.
    if _collector is not None:
        return _collector
    _collector = MetricsCollector(config)
    return _collector