Coverage for core / recipe_sync.py: 87.2%

156 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2core.recipe_sync - cloud push/pull of recipe-file bundles. 

3 

4Closes the cross-device gap user hit on 2026-05-04 (Speech Therapy 

5clicked from Recents -> recipe file missing locally -> silent 

6fallback to local_assistant). Existing /createpromptlist syncs 

7only metadata (name, prompt, image_url); recipe BLOBS ({id}.json 

8flows + actions, {id}_*_recipe.json distilled steps, 

9{id}_personality.json) were local-only - so an agent created on 

10device A never reached device B. 

11 

12This module pushes the full recipe bundle (ALL {id}*.json files in 

13PROMPTS_DIR) to a central server when an agent is created or its 

14recipe is updated, and pulls them back when the user opens an 

15agent whose recipe is missing locally. 

16 

17Wire format (single source of truth - both push and pull use the 

18same envelope): 

19 

20 POST {CENTRAL_DB_URL}/prompts/sync 

21 { 

22 "schema_version": 1, 

23 "prompt_id": "<int or str>", 

24 "user_id": "<owner>", 

25 "files": {"<filename>": "<file contents as string>", ...}, 

26 "checksum": "<sha256 of canonical files-dict>", 

27 "uploaded_at": <unix epoch> 

28 } 

29 -> 200 OK { "stored": true, "checksum": "..." } 

30 

31 GET {CENTRAL_DB_URL}/prompts/sync/{prompt_id}?user_id=... 

32 -> 200 OK { same envelope as above } 

33 -> 404 { "error": "not_found" } 

34 

35The cloud endpoint MAY be served by HARTOS itself (for centrally- 

36deployed instances) or by an external sync service. The HARTOS 

37side ships endpoints in hart_intelligence_entry.py:/prompts/sync. 

38 

39Best-effort: every push/pull is wrapped in try/except so a sync 

40failure (offline, central down, schema mismatch) never blocks 

41the user-facing flow. All failures log at debug; only the rare 

42schema-version mismatch warns. 

43""" 

44 

45import hashlib 

46import json 

47import logging 

48import os 

49import re 

50import time 

51from typing import Dict, List, Optional 

52 

#: Module-level logger; callers rely on the 'hevolve.recipe_sync'
#: channel to tune sync verbosity independently of the rest of core.
logger = logging.getLogger('hevolve.recipe_sync')

#: Bump when the wire envelope changes shape. Cloud endpoints
#: REJECT requests with schema_version > known_max, so older clients
#: see a graceful 4xx instead of silently corrupting cloud state.
SCHEMA_VERSION: int = 1

#: Per-prompt last-pushed checksum cache file - skips redundant cloud
#: writes during multi-flow CREATE (each flow triggers a push; if the
#: bundle didn't change we can skip the network roundtrip). See M7
#: in the post-shipment idempotency review. Lives under the user's
#: home dir so it survives app restarts but stays per-user.
_PUSH_CACHE_FILE: str = os.path.join(
    os.path.expanduser('~'), '.hevolve', 'recipe_sync_state.json')

66 

67#: Reserved Windows device names that os.open() silently aliases to 

68#: actual hardware - any incoming filename matching these (with or 

69#: without extension) must be rejected. Mirror Windows' own 

70#: reserved-name list per MS-DOS Device Naming Convention docs. 

71_WINDOWS_RESERVED_NAMES = frozenset({ 

72 'CON', 'PRN', 'AUX', 'NUL', 

73 'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9', 

74 'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9', 

75}) 

76 

77 

78def _safe_filename(fname: str) -> bool: 

79 """Validate a filename from an untrusted cloud payload. 

80 

81 Rejects: 

82 - empty / dotfile / current-or-parent dir 

83 - any path separator (forward or back) - keeps writes inside the 

84 target dir even when os.path.join would otherwise strip it 

85 - drive-letter prefix (Windows: ``C:foo.json``) - os.path.join 

86 SILENTLY ignores the dir prefix when joined with a drive-relative 

87 path, which lets attackers anchor writes anywhere on the drive 

88 - NUL byte - Python open() raises ValueError but we want a clean 

89 WARNING instead of a stack trace 

90 - Windows reserved device names (CON, NUL, COM1, ...) - opens 

91 succeed and alias to actual hardware 

92 - basename mismatch - os.path.basename(fname) != fname catches 

93 any composite path that snuck past the simple separator check 

94 

95 Returns True when safe to write under prompts_dir, False otherwise. 

96 Caller logs WARNING on False. 

97 """ 

98 if not fname or fname in {'.', '..'}: 

99 return False 

100 if fname.startswith('.'): 

101 return False 

102 if '/' in fname or '\\' in fname: 

103 return False 

104 if '\x00' in fname: 

105 return False 

106 # Drive letter detection: a single letter followed by ':' anywhere 

107 # in the name is suspicious on Windows. os.path.basename only 

108 # strips trailing path components, NOT drive-relative anchors. 

109 if re.match(r'^[A-Za-z]:', fname): 

110 return False 

111 # Windows reserved names. Apply to the stem (everything before the 

112 # first dot) since CON.json / CON.txt both alias to CON. 

113 stem = fname.split('.', 1)[0].upper() 

114 if stem in _WINDOWS_RESERVED_NAMES: 

115 return False 

116 # Belt+suspenders: basename must equal fname. Catches anything 

117 # the explicit checks above missed. 

118 if os.path.basename(fname) != fname: 

119 return False 

120 return True 

121 

122 

def _load_push_cache() -> dict:
    """Read the per-prompt last-pushed-checksum cache.

    Returns {} on missing / unreadable / corrupt / wrong-shape content.
    The cache is purely an optimization, so any doubt about its
    integrity resolves to "re-push".
    """
    try:
        with open(_PUSH_CACHE_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    # OSError covers FileNotFoundError; ValueError covers both
    # json.JSONDecodeError AND UnicodeDecodeError (previously uncaught:
    # a binary-garbled cache file would raise out of this helper).
    except (OSError, ValueError):
        return {}
    # Valid JSON that is not an object (e.g. a file truncated to `null`
    # or a list) would crash callers doing cache.get(...) - treat any
    # non-dict payload as corrupt.
    return data if isinstance(data, dict) else {}

131 

132 

def _store_push_cache(cache: dict) -> None:
    """Persist the checksum cache atomically (temp file + os.replace).

    Best-effort: a failed write only means the next push cannot be
    skipped, so errors are logged at debug level and swallowed.
    """
    tmp_path = _PUSH_CACHE_FILE + '.tmp'
    try:
        os.makedirs(os.path.dirname(_PUSH_CACHE_FILE), exist_ok=True)
        with open(tmp_path, 'w', encoding='utf-8') as fh:
            json.dump(cache, fh)
        # os.replace is atomic on POSIX and Windows - readers never see
        # a half-written cache file.
        os.replace(tmp_path, _PUSH_CACHE_FILE)
    except (IOError, OSError) as e:
        logger.debug(f'recipe_sync: push-cache write failed: {e}')

144 

145 

146def _files_for_prompt(prompts_dir: str, prompt_id) -> List[str]: 

147 """List all on-disk filenames for a given prompt_id. 

148 

149 Matches the canonical naming convention: 

150 {prompt_id}.json config (always present) 

151 {prompt_id}_personality.json personality (optional) 

152 {prompt_id}_{flow}_recipe.json flow recipe (per flow) 

153 {prompt_id}_{flow}_{action}.json per-action recipes 

154 

155 The leading prefix ``{prompt_id}_`` AND the bare ``{prompt_id}.json`` 

156 are both matched so callers don't have to enumerate suffix 

157 variations themselves. 

158 """ 

159 pid_str = str(prompt_id) 

160 if not os.path.isdir(prompts_dir): 

161 return [] 

162 out = [] 

163 for fname in os.listdir(prompts_dir): 

164 if not fname.endswith('.json'): 

165 continue 

166 if fname == f'{pid_str}.json' or fname.startswith(f'{pid_str}_'): 

167 out.append(fname) 

168 return sorted(out) 

169 

170 

171def _checksum(files: Dict[str, str]) -> str: 

172 """Stable sha256 over the canonicalized files dict. Lets the 

173 cloud endpoint dedupe identical pushes + lets the client skip 

174 a re-push when nothing changed since the last sync.""" 

175 canonical = json.dumps(files, sort_keys=True, separators=(',', ':')) 

176 return hashlib.sha256(canonical.encode('utf-8')).hexdigest() 

177 

178 

def build_envelope(prompts_dir: str, prompt_id, user_id: str = '') -> Optional[dict]:
    """Read all {prompt_id}*.json files from disk + wrap in the
    canonical envelope.

    Returns None if no files exist or none could be read (caller
    treats as a no-op push, not an error).
    """
    filenames = _files_for_prompt(prompts_dir, prompt_id)
    if not filenames:
        logger.debug(f'recipe_sync: no files for prompt_id={prompt_id} on disk')
        return None
    files: Dict[str, str] = {}
    for fname in filenames:
        path = os.path.join(prompts_dir, fname)
        try:
            with open(path, 'r', encoding='utf-8') as f:
                files[fname] = f.read()
        # UnicodeDecodeError added: a corrupt/binary file previously
        # raised out of this best-effort helper (it is a ValueError,
        # not an OSError) and aborted the whole push. Skip it like any
        # other unreadable file instead.
        except (OSError, UnicodeDecodeError) as e:
            logger.debug(f'recipe_sync: skipped {fname} ({e})')
    if not files:
        return None
    return {
        'schema_version': SCHEMA_VERSION,
        'prompt_id': str(prompt_id),
        'user_id': user_id or '',
        'files': files,
        'checksum': _checksum(files),
        'uploaded_at': int(time.time()),
    }

205 

206 

def push_recipe(prompts_dir: str, prompt_id, user_id: str = '',
                central_url: str = '', force: bool = False) -> bool:
    """Push all on-disk files for *prompt_id* to the central cloud.

    Returns True on 2xx, False on any failure (offline, schema
    mismatch, file missing). Best-effort: never raises.

    Surfaces failures at WARNING (not debug) so operators can see
    when cross-device sync is silently broken - this whole module
    exists because of a silent-fallback bug; debug-level here would
    repeat the anti-pattern at the other end of the wire.

    Skips the network roundtrip when the bundle's checksum matches
    the cache from the last successful push (M7 in the post-shipment
    review). Pass ``force=True`` to override - useful when the
    cache itself is suspect or the cloud rejected the prior push
    for a transient reason.
    """
    if not central_url:
        try:
            from core.config_cache import get_central_db_url
            central_url = get_central_db_url()
        # Exception, not just ImportError: get_central_db_url() itself
        # may fail (config unreadable, etc.) and this function promises
        # to never raise.
        except Exception:
            central_url = ''
    if not central_url:
        logger.debug('recipe_sync: no central_url available, skip push')
        return False

    envelope = build_envelope(prompts_dir, prompt_id, user_id)
    if envelope is None:
        return False

    # M7: skip when content unchanged since last successful push.
    # Corrupt cache content (non-dict cache, non-dict entry) must not
    # crash the push path - the cache is purely an optimization, so
    # treat anything malformed as "no prior push".
    cache = _load_push_cache()
    if not isinstance(cache, dict):
        cache = {}
    cache_key = str(prompt_id)
    entry = cache.get(cache_key)
    last_checksum = entry.get('checksum') if isinstance(entry, dict) else None
    if not force and last_checksum == envelope['checksum']:
        logger.debug(
            f'recipe_sync: prompt_id={prompt_id} unchanged since last '
            f'push (checksum={envelope["checksum"][:12]}) - skipping')
        return True

    try:
        from core.http_pool import pooled_post
        url = f"{central_url.rstrip('/')}/prompts/sync"
        resp = pooled_post(url, json=envelope, timeout=(3, 10))
        if 200 <= resp.status_code < 300:
            logger.info(
                f'recipe_sync: pushed prompt_id={prompt_id} '
                f'({len(envelope["files"])} files, '
                f'checksum={envelope["checksum"][:12]})')
            cache[cache_key] = {
                'checksum': envelope['checksum'],
                'pushed_at': envelope['uploaded_at'],
            }
            _store_push_cache(cache)
            return True
        # M1: surface as WARNING - cross-device sync silently broken
        # is exactly the anti-pattern this module exists to fix.
        logger.warning(
            f'recipe_sync: push prompt_id={prompt_id} returned '
            f'status={resp.status_code} - cross-device sync deferred '
            f'until next successful push (next CREATE event or manual)')
        return False
    except Exception as e:
        logger.warning(
            f'recipe_sync: push prompt_id={prompt_id} failed: {e} - '
            f'cross-device sync deferred')
        return False

275 

276 

def pull_recipe(prompts_dir: str, prompt_id, user_id: str = '',
                central_url: str = '') -> bool:
    """Pull recipe bundle for *prompt_id* from cloud + write to disk.

    Returns True if at least one file was written, or if the local
    bundle already matches the cloud checksum; False otherwise (not on
    cloud, offline, write failed, schema mismatch, malformed payload).

    Files are written under their original names in *prompts_dir*.
    Existing files are NOT overwritten unless the cloud envelope's
    checksum differs from the local checksum (last-write-wins by
    uploaded_at would be racier; checksum-equality keeps idempotent).
    """
    if not central_url:
        try:
            from core.config_cache import get_central_db_url
            central_url = get_central_db_url()
        # Exception, not just ImportError: this helper is best-effort
        # and a config-read failure previously propagated to callers.
        except Exception:
            central_url = ''
    if not central_url:
        logger.debug('recipe_sync: no central_url available, skip pull')
        return False

    try:
        from core.http_pool import get_http_session
        params = {'user_id': user_id} if user_id else {}
        url = f"{central_url.rstrip('/')}/prompts/sync/{prompt_id}"
        resp = get_http_session().get(url, params=params, timeout=(3, 10))
        if resp.status_code == 404:
            logger.debug(f'recipe_sync: prompt_id={prompt_id} not on cloud')
            return False
        if not (200 <= resp.status_code < 300):
            logger.debug(
                f'recipe_sync: pull prompt_id={prompt_id} status='
                f'{resp.status_code}')
            return False
        envelope = resp.json()
    except Exception as e:
        logger.debug(f'recipe_sync: pull prompt_id={prompt_id} failed: {e}')
        return False

    # Defensive: a malformed body (JSON list / string / null) would
    # previously crash the .get() calls below with AttributeError,
    # outside the try - treat it like any other failed pull.
    if not isinstance(envelope, dict):
        logger.debug(
            f'recipe_sync: pull prompt_id={prompt_id} returned '
            f'non-object envelope')
        return False

    if envelope.get('schema_version') != SCHEMA_VERSION:
        logger.warning(
            f'recipe_sync: schema mismatch for prompt_id={prompt_id} '
            f'(cloud={envelope.get("schema_version")}, '
            f'local={SCHEMA_VERSION}) - skipping pull')
        return False

    files = envelope.get('files')
    # Same defensive stance for the files map: a non-dict value would
    # crash .items() below. Empty/missing is simply "nothing to pull".
    if not isinstance(files, dict) or not files:
        return False

    # Skip pull when local + cloud checksums match - common case
    # after a push; saves a writable-disk roundtrip.
    local_envelope = build_envelope(prompts_dir, prompt_id, user_id)
    if local_envelope and local_envelope['checksum'] == envelope.get('checksum'):
        logger.debug(
            f'recipe_sync: prompt_id={prompt_id} checksum matches local, '
            f'skipping write')
        return True

    try:
        os.makedirs(prompts_dir, exist_ok=True)
    except OSError as e:
        logger.debug(f'recipe_sync: cannot create {prompts_dir}: {e}')
        return False

    written = 0
    for fname, content in files.items():
        # Defensive: refuse paths that escape the prompts dir, drive
        # letters, NUL bytes, Windows reserved names. See _safe_filename
        # docstring + reviewer M2 for the full hardening rationale.
        if not _safe_filename(fname):
            logger.warning(
                f'recipe_sync: refusing unsafe filename {fname!r} '
                f'in pull payload for prompt_id={prompt_id}')
            continue
        path = os.path.join(prompts_dir, fname)
        # Atomic write (M3): write to a temp file in the same dir,
        # then os.replace to the final name. Prevents partial-content
        # snapshots from concurrent prompts_backup.snapshot_prompts
        # picking up a half-written {pid}.json.
        tmp = path + '.tmp'
        try:
            with open(tmp, 'w', encoding='utf-8') as f:
                f.write(content)
            os.replace(tmp, path)
            written += 1
        # TypeError added: an untrusted payload may carry a non-string
        # file value; f.write() then raises TypeError, which previously
        # escaped this best-effort loop entirely.
        except (IOError, OSError, TypeError) as e:
            logger.debug(f'recipe_sync: write {fname} failed: {e}')
            # Clean up orphan temp on failure.
            try:
                os.remove(tmp)
            except OSError:
                pass
    if written:
        logger.info(
            f'recipe_sync: pulled prompt_id={prompt_id} '
            f'({written}/{len(files)} files written)')
        return True
    return False