Coverage for core / cache_loaders.py: 73.0%
100 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Cache loaders for restorable TTLCache.
4Each loader takes a cache key and returns the value from persistent storage,
5or None if not found. These are used as TTLCache loader callbacks to
6auto-restore evicted or expired entries from disk/Redis.
7"""
9import os
10import sys
11import json
12import logging
14logger = logging.getLogger('hevolve_core')
def _resolve_agent_data_dir():
    """Return the directory that holds persisted ``*_agent_data.json`` files.

    Resolution order:

    1. An ``agent_data`` folder next to the database file, when the
       ``HEVOLVE_DB_PATH`` environment variable holds an absolute path
       other than ``':memory:'``.
    2. In bundled/frozen mode (``NUNBA_BUNDLED`` set, or ``sys.frozen``
       truthy): whatever ``core.platform_paths.get_agent_data_dir()``
       returns, or ``~/Documents/Nunba/data/agent_data`` when that module
       cannot be imported.
    3. Otherwise an ``agent_data`` folder one level above this module's
       directory.
    """
    configured_db = os.environ.get('HEVOLVE_DB_PATH', '')
    db_is_usable = (
        bool(configured_db)
        and configured_db != ':memory:'
        and os.path.isabs(configured_db)
    )
    if db_is_usable:
        db_folder = os.path.dirname(configured_db)
        return os.path.join(db_folder, 'agent_data')

    running_bundled = os.environ.get('NUNBA_BUNDLED') or getattr(sys, 'frozen', False)
    if not running_bundled:
        project_root = os.path.dirname(os.path.dirname(__file__))
        return os.path.join(project_root, 'agent_data')

    # Bundled/frozen mode: the install location (e.g. Program Files) is
    # read-only, so a per-user, cross-platform data directory is used instead.
    try:
        from core.platform_paths import get_agent_data_dir
        return get_agent_data_dir()
    except ImportError:
        home = os.path.expanduser('~')
        return os.path.join(home, 'Documents', 'Nunba', 'data', 'agent_data')


AGENT_DATA_DIR = _resolve_agent_data_dir()
def _resolve_prompts_dir():
    """Return the directory that recipe JSON files are read from.

    The ``prompts`` folder one level above this module's directory wins
    whenever it exists. If it does not exist and the process is running in
    bundled/frozen mode (``NUNBA_BUNDLED`` set, or ``sys.frozen`` truthy),
    the result of ``core.platform_paths.get_prompts_dir()`` is used, falling
    back to ``~/Documents/Nunba/data/prompts`` when that module cannot be
    imported. In every other case the (possibly missing) project-relative
    path is returned unchanged.
    """
    project_root = os.path.dirname(os.path.dirname(__file__))
    default_dir = os.path.join(project_root, 'prompts')

    running_bundled = os.environ.get('NUNBA_BUNDLED') or getattr(sys, 'frozen', False)
    if os.path.isdir(default_dir) or not running_bundled:
        return default_dir

    # Bundled mode fallback: cross-platform prompts directory.
    try:
        from core.platform_paths import get_prompts_dir
        return get_prompts_dir()
    except ImportError:
        home = os.path.expanduser('~')
        return os.path.join(home, 'Documents', 'Nunba', 'data', 'prompts')


PROMPTS_DIR = _resolve_prompts_dir()
def load_agent_data(prompt_id):
    """Restore the persisted agent_data payload for ``prompt_id`` from disk.

    Args:
        prompt_id: Cache key (int or str). Its string form selects the file
            ``<prompt_id>_agent_data.json`` inside ``AGENT_DATA_DIR``.

    Returns:
        The stored payload, or None when the key fails validation, the file
        does not exist, decryption yields None, or reading/parsing fails.
        When the file content is a dict with a ``'data'`` key, only that
        value is returned; any other content is returned as-is.
    """
    key = str(prompt_id)

    # Path-traversal guard: once '_' and '-' are removed the key must be
    # purely alphanumeric (this also rejects the empty string).
    without_separators = key.replace('_', '').replace('-', '')
    if not without_separators.isalnum():
        return None

    file_path = os.path.join(AGENT_DATA_DIR, f"{key}_agent_data.json")
    if not os.path.exists(file_path):
        return None

    try:
        try:
            # Preferred path: the encrypted-file reader from security.crypto.
            from security.crypto import decrypt_json_file
            payload = decrypt_json_file(file_path)
            if payload is None:
                return None
        except Exception:
            # Reader unavailable or failed: fall back to reading plain JSON.
            with open(file_path, 'r', encoding='utf-8') as f:
                payload = json.load(f)

        # Newer files wrap the payload as {'data': ...}; unwrap when present.
        wrapped = isinstance(payload, dict) and 'data' in payload
        if not wrapped:
            logger.info(f"Restored agent_data (old format) for prompt_id={prompt_id} from disk")
            return payload
        logger.info(f"Restored agent_data for prompt_id={prompt_id} from disk")
        return payload['data']
    except Exception as e:
        logger.debug(f"Failed to load agent_data for {prompt_id}: {e}")
        return None
def load_user_ledger(user_prompt):
    """Restore the ledger for a ``'<user_id>_<prompt_id>'`` cache key.

    The ledger is built by ``helper_ledger.create_ledger_with_auto_backend``
    (backend selection is delegated to that helper). When a ledger with at
    least one task is obtained, the action states recorded in it are also
    restored through ``lifecycle_hooks.restore_action_states_from_ledger``;
    a failure there is logged and does not prevent returning the ledger.

    Args:
        user_prompt: Cache key such as ``'123_456'``. Both parts must be
            integers and the key must contain exactly one underscore.

    Returns:
        The ledger when it exists and has at least one task; otherwise None
        (malformed key, empty or missing ledger, or any error while loading).
    """
    user_id, separator, prompt_id = str(user_prompt).partition('_')
    if not separator:
        return None

    # int() accepts digit-group underscores (PEP 515), so a malformed key
    # like '1_2_3' would silently parse as prompt 23 and restore the wrong
    # ledger. Reject any further underscore before converting.
    if '_' in prompt_id:
        return None

    try:
        user_id_int = int(user_id)
        prompt_id_int = int(prompt_id)
    except (ValueError, TypeError):
        return None

    try:
        from helper_ledger import create_ledger_with_auto_backend
        ledger = create_ledger_with_auto_backend(user_id_int, prompt_id_int)
        if ledger and hasattr(ledger, 'tasks') and len(ledger.tasks) > 0:
            # Best effort: also restore action_states from this ledger.
            try:
                from lifecycle_hooks import restore_action_states_from_ledger
                restored = restore_action_states_from_ledger(user_prompt, ledger)
                if restored > 0:
                    logger.info(f"Restored {restored} action states for {user_prompt}")
            except Exception as e:
                logger.debug(f"Could not restore action_states: {e}")
            logger.info(f"Restored ledger for {user_prompt} with {len(ledger.tasks)} tasks")
            return ledger
    except Exception as e:
        logger.debug(f"Failed to load ledger for {user_prompt}: {e}")

    return None
def load_recipe(user_prompt):
    """Restore a recipe from disk for a ``'<user_id>_<prompt_id>'`` cache key.

    Only the part after the first underscore (the prompt id) is used. Files
    named ``<prompt_id>_<flow_id>_recipe.json`` in ``PROMPTS_DIR`` are probed
    for ``flow_id`` 0 through 9, in order, and each existing file's text is
    parsed with ``helper.retrieve_json``.

    Args:
        user_prompt: Cache key such as ``'123_456'``.

    Returns:
        The first truthy parsed recipe, or None when the key is malformed,
        the prompt id fails validation, no candidate file exists, or every
        candidate fails to read/parse (failures are logged at debug level).
    """
    _user_id, separator, prompt_id = str(user_prompt).partition('_')
    if not separator:
        return None

    # The prompt id is interpolated into a filesystem path. Apply the same
    # allow-list load_agent_data uses (alphanumerics plus '_' and '-') so a
    # key such as '1_../../x' cannot escape PROMPTS_DIR.
    if not prompt_id.replace('_', '').replace('-', '').isalnum():
        return None

    # Flow 0 is the most common, so it is probed first; then flows 1-9.
    for flow_id in range(10):
        file_path = os.path.join(PROMPTS_DIR, f"{prompt_id}_{flow_id}_recipe.json")
        if not os.path.exists(file_path):
            continue
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            from helper import retrieve_json
            recipe = retrieve_json(content)
        except Exception as e:
            logger.debug(f"Failed to load recipe from {file_path}: {e}")
            continue
        if recipe:
            logger.info(f"Restored recipe for {user_prompt} from {file_path}")
            return recipe

    return None
def load_user_simplemem(user_prompt):
    """Restore a SimpleMem store for a ``user_prompt`` key (e.g. '123_456').

    The per-key directory comes from
    ``core.platform_paths.get_simplemem_dir``; when that module cannot be
    imported, ``./simplemem_db/<user_prompt>`` is used instead.

    Returns:
        A new ``SimpleMemStore`` built from a default ``SimpleMemConfig``
        when the directory exists, otherwise None. None is also returned
        when the store classes cannot be imported or constructed.
    """
    try:
        from core.platform_paths import get_simplemem_dir
        simplemem_dir = get_simplemem_dir(str(user_prompt))
    except ImportError:
        simplemem_dir = os.path.join('.', 'simplemem_db', str(user_prompt))

    # Nothing has been persisted for this key, so there is nothing to restore.
    if not os.path.exists(simplemem_dir):
        return None

    # NOTE(review): simplemem_dir is only used for the existence check above;
    # SimpleMemConfig() is built with no arguments. Confirm the store finds
    # the per-user data by some other means.
    try:
        from integrations.channels.memory.simplemem_store import SimpleMemStore, SimpleMemConfig
        restored_store = SimpleMemStore(SimpleMemConfig())
        logger.info(f"Restored SimpleMem store for {user_prompt}")
        return restored_store
    except Exception as e:
        logger.debug(f"Failed to load SimpleMem for {user_prompt}: {e}")
        return None