Coverage for integrations / channels / memory / simplemem_langchain.py: 35.7%

224 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2SimpleMem-backed LangChain memory — drop-in replacement for ZepMemory. 

3 

4Zero-latency reads: load_memory_variables() returns in-memory buffer (no network). 

5Persistent writes: save_context() persists to JSON + feeds SimpleMem for semantic search. 

6Deterministic search: search_by_metadata() filters by request_Id, prompt_id, date range. 

7Semantic search: semantic_search(query) uses SimpleMem's adaptive retrieval. 

8 

9Replaces ZepMemory which required an external Zep server (single point of failure). 

10 

11Performance: 

12- Writes are deferred: in-memory append is instant, disk flush runs on a background 

13 thread with coalescing (multiple rapid writes → single I/O). 

14- Metadata indexes: O(1) lookup by request_Id / prompt_id via inverted index. 

15- Date range: bisect on sorted timestamp array → O(log n) bounds + O(k) scan. 

16- Locking: one RLock guards all state, so readers and writers are serialized (it is not a 

17 reader/writer lock), but every critical section is short — an append, a trim, or a list copy. 

18- SimpleMem feed: single reusable background thread + event loop, no per-call overhead. 

19""" 

20 

21import asyncio 

22import bisect 

23import json 

24import logging 

25import os 

26import threading 

27from collections import defaultdict 

28from datetime import datetime 

29from typing import Any, Dict, List, Optional, Tuple 

30 

31from pydantic import Field 

32 

33from langchain_classic.memory.chat_memory import BaseChatMemory 

34from langchain_classic.schema import BaseChatMessageHistory 

35from langchain_classic.schema.messages import BaseMessage, HumanMessage, AIMessage 

36 

logger = logging.getLogger('hevolve_core')

# Optional dependency: when SimpleMem cannot be imported, the memory classes
# below run in buffer-only mode (no semantic feed / search).
try:
    from integrations.channels.memory.simplemem_store import (
        SimpleMemStore,
        SimpleMemConfig,
        HAS_SIMPLEMEM,
    )
except ImportError:
    HAS_SIMPLEMEM = False


def _fallback_simplemem_root() -> str:
    """Return ``<ancestor>/simplemem_db`` for use when ``core.platform_paths`` is absent.

    ``<ancestor>`` is the directory four levels above this module file's
    absolute path.
    """
    ancestor = os.path.abspath(__file__)
    for _level in range(4):
        ancestor = os.path.dirname(ancestor)
    return os.path.join(ancestor, 'simplemem_db')


# Directory under which per-user SimpleMem databases and JSON buffers live.
try:
    from core.platform_paths import get_simplemem_dir
    SIMPLEMEM_DB_ROOT = get_simplemem_dir()
except ImportError:
    SIMPLEMEM_DB_ROOT = _fallback_simplemem_root()

53 

# Shared background event loop for async SimpleMem calls (one per process)
_bg_loop: Optional[asyncio.AbstractEventLoop] = None
_bg_thread: Optional[threading.Thread] = None
_bg_lock = threading.Lock()


def _bg_loop_alive() -> bool:
    """Return True if the shared loop exists, is open, and its thread is running.

    Thread liveness is used instead of ``loop.is_running()``: right after
    ``Thread.start()`` returns, ``run_forever`` may not have begun yet, so
    ``is_running()`` can still be False and would wrongly trigger the creation
    of a second loop/thread. ``Thread.is_alive()`` is already True at that
    point.
    """
    return (
        _bg_loop is not None
        and not _bg_loop.is_closed()
        and _bg_thread is not None
        and _bg_thread.is_alive()
    )


def _get_bg_loop() -> asyncio.AbstractEventLoop:
    """Return the process-wide background event loop, starting it on first use.

    The loop runs ``run_forever`` on a daemon thread named ``simplemem-io``.
    Coroutines submitted with ``asyncio.run_coroutine_threadsafe`` before the
    loop actually starts spinning are queued and run once it does.

    Returns:
        The shared event loop (the same object on every call while its
        thread stays alive).
    """
    global _bg_loop, _bg_thread
    if _bg_loop_alive():
        return _bg_loop
    with _bg_lock:
        # Re-check under the lock: another thread may have started the loop.
        if _bg_loop_alive():
            return _bg_loop
        loop = asyncio.new_event_loop()
        thread = threading.Thread(
            target=loop.run_forever, daemon=True, name='simplemem-io')
        thread.start()
        _bg_loop, _bg_thread = loop, thread
        return loop

73 

74 

# ── Flush coalescing ──
# add_message() re-arms a short timer instead of writing immediately, so a
# burst of writes that arrive within the window produces one disk write.
_FLUSH_DELAY = 0.15  # seconds to wait for further writes before flushing

79 

80 

class PersistentChatHistory(BaseChatMessageHistory):
    """Chat history with a persistent JSON buffer, metadata indexes, and a SimpleMem feed.

    State is held in three parallel lists -- ``_messages``, ``_metadata`` and
    ``_timestamps`` -- where position ``i`` in each describes the same message.
    ``_idx_request`` / ``_idx_prompt`` map ``request_Id`` / ``prompt_id``
    metadata values to positions in those lists. At most ``max_messages``
    messages are retained; the oldest are dropped first.

    Writes are mirrored to ``buffer_file`` by a coalesced timer-driven flush
    and, when ``simplemem_store`` is given, submitted to it on the shared
    background event loop without waiting for completion.
    """

    __slots__ = (
        '_buffer_file', '_max_messages', '_simplemem_store',
        '_messages', '_metadata', '_timestamps', '_idx_request', '_idx_prompt',
        '_lock', '_flush_timer', '_dir_ensured', '_ts_sorted', '_io_lock',
    )

    def __init__(self, buffer_file: str, max_messages: int = 24,
                 simplemem_store: Any = None):
        """
        Args:
            buffer_file: Path of the JSON file the buffer is loaded from and
                persisted to.
            max_messages: Maximum number of messages retained.
            simplemem_store: Optional store whose ``add(content, metadata=...)``
                and ``search(query, max_results=...)`` return coroutines;
                ``None`` disables the semantic feed.
        """
        self._buffer_file = buffer_file
        self._max_messages = max_messages
        self._simplemem_store = simplemem_store
        self._messages: List[BaseMessage] = []
        self._metadata: List[Dict[str, Any]] = []
        # ISO-8601 strings parallel to _messages. Date searches bisect them
        # while _ts_sorted is True and fall back to a linear scan otherwise.
        self._timestamps: List[str] = []
        self._ts_sorted = True
        self._idx_request: Dict[str, List[int]] = defaultdict(list)  # str(request_Id) -> positions
        self._idx_prompt: Dict[Any, List[int]] = defaultdict(list)  # prompt_id -> positions
        self._lock = threading.RLock()  # guards all in-memory state above
        self._io_lock = threading.Lock()  # serializes writes of buffer_file
        self._flush_timer: Optional[threading.Timer] = None
        self._dir_ensured = False
        self._load_buffer()

    # ── Properties ──

    @property
    def messages(self) -> List[BaseMessage]:
        """Return a copy of the retained messages, oldest first."""
        with self._lock:
            return list(self._messages)

    # ── Write path ──

    @staticmethod
    def _ts_key(value: Any) -> str:
        """Return ``value`` if it is a string, else '' (keeps bisect comparisons str-only)."""
        return value if isinstance(value, str) else ''

    def add_message(self, message: BaseMessage, **kwargs) -> None:
        """Append ``message``, index its metadata, and schedule persistence.

        Keyword Args:
            metadata: Optional dict stored alongside the message. It is copied,
                so the caller's dict is never modified and no two messages
                share one dict. A ``timestamp`` (current local time, ISO-8601)
                is added when absent; objects with ``isoformat()`` (e.g.
                ``datetime``) are converted to ISO-8601 strings so the buffer
                stays JSON-serializable.
        """
        metadata = dict(kwargs.get('metadata') or {})
        if 'timestamp' not in metadata:
            metadata['timestamp'] = datetime.now().isoformat()
        elif hasattr(metadata['timestamp'], 'isoformat'):
            metadata['timestamp'] = metadata['timestamp'].isoformat()
        ts = self._ts_key(metadata['timestamp'])

        with self._lock:
            if self._timestamps and ts < self._timestamps[-1]:
                # Out-of-order timestamp: bisecting would give wrong ranges.
                self._ts_sorted = False
            pos = len(self._messages)
            self._messages.append(message)
            self._metadata.append(metadata)
            self._timestamps.append(ts)
            self._index_locked(pos, metadata)

            if len(self._messages) > self._max_messages:
                self._trim_locked()

            self._schedule_flush()

        if self._simplemem_store is not None:
            self._feed_simplemem(message, metadata)

    def _feed_simplemem(self, message: BaseMessage, metadata: Dict[str, Any]) -> None:
        """Submit ``message`` to SimpleMem on the background loop without waiting."""
        store_meta = dict(metadata)
        store_meta['sender_name'] = "User" if isinstance(message, HumanMessage) else "Hevolve"
        try:
            loop = _get_bg_loop()
            future = asyncio.run_coroutine_threadsafe(
                self._simplemem_store.add(message.content, metadata=store_meta),
                loop,
            )
            # Without a callback, exceptions raised inside the coroutine would
            # be silently discarded.
            future.add_done_callback(self._log_ingest_failure)
        except Exception as e:
            logger.debug(f"SimpleMem ingest failed (non-blocking): {e}")

    @staticmethod
    def _log_ingest_failure(future: Any) -> None:
        """Done-callback: log at debug level any exception raised by the ingest coroutine."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.debug(f"SimpleMem ingest failed (non-blocking): {exc}")

    def add_user_message(self, message: "HumanMessage | str") -> None:
        """Append a user message; accepts a ``HumanMessage`` or its text."""
        if isinstance(message, HumanMessage):
            self.add_message(message)
        else:
            self.add_message(HumanMessage(content=message))

    def add_ai_message(self, message: "AIMessage | str") -> None:
        """Append an AI message; accepts an ``AIMessage`` or its text."""
        if isinstance(message, AIMessage):
            self.add_message(message)
        else:
            self.add_message(AIMessage(content=message))

    def clear(self) -> None:
        """Drop all messages and indexes, and schedule an (empty) flush."""
        with self._lock:
            self._messages.clear()
            self._metadata.clear()
            self._timestamps.clear()
            self._idx_request.clear()
            self._idx_prompt.clear()
            self._ts_sorted = True
            self._schedule_flush()

    # ── Trim + reindex ──

    def _trim_locked(self) -> None:
        """Keep only the newest ``max_messages`` entries and rebuild indexes. Caller holds the lock."""
        trim = len(self._messages) - self._max_messages
        self._messages = self._messages[trim:]
        self._metadata = self._metadata[trim:]
        self._timestamps = self._timestamps[trim:]
        self._rebuild_indexes_locked()

    def _index_locked(self, pos: int, meta: Dict[str, Any]) -> None:
        """Record position ``pos`` under its request_Id / prompt_id. Caller holds the lock."""
        req_id = meta.get('request_Id')
        if req_id is not None:
            self._idx_request[str(req_id)].append(pos)
        prom_id = meta.get('prompt_id')
        if prom_id is not None:
            self._idx_prompt[prom_id].append(pos)

    def _rebuild_indexes_locked(self) -> None:
        """Rebuild both inverted indexes and the sortedness flag. Caller holds the lock."""
        self._idx_request.clear()
        self._idx_prompt.clear()
        for pos, meta in enumerate(self._metadata):
            self._index_locked(pos, meta)
        stamps = self._timestamps
        self._ts_sorted = all(a <= b for a, b in zip(stamps, stamps[1:]))

    # ── Deferred disk flush ──

    def _schedule_flush(self) -> None:
        """(Re)arm the coalescing flush timer. Caller holds the lock."""
        if self._flush_timer is not None:
            self._flush_timer.cancel()
        self._flush_timer = threading.Timer(_FLUSH_DELAY, self._flush_to_disk)
        self._flush_timer.daemon = True
        self._flush_timer.start()

    def _flush_to_disk(self) -> None:
        """Write the buffer to JSON atomically; runs on the timer thread or via flush_sync.

        ``_io_lock`` is taken before the snapshot, so concurrent flushes are
        serialized and a later writer never has an older snapshot than an
        earlier one. The data lock is held only while snapshotting. The file
        is written to ``<buffer_file>.tmp`` and moved into place with
        ``os.replace`` so a crash mid-write cannot leave truncated JSON.
        """
        with self._io_lock:
            with self._lock:
                data = [
                    {
                        'type': type(m).__name__,
                        'content': m.content,
                        'metadata': self._metadata[i],
                    }
                    for i, m in enumerate(self._messages)
                ]
            try:
                if not self._dir_ensured:
                    directory = os.path.dirname(self._buffer_file)
                    if directory:  # a bare filename has no directory to create
                        os.makedirs(directory, exist_ok=True)
                    self._dir_ensured = True
                tmp_path = self._buffer_file + '.tmp'
                with open(tmp_path, 'w', encoding='utf-8') as f:
                    json.dump(data, f, separators=(',', ':'))
                os.replace(tmp_path, self._buffer_file)
            except Exception as e:
                logger.debug(f"Could not save buffer to {self._buffer_file}: {e}")

    def flush_sync(self) -> None:
        """Force an immediate flush (for shutdown / test teardown)."""
        with self._lock:
            timer, self._flush_timer = self._flush_timer, None
        if timer is not None:
            timer.cancel()
        self._flush_to_disk()

    # ── Load ──

    def _load_buffer(self) -> None:
        """Load persisted messages from the JSON file and build indexes.

        Items are parsed into local lists and committed only if the whole file
        parses, so a malformed file can never leave the parallel lists out of
        step. Non-dict items are skipped; a missing or non-dict ``metadata``
        becomes ``{}``. Unknown ``type`` values load as ``HumanMessage``.
        """
        if not os.path.exists(self._buffer_file):
            return
        try:
            with open(self._buffer_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if not isinstance(data, list):
                raise ValueError(f"expected a JSON list, got {type(data).__name__}")
            msg_cls = {'AIMessage': AIMessage}
            messages: List[BaseMessage] = []
            metadata: List[Dict[str, Any]] = []
            timestamps: List[str] = []
            for item in data:
                if not isinstance(item, dict):
                    continue
                meta = item.get('metadata')
                if not isinstance(meta, dict):
                    meta = {}
                content = item.get('content')
                if content is None:
                    content = ''
                cls = msg_cls.get(item.get('type'), HumanMessage)
                messages.append(cls(content=content))
                metadata.append(meta)
                timestamps.append(self._ts_key(meta.get('timestamp')))
        except Exception as e:
            logger.debug(f"Could not load buffer from {self._buffer_file}: {e}")
            return

        with self._lock:
            self._messages = messages
            self._metadata = metadata
            self._timestamps = timestamps
            if len(self._messages) > self._max_messages:
                self._trim_locked()
            else:
                self._rebuild_indexes_locked()
        logger.debug(f"Loaded {len(self._messages)} messages from {self._buffer_file}")

    # ── Search: deterministic by metadata + date range ──

    def search_by_metadata(self, date_from: str = None, date_to: str = None,
                           **filters) -> List[Dict]:
        """Deterministic search by indexed keys, date range, and arbitrary metadata.

        Args:
            date_from: ISO-8601 start (inclusive).
            date_to: ISO-8601 end (inclusive). A date without 'T' is expanded
                to the end of that day.
            **filters: Key/value pairs that must equal the message metadata.
                ``request_Id`` (compared as str) and ``prompt_id`` use the
                inverted indexes; other keys are checked per candidate.

        Returns:
            Dicts with ``type``, ``content`` and ``metadata``, oldest first.
        """
        with self._lock:
            candidates = self._resolve_candidates(filters, date_from, date_to)
            return [
                {
                    'type': type(self._messages[i]).__name__,
                    'content': self._messages[i].content,
                    'metadata': self._metadata[i],
                }
                for i in candidates
            ]

    def _resolve_candidates(self, filters: Dict, date_from: str,
                            date_to: str) -> List[int]:
        """Return sorted matching positions. Caller holds the lock."""
        n = len(self._messages)
        if n == 0:
            return []
        filters = dict(filters)  # popping below must not touch the caller's mapping

        candidate_set: Optional[set] = None  # None means "not narrowed yet"

        req_id = filters.pop('request_Id', None)
        if req_id is not None:
            candidate_set = set(self._idx_request.get(str(req_id), ()))

        prom_id = filters.pop('prompt_id', None)
        if prom_id is not None:
            hits = set(self._idx_prompt.get(prom_id, ()))
            candidate_set = hits if candidate_set is None else candidate_set & hits

        if date_from or date_to:
            date_set = self._date_positions_locked(date_from, date_to)
            candidate_set = date_set if candidate_set is None else candidate_set & date_set

        if candidate_set is None:
            candidate_set = set(range(n))

        # Remaining arbitrary (non-indexed) filters
        if filters:
            candidate_set = {
                i for i in candidate_set
                if all(self._metadata[i].get(k) == v for k, v in filters.items())
            }

        return sorted(candidate_set)

    def _date_positions_locked(self, date_from: Optional[str],
                               date_to: Optional[str]) -> set:
        """Return positions whose timestamp lies in [date_from, date_to]. Caller holds the lock.

        Relies on ISO-8601 strings sorting lexicographically in time order.
        Bisects when timestamps are known to be sorted; otherwise scans.
        """
        upper = None
        if date_to:
            upper = date_to if 'T' in date_to else date_to + 'T23:59:59.999999'
        stamps = self._timestamps
        if self._ts_sorted:
            lo = bisect.bisect_left(stamps, date_from) if date_from else 0
            hi = bisect.bisect_right(stamps, upper) if upper is not None else len(stamps)
            return set(range(lo, hi))
        return {
            i for i, ts in enumerate(stamps)
            if (not date_from or ts >= date_from)
            and (upper is None or ts <= upper)
        }

    # ── Search: semantic via SimpleMem ──

    def semantic_search(self, query: str, max_results: int = 10) -> List[Dict]:
        """Semantic search via SimpleMem; returns ``{'content', 'score'}`` dicts.

        Waits at most 5 seconds. On timeout or error the pending request is
        cancelled, a warning is logged, and ``[]`` is returned.
        """
        if self._simplemem_store is None:
            return []
        future = None
        try:
            loop = _get_bg_loop()
            future = asyncio.run_coroutine_threadsafe(
                self._simplemem_store.search(query, max_results=max_results), loop)
            results = future.result(timeout=5.0)
            return [{'content': r.content, 'score': r.score} for r in results]
        except Exception as e:
            if future is not None:
                future.cancel()  # no-op if already done; stops a timed-out search
            logger.warning(f"SimpleMem semantic search failed: {e}")
            return []

343 logger.warning(f"SimpleMem semantic search failed: {e}") 

344 return [] 

345 

346 

class SimpleMemChatMemory(BaseChatMemory):
    """
    LangChain memory backed by SimpleMem + a persistent message buffer.

    Design:
    - load_memory_variables(): returns the last ``max_buffer_size`` messages
      of the in-memory history (no disk or network access here)
    - save_context(): appends the user/AI turn, with optional metadata, to the
      history (PersistentChatHistory persists it in the background)
    - search_by_metadata(): indexed lookup / date-range search
    - semantic_search(): SimpleMem semantic search for the FULL_HISTORY tool

    Messages are always returned as message objects.
    """

    memory_key: str = "chat_history"
    return_messages: bool = True
    input_key: str = "input"
    max_buffer_size: int = 8

    class Config:
        arbitrary_types_allowed = True

    @property
    def memory_variables(self) -> List[str]:
        return [self.memory_key]

    def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Return the most recent ``max_buffer_size`` messages (none when it is <= 0)."""
        msgs = self.chat_memory.messages
        size = self.max_buffer_size
        # msgs[-0:] would be the WHOLE list, so a non-positive size is special-cased.
        return {self.memory_key: msgs[-size:] if size > 0 else []}

    @staticmethod
    def _select(mapping: Dict[str, Any], key: Optional[str],
                exclude: Tuple[str, ...] = ()) -> Any:
        """Return ``mapping[key]`` if present, else the first value whose key is
        not in ``exclude``, else ''."""
        if key is not None and key in mapping:
            return mapping[key]
        return next((v for k, v in mapping.items() if k not in exclude), '')

    def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str],
                     metadata: Optional[Dict[str, Any]] = None) -> None:
        """Save a user/AI turn with optional metadata for deterministic retrieval.

        Extends BaseChatMemory.save_context() to thread metadata through to
        the persistent buffer + SimpleMem, enabling search_by_metadata()
        lookups by request_Id, prompt_id, date range, or any custom key.
        Each message receives its own copy of ``metadata``; the caller's dict
        is not modified. When ``input_key`` is missing from ``inputs``, the
        first value not keyed by the memory key or ``'stop'`` is used.
        """
        input_str = self._select(inputs, self.input_key,
                                 exclude=(self.memory_key, 'stop'))
        output_str = self._select(outputs, self.output_key or 'output')
        meta = metadata or {}

        human = HumanMessage(content=str(input_str))
        ai = AIMessage(content=str(output_str))
        history = self.chat_memory
        if isinstance(history, PersistentChatHistory):
            history.add_message(human, metadata=dict(meta))
            history.add_message(ai, metadata=dict(meta))
        else:
            # Generic histories do not accept a metadata keyword.
            history.add_message(human)
            history.add_message(ai)

    def search_by_metadata(self, date_from: str = None, date_to: str = None,
                           **filters) -> List[Dict]:
        """Deterministic search by metadata and/or date range.

        Returns [] unless ``chat_memory`` is a PersistentChatHistory.

        Usage:
            memory.search_by_metadata(request_Id='1771756765')
            memory.search_by_metadata(date_from='2026-02-22', date_to='2026-02-23')
            memory.search_by_metadata(date_from='2026-02-22T16:00:00', prompt_id=0)
        """
        if isinstance(self.chat_memory, PersistentChatHistory):
            return self.chat_memory.search_by_metadata(
                date_from=date_from, date_to=date_to, **filters)
        return []

    def semantic_search(self, query: str, max_results: int = 10) -> List[Dict]:
        """Semantic search for the FULL_HISTORY tool ([] without a PersistentChatHistory)."""
        if isinstance(self.chat_memory, PersistentChatHistory):
            return self.chat_memory.semantic_search(query, max_results)
        return []

    @classmethod
    def load_or_create(cls, user_id: int, prompt_id: int = None):
        """
        Factory: create memory with a persistent buffer + optional SimpleMem.

        The buffer lives at ``SIMPLEMEM_DB_ROOT/user_<user_id>/buffer.json``.
        A SimpleMem init failure is logged at debug level and the memory falls
        back to buffer-only mode.

        Args:
            user_id: The user ID
            prompt_id: Optional prompt ID (unused — memory is per-user like Zep was)
        """
        session_id = f"user_{user_id}"
        db_path = os.path.join(SIMPLEMEM_DB_ROOT, session_id)
        buffer_file = os.path.join(db_path, 'buffer.json')

        simplemem_store = None
        if HAS_SIMPLEMEM:
            try:
                config = SimpleMemConfig.from_env()
                config.db_path = db_path
                simplemem_store = SimpleMemStore(config)
            except Exception as e:
                logger.debug(f"SimpleMem init failed for {session_id}: {e}")

        chat_history = PersistentChatHistory(
            buffer_file=buffer_file,
            simplemem_store=simplemem_store,
        )

        return cls(
            chat_memory=chat_history,
            return_messages=True,
            input_key="input",
        )