Coverage for core/recipe_sync.py: 87.2% (156 statements)
coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2core.recipe_sync - cloud push/pull of recipe-file bundles.
4Closes the cross-device gap user hit on 2026-05-04 (Speech Therapy
5clicked from Recents -> recipe file missing locally -> silent
6fallback to local_assistant). Existing /createpromptlist syncs
7only metadata (name, prompt, image_url); recipe BLOBS ({id}.json
8flows + actions, {id}_*_recipe.json distilled steps,
9{id}_personality.json) were local-only - so an agent created on
10device A never reached device B.
12This module pushes the full recipe bundle (ALL {id}*.json files in
13PROMPTS_DIR) to a central server when an agent is created or its
14recipe is updated, and pulls them back when the user opens an
15agent whose recipe is missing locally.
17Wire format (single source of truth - both push and pull use the
18same envelope):
20 POST {CENTRAL_DB_URL}/prompts/sync
21 {
22 "schema_version": 1,
23 "prompt_id": "<int or str>",
24 "user_id": "<owner>",
25 "files": {"<filename>": "<file contents as string>", ...},
26 "checksum": "<sha256 of canonical files-dict>",
27 "uploaded_at": <unix epoch>
28 }
29 -> 200 OK { "stored": true, "checksum": "..." }
31 GET {CENTRAL_DB_URL}/prompts/sync/{prompt_id}?user_id=...
32 -> 200 OK { same envelope as above }
33 -> 404 { "error": "not_found" }
35The cloud endpoint MAY be served by HARTOS itself (for centrally-
36deployed instances) or by an external sync service. The HARTOS
37side ships endpoints in hart_intelligence_entry.py:/prompts/sync.
39Best-effort: every push/pull is wrapped in try/except so a sync
40failure (offline, central down, schema mismatch) never blocks
41the user-facing flow. All failures log at debug; only the rare
42schema-version mismatch warns.
43"""
import hashlib
import json
import logging
import os
import re
import time
from typing import Dict, List, Optional

logger = logging.getLogger('hevolve.recipe_sync')

#: Bump when the wire envelope changes shape. Cloud endpoints
#: REJECT requests with schema_version > known_max, so older clients
#: see a graceful 4xx instead of silently corrupting cloud state.
SCHEMA_VERSION: int = 1

#: Per-prompt last-pushed checksum cache file - skips redundant cloud
#: writes during multi-flow CREATE (each flow triggers a push; if the
#: bundle didn't change we can skip the network roundtrip). See M7
#: in the post-shipment idempotency review.
_PUSH_CACHE_FILE: str = os.path.join(
    os.path.expanduser('~'), '.hevolve', 'recipe_sync_state.json')

#: Reserved Windows device names that os.open() silently aliases to
#: actual hardware - any incoming filename matching these (with or
#: without extension) must be rejected. Mirrors Windows' own
#: reserved-name list per the MS-DOS Device Naming Convention docs.
_WINDOWS_RESERVED_NAMES = frozenset({
    'CON', 'PRN', 'AUX', 'NUL',
    'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
    'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9',
})

def _safe_filename(fname: str) -> bool:
    """Validate a filename from an untrusted cloud payload.

    Rejects:
      - empty / dotfile / current-or-parent dir
      - any path separator (forward or back) - keeps writes inside the
        target dir even when os.path.join would otherwise strip it
      - drive-letter prefix (Windows: ``C:foo.json``) - os.path.join
        SILENTLY ignores the dir prefix when joined with a drive-relative
        path, which lets attackers anchor writes anywhere on the drive
      - NUL byte - Python open() raises ValueError but we want a clean
        WARNING instead of a stack trace
      - Windows reserved device names (CON, NUL, COM1, ...) - opens
        succeed and alias to actual hardware
      - basename mismatch - os.path.basename(fname) != fname catches
        any composite path that snuck past the simple separator check

    Returns True when safe to write under prompts_dir, False otherwise.
    Caller logs WARNING on False.
    """
    if not fname or fname in {'.', '..'}:
        return False
    if fname.startswith('.'):
        return False
    if '/' in fname or '\\' in fname:
        return False
    if '\x00' in fname:
        return False
    # Drive letter detection: a single letter followed by ':' at the
    # start of the name anchors a drive-relative path on Windows.
    # os.path.basename only strips trailing path components, NOT
    # drive-relative anchors.
    if re.match(r'^[A-Za-z]:', fname):
        return False
    # Windows reserved names. Apply to the stem (everything before the
    # first dot) since CON.json / CON.txt both alias to CON.
    stem = fname.split('.', 1)[0].upper()
    if stem in _WINDOWS_RESERVED_NAMES:
        return False
    # Belt+suspenders: basename must equal fname. Catches anything
    # the explicit checks above missed.
    if os.path.basename(fname) != fname:
        return False
    return True
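
# Spot checks (illustrative, mirroring the docstring above; not exhaustive):
#   _safe_filename('42.json')       -> True
#   _safe_filename('sub/42.json')   -> False  (path separator)
#   _safe_filename('C:42.json')     -> False  (drive-relative anchor)
#   _safe_filename('CON.json')      -> False  (reserved device name)
#   _safe_filename('a\x00b.json')   -> False  (NUL byte)
#   _safe_filename('.hidden.json')  -> False  (dotfile)
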
def _load_push_cache() -> dict:
    """Read the per-prompt last-pushed-checksum cache. Returns {}
    on missing/corrupt - cache is purely an optimization."""
    try:
        with open(_PUSH_CACHE_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError, OSError):
        return {}


def _store_push_cache(cache: dict) -> None:
    """Write the cache atomically (temp + os.replace). Best-effort -
    a write failure just means the next push won't be skipped."""
    try:
        os.makedirs(os.path.dirname(_PUSH_CACHE_FILE), exist_ok=True)
        tmp = _PUSH_CACHE_FILE + '.tmp'
        with open(tmp, 'w', encoding='utf-8') as f:
            json.dump(cache, f)
        os.replace(tmp, _PUSH_CACHE_FILE)
    except (IOError, OSError) as e:
        logger.debug(f'recipe_sync: push-cache write failed: {e}')
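
# On-disk shape of ~/.hevolve/recipe_sync_state.json (values illustrative) -
# one entry per prompt_id, written by push_recipe after a successful push:
#   {"42": {"checksum": "3f6c9a...", "pushed_at": 1778905740}}
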
def _files_for_prompt(prompts_dir: str, prompt_id) -> List[str]:
    """List all on-disk filenames for a given prompt_id.

    Matches the canonical naming convention:
        {prompt_id}.json                  config (always present)
        {prompt_id}_personality.json      personality (optional)
        {prompt_id}_{flow}_recipe.json    flow recipe (per flow)
        {prompt_id}_{flow}_{action}.json  per-action recipes

    The leading prefix ``{prompt_id}_`` AND the bare ``{prompt_id}.json``
    are both matched so callers don't have to enumerate suffix
    variations themselves.
    """
    pid_str = str(prompt_id)
    if not os.path.isdir(prompts_dir):
        return []
    out = []
    for fname in os.listdir(prompts_dir):
        if not fname.endswith('.json'):
            continue
        if fname == f'{pid_str}.json' or fname.startswith(f'{pid_str}_'):
            out.append(fname)
    return sorted(out)
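
# Example (flow name 'chat' is illustrative): for prompt_id=42 and a
# directory containing
#   42.json, 42_personality.json, 42_chat_recipe.json, 421.json, 42.bak
# this returns ['42.json', '42_chat_recipe.json', '42_personality.json'].
# Note 421.json is NOT matched: the check requires either the exact name
# 42.json or the underscore-separated prefix 42_, and 42.bak fails the
# .json suffix filter.
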
def _checksum(files: Dict[str, str]) -> str:
    """Stable sha256 over the canonicalized files dict. Lets the
    cloud endpoint dedupe identical pushes + lets the client skip
    a re-push when nothing changed since the last sync."""
    canonical = json.dumps(files, sort_keys=True, separators=(',', ':'))
    return hashlib.sha256(canonical.encode('utf-8')).hexdigest()
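
# Canonicalization property (illustrative): dict insertion order does not
# affect the digest because json.dumps sorts the keys, so
#   _checksum({'a.json': 'x', 'b.json': 'y'})
#   == _checksum({'b.json': 'y', 'a.json': 'x'})
# while any change to a filename or its contents yields a new digest.
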
def build_envelope(prompts_dir: str, prompt_id, user_id: str = '') -> Optional[dict]:
    """Read all {prompt_id}*.json files from disk + wrap them in the
    canonical envelope. Returns None if no files exist (the caller
    treats this as a no-op push, not an error)."""
    filenames = _files_for_prompt(prompts_dir, prompt_id)
    if not filenames:
        logger.debug(f'recipe_sync: no files for prompt_id={prompt_id} on disk')
        return None
    files: Dict[str, str] = {}
    for fname in filenames:
        path = os.path.join(prompts_dir, fname)
        try:
            with open(path, 'r', encoding='utf-8') as f:
                files[fname] = f.read()
        except (IOError, OSError) as e:
            logger.debug(f'recipe_sync: skipped {fname} ({e})')
    if not files:
        return None
    return {
        'schema_version': SCHEMA_VERSION,
        'prompt_id': str(prompt_id),
        'user_id': user_id or '',
        'files': files,
        'checksum': _checksum(files),
        'uploaded_at': int(time.time()),
    }
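
# What a built envelope looks like (illustrative values; file contents
# elided, checksum truncated):
#   {
#       'schema_version': 1,
#       'prompt_id': '42',        # always coerced to str
#       'user_id': 'alice',
#       'files': {'42.json': '{...}', '42_personality.json': '{...}'},
#       'checksum': '3f6c9a...',
#       'uploaded_at': 1778905740,
#   }
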
def push_recipe(prompts_dir: str, prompt_id, user_id: str = '',
                central_url: str = '', force: bool = False) -> bool:
    """Push all on-disk files for *prompt_id* to the central cloud.

    Returns True on 2xx or on a skipped no-change push, False on any
    failure (offline, schema mismatch, file missing). Best-effort:
    never raises.

    Surfaces failures at WARNING (not debug) so operators can see
    when cross-device sync is silently broken - this whole module
    exists because of a silent-fallback bug; debug-level here would
    repeat the anti-pattern at the other end of the wire.

    Skips the network roundtrip when the bundle's checksum matches
    the cache from the last successful push (M7 in the post-shipment
    review). Pass ``force=True`` to override - useful when the
    cache itself is suspect or the cloud rejected the prior push
    for a transient reason.
    """
    if not central_url:
        try:
            from core.config_cache import get_central_db_url
            central_url = get_central_db_url()
        except ImportError:
            central_url = ''
    if not central_url:
        logger.debug('recipe_sync: no central_url available, skip push')
        return False

    envelope = build_envelope(prompts_dir, prompt_id, user_id)
    if envelope is None:
        return False

    # M7: skip when content unchanged since last successful push.
    cache = _load_push_cache()
    cache_key = str(prompt_id)
    if not force and cache.get(cache_key, {}).get('checksum') == envelope['checksum']:
        logger.debug(
            f'recipe_sync: prompt_id={prompt_id} unchanged since last '
            f'push (checksum={envelope["checksum"][:12]}) - skipping')
        return True

    try:
        from core.http_pool import pooled_post
        url = f"{central_url.rstrip('/')}/prompts/sync"
        resp = pooled_post(url, json=envelope, timeout=(3, 10))
        if 200 <= resp.status_code < 300:
            logger.info(
                f'recipe_sync: pushed prompt_id={prompt_id} '
                f'({len(envelope["files"])} files, '
                f'checksum={envelope["checksum"][:12]})')
            cache[cache_key] = {
                'checksum': envelope['checksum'],
                'pushed_at': envelope['uploaded_at'],
            }
            _store_push_cache(cache)
            return True
        # M1: surface as WARNING - cross-device sync silently broken
        # is exactly the anti-pattern this module exists to fix.
        logger.warning(
            f'recipe_sync: push prompt_id={prompt_id} returned '
            f'status={resp.status_code} - cross-device sync deferred '
            f'until next successful push (next CREATE event or manual)')
        return False
    except Exception as e:
        logger.warning(
            f'recipe_sync: push prompt_id={prompt_id} failed: {e} - '
            f'cross-device sync deferred')
        return False
def pull_recipe(prompts_dir: str, prompt_id, user_id: str = '',
                central_url: str = '') -> bool:
    """Pull the recipe bundle for *prompt_id* from the cloud + write it
    to disk.

    Returns True if at least one file was written, False otherwise
    (not on cloud, offline, write failed, schema mismatch).

    Files are written under their original names in *prompts_dir*.
    Existing files are NOT overwritten unless the cloud envelope's
    checksum differs from the local checksum (last-write-wins by
    uploaded_at would be racier; checksum-equality keeps the pull
    idempotent).
    """
    if not central_url:
        try:
            from core.config_cache import get_central_db_url
            central_url = get_central_db_url()
        except ImportError:
            central_url = ''
    if not central_url:
        logger.debug('recipe_sync: no central_url available, skip pull')
        return False

    try:
        from core.http_pool import get_http_session
        params = {'user_id': user_id} if user_id else {}
        url = f"{central_url.rstrip('/')}/prompts/sync/{prompt_id}"
        resp = get_http_session().get(url, params=params, timeout=(3, 10))
        if resp.status_code == 404:
            logger.debug(f'recipe_sync: prompt_id={prompt_id} not on cloud')
            return False
        if not (200 <= resp.status_code < 300):
            logger.debug(
                f'recipe_sync: pull prompt_id={prompt_id} status='
                f'{resp.status_code}')
            return False
        envelope = resp.json()
    except Exception as e:
        logger.debug(f'recipe_sync: pull prompt_id={prompt_id} failed: {e}')
        return False

    if envelope.get('schema_version') != SCHEMA_VERSION:
        logger.warning(
            f'recipe_sync: schema mismatch for prompt_id={prompt_id} '
            f'(cloud={envelope.get("schema_version")}, '
            f'local={SCHEMA_VERSION}) - skipping pull')
        return False

    files = envelope.get('files') or {}
    if not files:
        return False

    # Skip the write when local + cloud checksums match - the common
    # case right after a push; saves the disk writes.
    local_envelope = build_envelope(prompts_dir, prompt_id, user_id)
    if local_envelope and local_envelope['checksum'] == envelope.get('checksum'):
        logger.debug(
            f'recipe_sync: prompt_id={prompt_id} checksum matches local, '
            f'skipping write')
        return True

    try:
        os.makedirs(prompts_dir, exist_ok=True)
    except OSError as e:
        logger.debug(f'recipe_sync: cannot create {prompts_dir}: {e}')
        return False

    written = 0
    for fname, content in files.items():
        # Defensive: refuse paths that escape the prompts dir, drive
        # letters, NUL bytes, Windows reserved names. See the
        # _safe_filename docstring + reviewer M2 for the full
        # hardening rationale.
        if not _safe_filename(fname):
            logger.warning(
                f'recipe_sync: refusing unsafe filename {fname!r} '
                f'in pull payload for prompt_id={prompt_id}')
            continue
        path = os.path.join(prompts_dir, fname)
        # Atomic write (M3): write to a temp file in the same dir,
        # then os.replace to the final name. Prevents a concurrent
        # prompts_backup.snapshot_prompts from picking up a
        # half-written {pid}.json.
        tmp = path + '.tmp'
        try:
            with open(tmp, 'w', encoding='utf-8') as f:
                f.write(content)
            os.replace(tmp, path)
            written += 1
        except (IOError, OSError) as e:
            logger.debug(f'recipe_sync: write {fname} failed: {e}')
            # Clean up the orphan temp file on failure.
            try:
                os.remove(tmp)
            except OSError:
                pass
    if written:
        logger.info(
            f'recipe_sync: pulled prompt_id={prompt_id} '
            f'({written}/{len(files)} files written)')
        return True
    return False
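

# Illustrative smoke run - not part of the shipped module; the URL,
# prompt_id, and directory below are placeholders. Both calls are
# best-effort by design and simply return False when the central
# endpoint is absent, so this never raises.
if __name__ == '__main__':  # pragma: no cover
    logging.basicConfig(level=logging.DEBUG)
    demo_dir = os.path.join(os.path.expanduser('~'), '.hevolve', 'prompts')
    print('push:', push_recipe(demo_dir, 42, user_id='demo',
                               central_url='http://localhost:8000'))
    print('pull:', pull_recipe(demo_dir, 42, user_id='demo',
                               central_url='http://localhost:8000'))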