Coverage for integrations / channels / admin / metrics.py: 27.7%
278 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Metrics Collector - System Metrics Collection and Export
4Provides metrics collection for monitoring and observability.
5Supports Prometheus export format for container monitoring.
7Features:
8- Message metrics (count, direction, channel)
9- Latency tracking with histograms
10- Error tracking by type
11- Prometheus text format export
12- Container-network compatible endpoints
13- Volume-mounted persistence for metrics history
14"""
16from __future__ import annotations
18import json
19import logging
20import os
21import time
22from collections import defaultdict
23from dataclasses import dataclass, field, asdict
24from datetime import datetime, timedelta
25from typing import Any, Dict, List, Optional, Tuple
26from threading import Lock
28logger = logging.getLogger(__name__)
@dataclass
class MetricValue:
    """A single metric value with metadata."""
    name: str
    value: float
    labels: Dict[str, str] = field(default_factory=dict)
    timestamp: float = field(default_factory=time.time)

    @staticmethod
    def _escape_label_value(value: str) -> str:
        """Escape a label value per the Prometheus text exposition format.

        Backslash, double-quote and newline must be escaped inside quoted
        label values, otherwise the exported line is unparseable.
        """
        return value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")

    def prometheus_format(self) -> str:
        """Format as a single Prometheus text-format sample line."""
        if self.labels:
            labels_str = ",".join(
                f'{k}="{self._escape_label_value(str(v))}"' for k, v in self.labels.items()
            )
            return f'{self.name}{{{labels_str}}} {self.value}'
        return f'{self.name} {self.value}'
@dataclass
class HistogramBucket:
    """A single cumulative histogram bucket."""
    le: float  # Upper bound (inclusive): counts observations <= le
    count: int = 0


@dataclass
class Histogram:
    """Cumulative histogram for tracking value distributions."""
    name: str
    buckets: List[HistogramBucket] = field(default_factory=list)
    sum_value: float = 0.0
    count: int = 0
    labels: Dict[str, str] = field(default_factory=dict)

    # Default bucket upper bounds (milliseconds); ints so exported
    # `le` strings render as "5", "10", ... rather than "5.0".
    _DEFAULT_BOUNDS = (5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, float('inf'))

    def __post_init__(self):
        # Populate the standard latency buckets when none were supplied.
        if not self.buckets:
            self.buckets = [HistogramBucket(le=bound) for bound in self._DEFAULT_BOUNDS]

    def observe(self, value: float) -> None:
        """Fold one observation into the sum, count and cumulative buckets."""
        self.sum_value += value
        self.count += 1
        for bucket in self.buckets:
            if value <= bucket.le:
                bucket.count += 1

    def prometheus_format(self) -> List[str]:
        """Render the histogram as Prometheus text-format lines."""
        base = ",".join(f'{k}="{v}"' for k, v in self.labels.items())
        out: List[str] = []

        for bucket in self.buckets:
            le_str = "+Inf" if bucket.le == float('inf') else str(bucket.le)
            if base:
                out.append(f'{self.name}_bucket{{{base},le="{le_str}"}} {bucket.count}')
            else:
                out.append(f'{self.name}_bucket{{le="{le_str}"}} {bucket.count}')

        if base:
            out.append(f'{self.name}_sum{{{base}}} {self.sum_value}')
            out.append(f'{self.name}_count{{{base}}} {self.count}')
        else:
            out.append(f'{self.name}_sum {self.sum_value}')
            out.append(f'{self.name}_count {self.count}')

        return out
@dataclass
class Metrics:
    """Aggregated metrics snapshot for one reporting period."""
    timestamp: str = ""
    period: str = "1h"
    # Message volume
    messages_total: int = 0
    messages_by_channel: Dict[str, int] = field(default_factory=dict)
    messages_by_direction: Dict[str, int] = field(default_factory=dict)
    # Latency distribution (milliseconds)
    latency_avg_ms: float = 0.0
    latency_p50_ms: float = 0.0
    latency_p90_ms: float = 0.0
    latency_p99_ms: float = 0.0
    latency_max_ms: float = 0.0
    latency_by_channel: Dict[str, float] = field(default_factory=dict)
    # Error counts
    errors_total: int = 0
    errors_by_channel: Dict[str, int] = field(default_factory=dict)
    errors_by_type: Dict[str, int] = field(default_factory=dict)
    # Throughput rates
    messages_per_second: float = 0.0
    errors_per_second: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view suitable for JSON serialization."""
        return asdict(self)
@dataclass
class MetricsConfig:
    """Configuration for the metrics collector."""
    # Where metrics state is persisted (should be volume-mounted in Docker).
    persistence_path: Optional[str] = None
    # Retention / aggregation settings
    retention_hours: int = 24
    aggregation_interval_seconds: int = 60
    # Prometheus export settings
    prometheus_prefix: str = "hevolvebot"
    include_hostname_label: bool = True

    def get_persistence_path(self) -> str:
        """Resolve the persistence directory.

        Resolution order: explicit `persistence_path`, then the
        METRICS_DATA_PATH environment variable, then a platform default
        (bundled app data dir, /app/data/metrics in containers, or a
        relative ./agent_data/metrics fallback).
        """
        if self.persistence_path:
            return self.persistence_path

        import sys as _sys
        bundled = os.environ.get('NUNBA_BUNDLED') or getattr(_sys, 'frozen', False)
        if bundled:
            try:
                from core.platform_paths import get_agent_data_dir
                fallback = os.path.join(get_agent_data_dir(), 'metrics')
            except ImportError:
                fallback = os.path.join(
                    os.path.expanduser('~'), 'Documents', 'Nunba', 'data', 'agent_data', 'metrics'
                )
        elif os.path.exists("/app"):
            fallback = "/app/data/metrics"
        else:
            fallback = "./agent_data/metrics"
        return os.environ.get("METRICS_DATA_PATH", fallback)
class MetricsCollector:
    """
    Collects and exports system metrics.

    Designed for Docker/container environments with:
    - Prometheus export format
    - Container-network compatible addressing
    - Volume-mounted persistence

    Usage:
        collector = MetricsCollector()

        # Record metrics
        collector.record_message("telegram", "inbound")
        collector.record_latency("telegram", 45.2)
        collector.record_error("telegram", "timeout")

        # Get aggregated metrics
        metrics = collector.get_metrics(period="1h")

        # Export for Prometheus
        prometheus_text = collector.export_prometheus()
    """

    def __init__(self, config: Optional[MetricsConfig] = None):
        """Initialize counters, ensure the persistence dir exists and load saved state.

        Args:
            config: Optional configuration; defaults to MetricsConfig().
        """
        self.config = config or MetricsConfig()
        self._start_time = time.time()
        self._lock = Lock()

        # Monotonic counters (persisted across restarts).
        self._message_counter: Dict[Tuple[str, str], int] = defaultdict(int)  # (channel, direction)
        self._error_counter: Dict[Tuple[str, str], int] = defaultdict(int)  # (channel, error_type)

        # Per-channel latency histograms (in-memory only; not persisted).
        self._latency_histograms: Dict[str, Histogram] = {}

        # Raw time-series samples used for windowed aggregation.
        self._message_times: List[Tuple[float, str, str]] = []  # (timestamp, channel, direction)
        self._latency_values: List[Tuple[float, str, float]] = []  # (timestamp, channel, latency_ms)
        self._error_times: List[Tuple[float, str, str]] = []  # (timestamp, channel, error_type)

        # Hostname label for Prometheus export; Docker sets HOSTNAME to the container id.
        self._hostname = os.environ.get("HOSTNAME", os.environ.get("COMPUTERNAME", "unknown"))

        # Ensure persistence directory exists, then restore persisted counters.
        self._ensure_persistence_dir()
        self._load_state()

    def _ensure_persistence_dir(self) -> None:
        """Create the persistence directory if missing (best-effort, logged on failure)."""
        path = self.config.get_persistence_path()
        try:
            os.makedirs(path, exist_ok=True)
        except Exception as e:
            logger.warning(f"Could not create persistence directory {path}: {e}")

    def _get_state_file(self) -> str:
        """Return the path of the JSON state file."""
        return os.path.join(self.config.get_persistence_path(), "metrics_state.json")

    def _load_state(self) -> None:
        """Restore persisted counters; a missing or corrupt state file is tolerated."""
        state_file = self._get_state_file()
        try:
            if os.path.exists(state_file):
                with open(state_file, "r") as f:
                    data = json.load(f)
                # Counter keys are serialized as "part1|part2" strings.
                for key_str, value in data.get("message_counter", {}).items():
                    parts = key_str.split("|")
                    if len(parts) == 2:
                        self._message_counter[(parts[0], parts[1])] = value
                for key_str, value in data.get("error_counter", {}).items():
                    parts = key_str.split("|")
                    if len(parts) == 2:
                        self._error_counter[(parts[0], parts[1])] = value
                logger.info(f"Loaded metrics state from {state_file}")
        except Exception as e:
            logger.warning(f"Could not load metrics state: {e}")

    def _save_state(self) -> None:
        """Persist counters to disk (best-effort; failures are logged).

        Must be called WITHOUT holding self._lock: it takes the lock itself
        to snapshot the counters, so concurrent record_* calls cannot mutate
        them while they are being serialized.
        """
        state_file = self._get_state_file()
        try:
            with self._lock:
                message_counter = {f"{c}|{d}": v for (c, d), v in self._message_counter.items()}
                error_counter = {f"{c}|{t}": v for (c, t), v in self._error_counter.items()}
            data = {
                "message_counter": message_counter,
                "error_counter": error_counter,
                "saved_at": datetime.now().isoformat(),
            }
            with open(state_file, "w") as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            logger.warning(f"Could not save metrics state: {e}")

    def _cleanup_old_data(self) -> None:
        """Drop raw samples older than the configured retention window."""
        cutoff = time.time() - (self.config.retention_hours * 3600)

        with self._lock:
            self._message_times = [(t, c, d) for t, c, d in self._message_times if t > cutoff]
            self._latency_values = [(t, c, l) for t, c, l in self._latency_values if t > cutoff]
            self._error_times = [(t, c, e) for t, c, e in self._error_times if t > cutoff]

    def _get_or_create_histogram(self, channel: str) -> Histogram:
        """Get or create the latency histogram for a channel.

        Caller must hold self._lock.
        """
        if channel not in self._latency_histograms:
            self._latency_histograms[channel] = Histogram(
                name=f"{self.config.prometheus_prefix}_request_latency_ms",
                labels={"channel": channel},
            )
        return self._latency_histograms[channel]

    def record_message(self, channel: str, direction: str) -> None:
        """
        Record a message being processed.

        Args:
            channel: Channel name (e.g., "telegram", "discord")
            direction: Message direction ("inbound" or "outbound")
        """
        now = time.time()
        with self._lock:
            self._message_counter[(channel, direction)] += 1
            self._message_times.append((now, channel, direction))
        # Persist after releasing the lock; _save_state re-acquires it to snapshot.
        self._save_state()

    def record_latency(self, channel: str, latency_ms: float) -> None:
        """
        Record a latency measurement.

        Args:
            channel: Channel name
            latency_ms: Latency in milliseconds
        """
        now = time.time()
        with self._lock:
            histogram = self._get_or_create_histogram(channel)
            histogram.observe(latency_ms)
            self._latency_values.append((now, channel, latency_ms))

    def record_error(self, channel: str, error_type: str) -> None:
        """
        Record an error.

        Args:
            channel: Channel name
            error_type: Type of error (e.g., "timeout", "rate_limit", "auth")
        """
        now = time.time()
        with self._lock:
            self._error_counter[(channel, error_type)] += 1
            self._error_times.append((now, channel, error_type))
        self._save_state()

    def _parse_period(self, period: str) -> float:
        """Parse a period string such as "30s", "5m", "1h" or "7d" into seconds.

        Returns 3600 (one hour) for empty or malformed input instead of raising.
        """
        if not period:
            # Guard: period[-1] on an empty string would raise IndexError.
            return 3600
        unit = period[-1].lower()
        try:
            value = int(period[:-1])
        except ValueError:
            return 3600  # Default to 1 hour
        multipliers = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
        if unit in multipliers:
            return value * multipliers[unit]
        return 3600

    def get_metrics(self, period: str = "1h") -> Metrics:
        """
        Get aggregated metrics for a time period.

        Args:
            period: Time period (e.g., "1h", "24h", "7d")

        Returns:
            Metrics object with aggregated data
        """
        self._cleanup_old_data()
        period_seconds = self._parse_period(period)
        cutoff = time.time() - period_seconds

        with self._lock:
            # Filter raw samples down to the requested window.
            messages_in_period = [(t, c, d) for t, c, d in self._message_times if t > cutoff]
            latencies_in_period = [(t, c, l) for t, c, l in self._latency_values if t > cutoff]
            errors_in_period = [(t, c, e) for t, c, e in self._error_times if t > cutoff]

        # Aggregate message counts.
        messages_by_channel: Dict[str, int] = defaultdict(int)
        messages_by_direction: Dict[str, int] = defaultdict(int)
        for _, channel, direction in messages_in_period:
            messages_by_channel[channel] += 1
            messages_by_direction[direction] += 1

        # Group latencies per channel.
        latency_by_channel: Dict[str, List[float]] = defaultdict(list)
        for _, channel, latency in latencies_in_period:
            latency_by_channel[channel].append(latency)

        all_latencies = [l for _, _, l in latencies_in_period]

        # Nearest-rank percentiles over the sorted samples.
        latency_avg = 0.0
        latency_p50 = 0.0
        latency_p90 = 0.0
        latency_p99 = 0.0
        latency_max = 0.0

        if all_latencies:
            sorted_latencies = sorted(all_latencies)
            latency_avg = sum(all_latencies) / len(all_latencies)
            latency_p50 = sorted_latencies[int(len(sorted_latencies) * 0.5)]
            latency_p90 = sorted_latencies[int(len(sorted_latencies) * 0.9)]
            latency_p99 = sorted_latencies[min(int(len(sorted_latencies) * 0.99), len(sorted_latencies) - 1)]
            latency_max = sorted_latencies[-1]

        # Mean latency per channel.
        avg_latency_by_channel = {
            channel: sum(latencies) / len(latencies) if latencies else 0.0
            for channel, latencies in latency_by_channel.items()
        }

        # Aggregate error counts.
        errors_by_channel: Dict[str, int] = defaultdict(int)
        errors_by_type: Dict[str, int] = defaultdict(int)
        for _, channel, error_type in errors_in_period:
            errors_by_channel[channel] += 1
            errors_by_type[error_type] += 1

        # Throughput rates over the whole window.
        messages_per_second = len(messages_in_period) / period_seconds if period_seconds > 0 else 0.0
        errors_per_second = len(errors_in_period) / period_seconds if period_seconds > 0 else 0.0

        return Metrics(
            timestamp=datetime.now().isoformat(),
            period=period,
            messages_total=len(messages_in_period),
            messages_by_channel=dict(messages_by_channel),
            messages_by_direction=dict(messages_by_direction),
            latency_avg_ms=latency_avg,
            latency_p50_ms=latency_p50,
            latency_p90_ms=latency_p90,
            latency_p99_ms=latency_p99,
            latency_max_ms=latency_max,
            latency_by_channel=avg_latency_by_channel,
            errors_total=len(errors_in_period),
            errors_by_channel=dict(errors_by_channel),
            errors_by_type=dict(errors_by_type),
            messages_per_second=messages_per_second,
            errors_per_second=errors_per_second,
        )

    def export_prometheus(self) -> str:
        """
        Export metrics in Prometheus text format.

        Returns:
            Prometheus-compatible metrics text
        """
        lines = []
        prefix = self.config.prometheus_prefix

        # Add hostname label if configured.
        base_labels = {}
        if self.config.include_hostname_label:
            base_labels["hostname"] = self._hostname

        # Helper to merge base labels with metric-specific ones.
        def format_labels(extra_labels: Dict[str, str]) -> str:
            all_labels = {**base_labels, **extra_labels}
            if not all_labels:
                return ""
            return "{" + ",".join(f'{k}="{v}"' for k, v in all_labels.items()) + "}"

        # Uptime gauge.
        uptime = time.time() - self._start_time
        lines.append(f"# HELP {prefix}_uptime_seconds Time since metrics collector started")
        lines.append(f"# TYPE {prefix}_uptime_seconds gauge")
        lines.append(f"{prefix}_uptime_seconds{format_labels({})} {uptime}")
        lines.append("")

        # Message counters.
        lines.append(f"# HELP {prefix}_messages_total Total messages processed")
        lines.append(f"# TYPE {prefix}_messages_total counter")
        with self._lock:
            for (channel, direction), count in self._message_counter.items():
                labels = format_labels({"channel": channel, "direction": direction})
                lines.append(f"{prefix}_messages_total{labels} {count}")
        lines.append("")

        # Error counters.
        lines.append(f"# HELP {prefix}_errors_total Total errors")
        lines.append(f"# TYPE {prefix}_errors_total counter")
        with self._lock:
            for (channel, error_type), count in self._error_counter.items():
                labels = format_labels({"channel": channel, "error_type": error_type})
                lines.append(f"{prefix}_errors_total{labels} {count}")
        lines.append("")

        # Latency histograms.
        lines.append(f"# HELP {prefix}_request_latency_ms Request latency in milliseconds")
        lines.append(f"# TYPE {prefix}_request_latency_ms histogram")
        with self._lock:
            for channel, histogram in self._latency_histograms.items():
                for bucket in histogram.buckets:
                    le_str = "+Inf" if bucket.le == float('inf') else str(bucket.le)
                    labels = format_labels({"channel": channel, "le": le_str})
                    lines.append(f"{prefix}_request_latency_ms_bucket{labels} {bucket.count}")
                sum_labels = format_labels({"channel": channel})
                lines.append(f"{prefix}_request_latency_ms_sum{sum_labels} {histogram.sum_value}")
                lines.append(f"{prefix}_request_latency_ms_count{sum_labels} {histogram.count}")
        lines.append("")

        return "\n".join(lines)

    def reset(self) -> None:
        """Reset all metrics (for testing)."""
        with self._lock:
            self._message_counter.clear()
            self._error_counter.clear()
            self._latency_histograms.clear()
            self._message_times.clear()
            self._latency_values.clear()
            self._error_times.clear()

    def get_summary(self) -> Dict[str, Any]:
        """Get a quick summary of current metrics.

        Reads shared counters under the lock so a concurrent record_* call
        cannot mutate them mid-read.
        """
        with self._lock:
            total_messages = sum(self._message_counter.values())
            total_errors = sum(self._error_counter.values())
            channels = {channel for channel, _ in self._message_counter}

        return {
            "total_messages": total_messages,
            "total_errors": total_errors,
            "channels_active": len(channels),
            "uptime_seconds": time.time() - self._start_time,
        }
# Module-level singleton, created lazily by get_metrics_collector().
_collector: Optional[MetricsCollector] = None


def get_metrics_collector(config: Optional[MetricsConfig] = None) -> MetricsCollector:
    """Get or create the global metrics collector instance.

    Note: `config` is only honored by the first call, which creates the
    singleton; later calls return the existing instance unchanged.
    """
    global _collector
    if _collector is None:
        _collector = MetricsCollector(config)
    return _collector