Coverage for core / cache_loaders.py: 73.0%

100 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

Cache loaders for restorable TTLCache.

Each loader takes a cache key and returns the value from persistent storage,
or None if not found. These are used as TTLCache loader callbacks to
auto-restore evicted or expired entries from disk/Redis.

7""" 

8 

9import os 

10import sys 

11import json 

12import logging 

13 

14logger = logging.getLogger('hevolve_core') 

15 

def _resolve_agent_data_dir():
    """Pick the directory that holds the persisted ``*_agent_data.json`` files.

    Resolution order:

    1. ``<dir of HEVOLVE_DB_PATH>/agent_data`` when that environment variable
       is a non-empty absolute path other than ``:memory:``.
    2. In bundled/frozen mode (``NUNBA_BUNDLED`` set, or ``sys.frozen`` truthy):
       the result of ``core.platform_paths.get_agent_data_dir()``, or
       ``~/Documents/Nunba/data/agent_data`` if that import fails.
    3. ``agent_data`` one level above the directory containing this module.

    Returns:
        The chosen directory path as a string. The directory is not created
        or checked for existence here.
    """
    configured_db = os.environ.get('HEVOLVE_DB_PATH', '')
    db_is_real_file_path = (
        configured_db
        and configured_db != ':memory:'
        and os.path.isabs(configured_db)
    )
    if db_is_real_file_path:
        db_folder = os.path.dirname(configured_db)
        return os.path.join(db_folder, 'agent_data')

    running_bundled = os.environ.get('NUNBA_BUNDLED') or getattr(sys, 'frozen', False)
    if not running_bundled:
        package_parent = os.path.dirname(os.path.dirname(__file__))
        return os.path.join(package_parent, 'agent_data')

    # Bundled/frozen mode: the install location (Program Files) is read-only,
    # so use the cross-platform data dir instead.
    try:
        from core.platform_paths import get_agent_data_dir
        return get_agent_data_dir()
    except ImportError:
        home = os.path.expanduser('~')
        return os.path.join(home, 'Documents', 'Nunba', 'data', 'agent_data')


# Resolved once at import time; load_agent_data() reads from this directory.
AGENT_DATA_DIR = _resolve_agent_data_dir()

31 

def _resolve_prompts_dir():
    """Pick the directory that recipe files are read from.

    The default is ``prompts`` one level above the directory containing this
    module. It is returned when it exists as a directory, or when the process
    is not running in bundled/frozen mode (``NUNBA_BUNDLED`` unset and
    ``sys.frozen`` falsy).

    In bundled/frozen mode with the default missing, the result of
    ``core.platform_paths.get_prompts_dir()`` is returned instead, falling
    back to ``~/Documents/Nunba/data/prompts`` if that import fails.

    Returns:
        The chosen directory path as a string.
    """
    package_parent = os.path.dirname(os.path.dirname(__file__))
    default_dir = os.path.join(package_parent, 'prompts')

    if os.path.isdir(default_dir) or not (
        os.environ.get('NUNBA_BUNDLED') or getattr(sys, 'frozen', False)
    ):
        return default_dir

    # Bundled mode fallback: cross-platform prompts dir.
    try:
        from core.platform_paths import get_prompts_dir
        return get_prompts_dir()
    except ImportError:
        home = os.path.expanduser('~')
        return os.path.join(home, 'Documents', 'Nunba', 'data', 'prompts')


# Resolved once at import time; load_recipe() reads from this directory.
PROMPTS_DIR = _resolve_prompts_dir()

47 

48 

def load_agent_data(prompt_id):
    """Restore the agent_data payload for ``prompt_id`` from disk.

    Args:
        prompt_id: Cache key (int or str). Its string form may contain only
            alphanumerics, ``_`` and ``-``; anything else is rejected so the
            key cannot be used for path traversal.

    Returns:
        The value under the ``'data'`` key when the file holds a dict with
        that key (metadata-wrapper format), otherwise the whole decoded
        document (old format). ``None`` when the key is rejected, the file
        does not exist, the decryptor returns ``None``, or loading fails.
    """
    key = str(prompt_id)
    key_without_separators = key.replace('_', '').replace('-', '')
    if not key_without_separators.isalnum():
        return None

    json_path = os.path.join(AGENT_DATA_DIR, f"{key}_agent_data.json")
    if not os.path.exists(json_path):
        return None

    try:
        try:
            # Prefer the encrypted reader when the security module is present.
            from security.crypto import decrypt_json_file
            payload = decrypt_json_file(json_path)
        except Exception:
            # Decryptor missing or failed: treat the file as plain JSON.
            with open(json_path, 'r', encoding='utf-8') as handle:
                payload = json.load(handle)
        else:
            if payload is None:
                return None

        wrapped = isinstance(payload, dict) and 'data' in payload
        if not wrapped:
            logger.info(f"Restored agent_data (old format) for prompt_id={prompt_id} from disk")
            return payload

        # Metadata wrapper present: hand back only the actual data.
        logger.info(f"Restored agent_data for prompt_id={prompt_id} from disk")
        return payload['data']
    except Exception as e:
        logger.debug(f"Failed to load agent_data for {prompt_id}: {e}")
        return None

80 

81 

def load_user_ledger(user_prompt):
    """Restore the SmartLedger for a ``'<user_id>_<prompt_id>'`` cache key.

    The key is split at its first underscore and both halves must convert
    with ``int()``. The ledger is obtained from
    ``helper_ledger.create_ledger_with_auto_backend``. When it has at least
    one task, ``lifecycle_hooks.restore_action_states_from_ledger`` is also
    called on a best-effort basis; a failure there is logged and does not
    prevent the ledger from being returned.

    Args:
        user_prompt: Cache key, e.g. ``'123_456'``.

    Returns:
        The ledger object when it exists and has at least one task, otherwise
        ``None`` (malformed key, empty or missing ledger, or any error).
    """
    user_part, separator, prompt_part = str(user_prompt).partition('_')
    if not separator:
        return None

    try:
        uid = int(user_part)
        pid = int(prompt_part)
    except (ValueError, TypeError):
        return None

    try:
        from helper_ledger import create_ledger_with_auto_backend
        ledger = create_ledger_with_auto_backend(uid, pid)
        has_tasks = ledger and hasattr(ledger, 'tasks') and len(ledger.tasks) > 0
        if not has_tasks:
            return None

        # Also restore action_states from this ledger (best effort).
        try:
            from lifecycle_hooks import restore_action_states_from_ledger
            restored = restore_action_states_from_ledger(user_prompt, ledger)
            if restored > 0:
                logger.info(f"Restored {restored} action states for {user_prompt}")
        except Exception as e:
            logger.debug(f"Could not restore action_states: {e}")

        logger.info(f"Restored ledger for {user_prompt} with {len(ledger.tasks)} tasks")
        return ledger
    except Exception as e:
        logger.debug(f"Failed to load ledger for {user_prompt}: {e}")

    return None

113 

114 

def load_recipe(user_prompt):
    """Load a recipe from disk for a ``'<user_id>_<prompt_id>'`` cache key.

    Only the prompt_id half of the key selects the file. Flow ids 0 through 9
    are probed in order under ``PROMPTS_DIR`` as
    ``<prompt_id>_<flow_id>_recipe.json``, and the first file that parses to
    a truthy recipe wins.

    Args:
        user_prompt: Cache key, e.g. ``'123_456'``.

    Returns:
        The parsed recipe, or ``None`` when the key is malformed, the
        prompt_id contains characters other than alphanumerics, ``_`` and
        ``-``, or no readable recipe file is found.
    """
    parts = str(user_prompt).split('_', 1)
    if len(parts) != 2:
        return None

    _user_id, prompt_id = parts

    # Sanitize to prevent path traversal: prompt_id is interpolated into a
    # file name, so reject separators, dots and anything else outside the
    # allow-list (the same rule load_agent_data applies to its key).
    if not prompt_id.replace('_', '').replace('-', '').isalnum():
        return None

    # Try flow 0 first (most common), then scan for any flow
    for flow_id in range(10):
        file_path = os.path.join(PROMPTS_DIR, f"{prompt_id}_{flow_id}_recipe.json")
        if not os.path.exists(file_path):
            continue
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            # Imported inside the try so a missing helper module is logged
            # and yields None rather than propagating to the cache.
            from helper import retrieve_json
            recipe = retrieve_json(content)
            if recipe:
                logger.info(f"Restored recipe for {user_prompt} from {file_path}")
                return recipe
        except Exception as e:
            logger.debug(f"Failed to load recipe from {file_path}: {e}")

    return None

139 

140 

def load_user_simplemem(user_prompt):
    """Restore a SimpleMem store for a cache key such as ``'123_456'``.

    The store directory comes from ``core.platform_paths.get_simplemem_dir``,
    falling back to ``./simplemem_db/<key>`` when that module cannot be
    imported. If the directory does not exist, nothing is restored.

    Args:
        user_prompt: Cache key; its string form names the store directory.

    Returns:
        A new ``SimpleMemStore`` built from a default ``SimpleMemConfig``, or
        ``None`` when the directory is missing or the store cannot be
        imported or constructed.
    """
    key = str(user_prompt)
    try:
        from core.platform_paths import get_simplemem_dir
        store_dir = get_simplemem_dir(key)
    except ImportError:
        store_dir = os.path.join('.', 'simplemem_db', key)

    if not os.path.exists(store_dir):
        return None

    try:
        from integrations.channels.memory.simplemem_store import SimpleMemStore, SimpleMemConfig
        # NOTE(review): store_dir is only used as an existence check; the
        # store is built from a default config that is never pointed at it.
        # Confirm whether SimpleMemConfig should receive this path.
        store = SimpleMemStore(SimpleMemConfig())
        logger.info(f"Restored SimpleMem store for {user_prompt}")
        return store
    except Exception as e:
        logger.debug(f"Failed to load SimpleMem for {user_prompt}: {e}")
        return None