Coverage for core / superadmin_report.py: 0.0%

125 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1"""Central superadmin report-in client. 

2 

3When a HARTOS install successfully forms a peer (any topology — flat / 

4regional / central, any distribution form — bundled / Docker / ISO), 

5it MUST report its identity to the canonical superadmin allowlist. 

6This module is the single send path for that report. 

7 

8Wired from ``GossipProtocol.start()``. Offline-tolerant: failed 

9reports persist to an outbox under ``platform_paths.get_data_dir() / 

10superadmin_outbox`` and replay on the next tick. 

11 

12See Nunba memory: 

13- ``project_universal_peer_join_central_report.md`` (the spec) 

14- ``core/superadmins.py`` (allowlist + URLs) 

15""" 

16from __future__ import annotations 

17import json 

18import logging 

19import os 

20import threading 

21import time 

22import uuid 

23from typing import Dict, List, Optional 

24 

25from core.superadmins import ( 

26 SUPERADMIN_CENTRAL_URLS, 

27 SUPERADMIN_FALLBACK_CENTRAL_URLS, 

28 ALL_CENTRAL_URLS, 

29 REPORT_INTERVAL_SEC, 

30 REPORT_TIMEOUT_CONNECT, 

31 REPORT_TIMEOUT_READ, 

32 REPORT_OUTBOX_RETRY_SEC, 

33) 

34 

# Module logger; keyed on the literal 'superadmin_report' (not __name__),
# so any log routing configured against that name keeps matching.
logger = logging.getLogger('superadmin_report')

# Outbox cap. If centrals stay unreachable indefinitely, the outbox
# would otherwise grow without bound (one file per failed central per
# REPORT_INTERVAL_SEC). Drop the oldest on overflow.
MAX_OUTBOX_FILES = 100

41 

42 

43# ─── Outbox helpers ─────────────────────────────────────────────────── 

44 

45def _outbox_dir() -> Optional[str]: 

46 """Return the outbox directory path, or None if data dir is 

47 unreachable (degraded environment). Best-effort mkdir.""" 

48 try: 

49 from core.platform_paths import get_data_dir 

50 path = os.path.join(get_data_dir(), 'superadmin_outbox') 

51 os.makedirs(path, exist_ok=True) 

52 return path 

53 except Exception: 

54 return None 

55 

56 

def _outbox_write(report: Dict) -> None:
    """Persist one failed report to the outbox as a JSON file.

    Best-effort at every step: a missing outbox dir means no
    persistence, and neither cap enforcement nor the write itself is
    allowed to raise out of this function.
    """
    outbox = _outbox_dir()
    if not outbox:
        return
    # Enforce the MAX_OUTBOX_FILES cap before writing. Filenames start
    # with a millisecond timestamp, so lexicographic order is
    # chronological order and the head of the sorted listing is oldest.
    try:
        pending = sorted(
            name for name in os.listdir(outbox) if name.endswith('.json')
        )
        while len(pending) >= MAX_OUTBOX_FILES:
            oldest = pending.pop(0)
            try:
                os.remove(os.path.join(outbox, oldest))
            except Exception:
                break  # Removal itself failed — stop retrying
    except Exception:
        pass  # Cap is best-effort; better to write than to crash
    fname = f"{int(time.time() * 1000)}-{uuid.uuid4().hex[:8]}.json"
    try:
        with open(os.path.join(outbox, fname), 'w', encoding='utf-8') as fh:
            json.dump(report, fh)
    except Exception as e:
        logger.debug(f"Outbox write failed: {e}")

81 

82 

def _outbox_drain() -> List[str]:
    """Return the pending outbox files as full paths, oldest first.

    Filenames carry a millisecond-timestamp prefix, so a plain
    lexicographic sort yields chronological order. Any listing error
    degrades to an empty list.
    """
    outbox = _outbox_dir()
    if not outbox:
        return []
    try:
        names = [n for n in os.listdir(outbox) if n.endswith('.json')]
        names.sort()
        return [os.path.join(outbox, n) for n in names]
    except Exception:
        return []

97 

98 

99# ─── HTTP send (one central) ────────────────────────────────────────── 

100 

101def _post_report(central_url: str, report: Dict) -> bool: 

102 """POST a single report to one central. Returns True iff 2xx.""" 

103 try: 

104 from core.http_pool import pooled_post 

105 url = f"{central_url.rstrip('/')}/api/social/peers/report-join" 

106 resp = pooled_post( 

107 url, json=report, 

108 timeout=(REPORT_TIMEOUT_CONNECT, REPORT_TIMEOUT_READ), 

109 ) 

110 if 200 <= resp.status_code < 300: 

111 return True 

112 logger.debug(f"Report-in to {central_url} status={resp.status_code}") 

113 return False 

114 except Exception as e: 

115 logger.debug(f"Report-in to {central_url} failed: {e}") 

116 return False 

117 

118 

119# ─── Public API ─────────────────────────────────────────────────────── 

120 

def build_report(node_info: Dict) -> Dict:
    """Wrap *node_info* into a superadmin report envelope.

    Privacy bound: ship identity + topology + bandwidth tier +
    capability summary only. No personal user data, no chat content,
    no model activations.
    """
    # (envelope key, node_info key) pairs — every field defaults to ''
    # when absent so the wire schema is always fully populated.
    field_map = (
        ('node_id', 'node_id'),
        ('node_name', 'name'),
        ('url', 'url'),
        ('tier', 'tier'),
        ('capability_tier', 'capability_tier'),
        ('version', 'version'),
        ('release_version', 'release_version'),
        ('bandwidth_profile', 'bandwidth_profile'),
        ('guardrail_hash', 'guardrail_hash'),
        ('code_hash', 'code_hash'),
        ('public_key', 'public_key'),
        ('hart_tag', 'hart_tag'),
    )
    envelope: Dict = {
        'schema': 'superadmin-report-v1',
        'reported_at': int(time.time()),
    }
    for out_key, src_key in field_map:
        envelope[out_key] = node_info.get(src_key, '')
    return envelope

144 

145 

def report_join(node_info: Dict) -> int:
    """Fan out report-in: try primary central(s) first, fall back to
    secondary central(s) ONLY if every primary fails.

    Returns count of centrals that ack'd (HTTP 2xx). Every failed
    target is persisted to the outbox keyed to its URL, so the drain
    loop retries exactly the right central — including failed primaries
    when only a fallback happened to ack. (Bug fix: the previous code
    skipped outboxing primary failures in that case, silently dropping
    them despite the documented retry contract.)

    Failover order is canonical (`core.superadmins`):
    - `SUPERADMIN_CENTRAL_URLS` (primary, e.g. central.hevolve.ai)
    - `SUPERADMIN_FALLBACK_CENTRAL_URLS` (secondary, e.g. azurekong.hertzai.com)

    Secondary endpoints are NOT tried in parallel — they're a true
    failover, so one ack on primary is enough and we don't double-bill
    the central with redundant traffic.
    """
    report = build_report(node_info)

    def _outbox_for(central: str) -> None:
        # Persist a copy tagged with its target so drain_outbox()
        # retries exactly this URL.
        tagged = dict(report)
        tagged['_target_central'] = central
        _outbox_write(tagged)

    sent = 0
    primary_failures: List[str] = []
    for central in SUPERADMIN_CENTRAL_URLS:
        if _post_report(central, report):
            sent += 1
        else:
            primary_failures.append(central)
    if sent == 0:
        # Every primary failed → try fallbacks. This is the
        # "central.hevolve.ai unreachable, try azurekong.hertzai.com" path.
        for central in SUPERADMIN_FALLBACK_CENTRAL_URLS:
            if _post_report(central, report):
                sent += 1
            else:
                _outbox_for(central)
    # Failed primaries always get outboxed for retry, regardless of
    # whether this tick was rescued by a fallback ack.
    for central in primary_failures:
        _outbox_for(central)
    return sent

195 

196 

def drain_outbox() -> int:
    """Replay any persisted reports. Called on a slow timer.

    Returns the number successfully resent; those files are deleted.
    Files whose ``_target_central`` is missing or unknown are dropped
    outright so stale entries cannot accumulate forever.
    """
    resent = 0
    for fpath in _outbox_drain():
        try:
            with open(fpath, 'r', encoding='utf-8') as fh:
                report = json.load(fh)
            target = report.pop('_target_central', None)
            # Accept ANY known central (primary or fallback) so the
            # drain works for outboxed reports against either tier.
            if not target or target not in ALL_CENTRAL_URLS:
                os.remove(fpath)  # stale / unknown target — drop it
                continue
            if _post_report(target, report):
                os.remove(fpath)
                resent += 1
        except Exception as e:
            logger.debug(f"Outbox drain error for {fpath}: {e}")
    return resent

221 

222 

223# ─── Background loop ────────────────────────────────────────────────── 

224 

225_loop_started = False 

226_loop_lock = threading.Lock() 

227 

228 

def start_background_loop(get_node_info_callable) -> None:
    """Start the periodic report + outbox-drain loop.

    ``get_node_info_callable`` is a zero-arg callable returning the
    latest node_info dict, so each tick observes fresh values (port
    changes, hart_tag late-binding, etc.). Idempotent: only the first
    call spawns the worker thread; later calls are no-ops.
    """
    global _loop_started
    with _loop_lock:
        if _loop_started:
            return
        _loop_started = True

    def _beat():
        # Best-effort watchdog heartbeat — same pattern as the other
        # long-cadence daemons (peer_discovery, AutoDiscovery). Without
        # it, a watchdog whose frozen-threshold is shorter than
        # REPORT_OUTBOX_RETRY_SEC would flag this thread as stalled.
        try:
            from security.node_watchdog import get_watchdog
            wd = get_watchdog()
            if wd:
                wd.heartbeat('superadmin_report')
        except Exception:
            pass

    def _tick(last_report):
        # One loop iteration: maybe report-in, always drain the outbox.
        # Returns the (possibly advanced) last-report timestamp.
        now = time.time()
        if now - last_report >= REPORT_INTERVAL_SEC:
            try:
                info = get_node_info_callable() or {}
                if info.get('node_id'):
                    n = report_join(info)
                    if n > 0:
                        logger.info(
                            f"Reported to {n}/"
                            f"{len(SUPERADMIN_CENTRAL_URLS)} "
                            f"superadmin centrals")
            except Exception as e:
                logger.debug(f"Periodic report-in error: {e}")
            last_report = now
        # Always drain the outbox (cheap when empty).
        try:
            drained = drain_outbox()
            if drained:
                logger.info(f"Outbox drained {drained} stale reports")
        except Exception as e:
            logger.debug(f"Outbox drain error: {e}")
        return last_report

    def _loop():
        # Short pre-sleep so in-flight boot work (DB migration, model
        # warmup) can settle and the node_info dict is populated before
        # the first report attempt.
        time.sleep(15)
        last_report = 0.0
        while True:
            _beat()
            try:
                last_report = _tick(last_report)
            except Exception as e:
                logger.debug(f"Report loop tick error: {e}")
            time.sleep(REPORT_OUTBOX_RETRY_SEC)

    t = threading.Thread(target=_loop, name='superadmin-report-in', daemon=True)
    t.start()
    logger.info("Superadmin report-in loop started "
                f"(interval={REPORT_INTERVAL_SEC}s, "
                f"outbox-retry={REPORT_OUTBOX_RETRY_SEC}s)")

296 

297 

# Public API surface: keeps ``from core.superadmin_report import *``
# limited to the four entry points; the _-prefixed helpers stay private.
__all__ = [
    'build_report',
    'report_join',
    'drain_outbox',
    'start_background_loop',
]