# Coverage report residue (coverage.py v7.14.0, created 2026-05-12 04:49 +0000):
# core/superadmin_report.py — 0.0% of 125 statements covered.
"""Central superadmin report-in client.

When a HARTOS install successfully forms a peer (any topology — flat /
regional / central, any distribution form — bundled / Docker / ISO),
it MUST report its identity to the canonical superadmin allowlist.
This module is the single send path for that report.

Wired from ``GossipProtocol.start()``. Offline-tolerant: failed
reports persist to an outbox under ``platform_paths.get_data_dir() /
superadmin_outbox`` and replay on the next tick.

See Nunba memory:
- ``project_universal_peer_join_central_report.md`` (the spec)
- ``core/superadmins.py`` (allowlist + URLs)
"""
from __future__ import annotations

import json
import logging
import os
import threading
import time
import uuid
from typing import Dict, List, Optional

from core.superadmins import (
    SUPERADMIN_CENTRAL_URLS,
    SUPERADMIN_FALLBACK_CENTRAL_URLS,
    ALL_CENTRAL_URLS,
    REPORT_INTERVAL_SEC,
    REPORT_TIMEOUT_CONNECT,
    REPORT_TIMEOUT_READ,
    REPORT_OUTBOX_RETRY_SEC,
)
# Module-level logger for the report-in subsystem.
logger = logging.getLogger('superadmin_report')

# Outbox cap. If centrals stay unreachable indefinitely, the outbox
# would otherwise grow without bound (one file per failed central per
# REPORT_INTERVAL_SEC). Drop the oldest on overflow.
MAX_OUTBOX_FILES = 100
43# ─── Outbox helpers ───────────────────────────────────────────────────
45def _outbox_dir() -> Optional[str]:
46 """Return the outbox directory path, or None if data dir is
47 unreachable (degraded environment). Best-effort mkdir."""
48 try:
49 from core.platform_paths import get_data_dir
50 path = os.path.join(get_data_dir(), 'superadmin_outbox')
51 os.makedirs(path, exist_ok=True)
52 return path
53 except Exception:
54 return None
def _outbox_write(report: Dict) -> None:
    """Persist one failed report to the outbox for later replay.

    No-op when the outbox directory is unavailable. Before writing,
    evicts oldest files so the directory never exceeds
    MAX_OUTBOX_FILES entries (filenames carry a ms-timestamp prefix,
    so lexicographic order == chronological order).
    """
    directory = _outbox_dir()
    if not directory:
        return
    # Best-effort cap enforcement: evict oldest-first to make room.
    # A failed eviction aborts the purge but never blocks the write.
    try:
        queued = sorted(n for n in os.listdir(directory) if n.endswith('.json'))
        while len(queued) >= MAX_OUTBOX_FILES:
            victim = queued.pop(0)
            try:
                os.remove(os.path.join(directory, victim))
            except Exception:
                break  # Stop trying if removal itself fails
    except Exception:
        pass  # Cap is best-effort; better to write than to crash
    stamp = int(time.time() * 1000)
    fname = f"{stamp}-{uuid.uuid4().hex[:8]}.json"
    try:
        with open(os.path.join(directory, fname), 'w', encoding='utf-8') as fh:
            json.dump(report, fh)
    except Exception as e:
        logger.debug(f"Outbox write failed: {e}")
def _outbox_drain() -> List[str]:
    """List pending outbox files as full paths, oldest first.

    Ordering relies on the ms-timestamp filename prefix (lexicographic
    sort == chronological). Returns [] when the outbox is missing or
    unreadable.
    """
    directory = _outbox_dir()
    if not directory:
        return []
    try:
        names = [n for n in os.listdir(directory) if n.endswith('.json')]
        names.sort()
    except Exception:
        return []
    return [os.path.join(directory, n) for n in names]
99# ─── HTTP send (one central) ──────────────────────────────────────────
def _post_report(central_url: str, report: Dict) -> bool:
    """POST *report* to one central's report-join endpoint.

    Returns True only for a 2xx response; transport errors and non-2xx
    statuses log at debug and yield False (the caller decides whether
    to outbox the report).
    """
    try:
        from core.http_pool import pooled_post
        endpoint = f"{central_url.rstrip('/')}/api/social/peers/report-join"
        resp = pooled_post(
            endpoint, json=report,
            timeout=(REPORT_TIMEOUT_CONNECT, REPORT_TIMEOUT_READ),
        )
        if not (200 <= resp.status_code < 300):
            logger.debug(f"Report-in to {central_url} status={resp.status_code}")
            return False
        return True
    except Exception as e:
        logger.debug(f"Report-in to {central_url} failed: {e}")
        return False
119# ─── Public API ───────────────────────────────────────────────────────
def build_report(node_info: Dict) -> Dict:
    """Wrap node_info into a report envelope.

    Privacy bound: ship identity + topology + bandwidth tier + capability
    summary. No personal user data, no chat content, no model
    activations.
    """
    # (envelope key, node_info key) pairs; every value defaults to ''.
    field_map = (
        ('node_id', 'node_id'),
        ('node_name', 'name'),
        ('url', 'url'),
        ('tier', 'tier'),
        ('capability_tier', 'capability_tier'),
        ('version', 'version'),
        ('release_version', 'release_version'),
        ('bandwidth_profile', 'bandwidth_profile'),
        ('guardrail_hash', 'guardrail_hash'),
        ('code_hash', 'code_hash'),
        ('public_key', 'public_key'),
        ('hart_tag', 'hart_tag'),
    )
    envelope: Dict = {
        'schema': 'superadmin-report-v1',
        'reported_at': int(time.time()),
    }
    for out_key, src_key in field_map:
        envelope[out_key] = node_info.get(src_key, '')
    return envelope
def _outbox_one(report: Dict, central: str) -> None:
    """Persist a copy of *report* to the outbox, tagged so the drain
    loop retries it against *central* only."""
    tagged = dict(report)
    tagged['_target_central'] = central
    _outbox_write(tagged)


def report_join(node_info: Dict) -> int:
    """Fan out report-in: try primary central(s) first, fall back to
    secondary central(s) ONLY if every primary fails.

    Returns count of centrals that ack'd (HTTP 2xx). Reports that
    fail are persisted to the outbox for retry — keyed to each failed
    target so the drain loop retries only the right URL.

    Failover order is canonical (`core.superadmins`):
    - `SUPERADMIN_CENTRAL_URLS` (primary, e.g. central.hevolve.ai)
    - `SUPERADMIN_FALLBACK_CENTRAL_URLS` (secondary, e.g. azurekong.hertzai.com)

    Secondary endpoints are NOT tried in parallel — they're a true
    failover, so one ack on primary is enough and we don't double-bill
    the central with redundant traffic.

    Fix vs. earlier revision: failed primaries are now outboxed on
    EVERY path. Previously a fallback ack (`sent > 0` after the
    fallback loop) skipped outboxing `primary_failures`, so a node
    that could only reach the fallback never retried the primary —
    contradicting the persistence contract above.
    """
    report = build_report(node_info)
    sent = 0
    primary_failures: List[str] = []
    for central in SUPERADMIN_CENTRAL_URLS:
        if _post_report(central, report):
            sent += 1
        else:
            primary_failures.append(central)
    if sent == 0:
        # Every primary failed → try fallbacks. This is the
        # "central.hevolve.ai unreachable, try azurekong.hertzai.com" path.
        for central in SUPERADMIN_FALLBACK_CENTRAL_URLS:
            if _post_report(central, report):
                sent += 1
            else:
                _outbox_one(report, central)
    # Failed primaries always retry via the outbox, regardless of
    # whether a sibling primary or a fallback ack'd this round.
    for central in primary_failures:
        _outbox_one(report, central)
    return sent
def drain_outbox() -> int:
    """Replay any persisted reports. Called on a slow timer.

    Each outbox file is retried only against its recorded target
    central. Files with an unknown target are deleted; resent files
    are deleted too. Returns the count successfully resent.
    """
    resent = 0
    for fpath in _outbox_drain():
        try:
            with open(fpath, 'r', encoding='utf-8') as fh:
                payload = json.load(fh)
            target = payload.pop('_target_central', None)
            # Accept ANY known central (primary or fallback) so the
            # drain works for outboxed reports against either tier.
            if not target or target not in ALL_CENTRAL_URLS:
                # Stale / unknown target — drop it rather than letting
                # it accumulate forever.
                os.remove(fpath)
                continue
            if _post_report(target, payload):
                os.remove(fpath)
                resent += 1
        except Exception as e:
            logger.debug(f"Outbox drain error for {fpath}: {e}")
    return resent
223# ─── Background loop ──────────────────────────────────────────────────
# Idempotence guard for start_background_loop(): the first caller
# flips _loop_started under _loop_lock; later calls are no-ops.
_loop_started = False
_loop_lock = threading.Lock()
def start_background_loop(get_node_info_callable) -> None:
    """Start the periodic report + outbox-drain loop.

    The caller passes a zero-arg callable that returns the latest
    node_info dict (so the loop sees fresh values on every tick — port
    changes, hart_tag late-binding, etc.). Idempotent: safe to call
    multiple times; only the first call spawns the thread.
    """
    global _loop_started
    # Double-start guard: flip the flag under the lock so concurrent
    # callers cannot both spawn a thread.
    with _loop_lock:
        if _loop_started:
            return
        _loop_started = True

    def _loop():
        # First-tick: send immediately, but with a short pre-sleep so
        # any in-flight boot work (DB migration, model warmup) has a
        # chance to settle and the node_info dict is populated.
        time.sleep(15)
        # 0.0 makes the first loop pass report immediately
        # (now - 0.0 >= REPORT_INTERVAL_SEC).
        last_report = 0.0

        def _heartbeat_safe():
            """Best-effort watchdog heartbeat — same pattern as the
            other long-cadence daemons (peer_discovery, AutoDiscovery).
            Without this, a watchdog with frozen-threshold < REPORT_
            OUTBOX_RETRY_SEC would flag this thread as stalled."""
            try:
                from security.node_watchdog import get_watchdog
                wd = get_watchdog()
                if wd:
                    wd.heartbeat('superadmin_report')
            except Exception:
                pass

        while True:
            _heartbeat_safe()
            try:
                now = time.time()
                if now - last_report >= REPORT_INTERVAL_SEC:
                    try:
                        info = get_node_info_callable() or {}
                        # Skip until the node has an identity
                        # (node_id may populate late during boot).
                        if info.get('node_id'):
                            n = report_join(info)
                            if n > 0:
                                logger.info(
                                    f"Reported to {n}/"
                                    f"{len(SUPERADMIN_CENTRAL_URLS)} "
                                    f"superadmin centrals")
                    except Exception as e:
                        logger.debug(f"Periodic report-in error: {e}")
                    # NOTE(review): last_report advances even when the
                    # attempt fails, so a failed tick is not retried
                    # until the next full interval — the outbox drain
                    # below covers that gap.
                    last_report = now
                # Always drain the outbox (cheap when empty).
                try:
                    drained = drain_outbox()
                    if drained:
                        logger.info(f"Outbox drained {drained} stale reports")
                except Exception as e:
                    logger.debug(f"Outbox drain error: {e}")
            except Exception as e:
                logger.debug(f"Report loop tick error: {e}")
            time.sleep(REPORT_OUTBOX_RETRY_SEC)

    # Daemon thread: must never block interpreter shutdown.
    t = threading.Thread(target=_loop, name='superadmin-report-in', daemon=True)
    t.start()
    logger.info("Superadmin report-in loop started "
                f"(interval={REPORT_INTERVAL_SEC}s, "
                f"outbox-retry={REPORT_OUTBOX_RETRY_SEC}s)")
# Public API; every other helper in this module is internal
# (_-prefixed) and may change without notice.
__all__ = [
    'build_report',
    'report_join',
    'drain_outbox',
    'start_background_loop',
]