Coverage for integrations / channels / memory / memory_store.py: 37.8%

262 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Memory Store using SQLite FTS5 + embeddings for semantic search. 

3 

4Provides persistent storage with full-text search and vector similarity capabilities. 

5""" 

6 

7import hashlib 

8import json 

9import sqlite3 

10import threading 

11import time 

12from dataclasses import dataclass, field 

13from pathlib import Path 

14from typing import Any, Callable, Dict, List, Optional, Tuple, Union 

15 

16 

@dataclass
class MemoryItem:
    """A single unit of content persisted in the memory store.

    Carries the raw text, optional metadata and embedding vector, a source
    tag, creation/update timestamps, and a SHA-256 content hash that is
    filled in automatically when left blank.
    """

    id: str
    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    embedding: Optional[List[float]] = None
    source: str = "memory"
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)
    hash: str = ""

    def __post_init__(self):
        # Derive the content hash only when the caller did not supply one.
        self.hash = self.hash or self._compute_hash()

    def _compute_hash(self) -> str:
        """Return the SHA-256 hex digest of the UTF-8 encoded content."""
        digest = hashlib.sha256(self.content.encode('utf-8'))
        return digest.hexdigest()

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this item into a plain dictionary."""
        keys = ('id', 'content', 'metadata', 'embedding', 'source',
                'created_at', 'updated_at', 'hash')
        return {key: getattr(self, key) for key in keys}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'MemoryItem':
        """Rebuild a MemoryItem from a dictionary produced by to_dict()."""
        kwargs = {
            'id': data['id'],
            'content': data['content'],
            'metadata': data.get('metadata', {}),
            'embedding': data.get('embedding'),
            'source': data.get('source', 'memory'),
            'created_at': data.get('created_at', time.time()),
            'updated_at': data.get('updated_at', time.time()),
            'hash': data.get('hash', ''),
        }
        return cls(**kwargs)

64 

65 

@dataclass
class SearchResult:
    """Represents a search result from the memory store."""

    # The matched memory item.
    item: MemoryItem
    # Relevance score; higher is better (normalized to 0-1 by the store).
    score: float
    match_type: str = "fts"  # "fts", "semantic", or "hybrid"
    # Preview excerpt of the matched content; may be empty.
    snippet: str = ""

75 

class MemoryStore:
    """
    Memory store with SQLite FTS5 for full-text search and embedding support
    for semantic search.

    Features:
    - Full-text search using SQLite FTS5
    - Semantic search using embeddings with cosine similarity
    - Hybrid search combining FTS5 and semantic scores
    - Thread-safe operations
    - Automatic schema management
    """

    # Bumped when the on-disk schema changes; recorded in the schema_meta
    # table by _ensure_schema.
    SCHEMA_VERSION = 1

91 def __init__( 

92 self, 

93 db_path: Optional[Union[str, Path]] = None, 

94 embedding_fn: Optional[Callable[[str], List[float]]] = None, 

95 embedding_dims: int = 384, 

96 ): 

97 """ 

98 Initialize the memory store. 

99 

100 Args: 

101 db_path: Path to SQLite database file. Uses in-memory if None. 

102 embedding_fn: Optional function to compute embeddings. 

103 embedding_dims: Dimension of embedding vectors. 

104 """ 

105 self.db_path = str(db_path) if db_path else ":memory:" 

106 self.embedding_fn = embedding_fn 

107 self.embedding_dims = embedding_dims 

108 self._lock = threading.RLock() 

109 self._conn: Optional[sqlite3.Connection] = None 

110 self._ensure_connection() 

111 self._ensure_schema() 

112 

113 def _ensure_connection(self) -> sqlite3.Connection: 

114 """Ensure database connection is open.""" 

115 if self._conn is None: 

116 self._conn = sqlite3.connect( 

117 self.db_path, 

118 check_same_thread=False, 

119 isolation_level=None, 

120 ) 

121 self._conn.row_factory = sqlite3.Row 

122 # Enable FTS5 if available 

123 try: 

124 self._conn.execute("SELECT fts5()") 

125 except sqlite3.OperationalError: 

126 pass # FTS5 not available, will use fallback 

127 return self._conn 

128 

    def _ensure_schema(self):
        """Create database schema if not exists.

        Side effect: sets ``self._fts_available`` — True when the FTS5
        virtual table could be created, False when the plain fallback
        table is used instead.
        """
        conn = self._ensure_connection()
        with self._lock:
            # Main memory items table
            conn.execute("""
                CREATE TABLE IF NOT EXISTS memory_items (
                    id TEXT PRIMARY KEY,
                    content TEXT NOT NULL,
                    metadata TEXT DEFAULT '{}',
                    embedding TEXT,
                    source TEXT DEFAULT 'memory',
                    created_at REAL,
                    updated_at REAL,
                    hash TEXT
                )
            """)

            # Create FTS5 virtual table for full-text search.
            # This is an external-content table (content=memory_items): the
            # index stores no copy of the text and relies on the triggers
            # below to stay in sync with memory_items.
            try:
                conn.execute("""
                    CREATE VIRTUAL TABLE IF NOT EXISTS memory_fts USING fts5(
                        content,
                        id UNINDEXED,
                        source UNINDEXED,
                        content=memory_items,
                        content_rowid=rowid
                    )
                """)
                self._fts_available = True
            except sqlite3.OperationalError:
                # FTS5 not compiled into this SQLite build; create fallback
                # table so later statements still have a target.
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS memory_fts_fallback (
                        rowid INTEGER PRIMARY KEY,
                        content TEXT,
                        id TEXT,
                        source TEXT
                    )
                """)
                self._fts_available = False

            # Create triggers for FTS sync. The
            # INSERT INTO memory_fts(memory_fts, ...) VALUES('delete', ...)
            # form is FTS5's special "delete" command for external-content
            # tables.
            # NOTE(review): an INSERT OR REPLACE on memory_items resolves the
            # conflict via an internal delete that does NOT fire the AFTER
            # DELETE trigger unless PRAGMA recursive_triggers is on, which
            # can leave stale rows in the FTS index — confirm.
            if self._fts_available:
                conn.executescript("""
                    CREATE TRIGGER IF NOT EXISTS memory_items_ai AFTER INSERT ON memory_items BEGIN
                        INSERT INTO memory_fts(rowid, content, id, source)
                        VALUES (NEW.rowid, NEW.content, NEW.id, NEW.source);
                    END;

                    CREATE TRIGGER IF NOT EXISTS memory_items_ad AFTER DELETE ON memory_items BEGIN
                        INSERT INTO memory_fts(memory_fts, rowid, content, id, source)
                        VALUES('delete', OLD.rowid, OLD.content, OLD.id, OLD.source);
                    END;

                    CREATE TRIGGER IF NOT EXISTS memory_items_au AFTER UPDATE ON memory_items BEGIN
                        INSERT INTO memory_fts(memory_fts, rowid, content, id, source)
                        VALUES('delete', OLD.rowid, OLD.content, OLD.id, OLD.source);
                        INSERT INTO memory_fts(rowid, content, id, source)
                        VALUES (NEW.rowid, NEW.content, NEW.id, NEW.source);
                    END;
                """)

            # Create indexes for the common lookup patterns (filter by
            # source, dedupe by hash, order by recency).
            conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_source ON memory_items(source)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_hash ON memory_items(hash)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_updated ON memory_items(updated_at)")

            # Schema version table
            conn.execute("""
                CREATE TABLE IF NOT EXISTS schema_meta (
                    key TEXT PRIMARY KEY,
                    value TEXT
                )
            """)
            conn.execute(
                "INSERT OR REPLACE INTO schema_meta (key, value) VALUES (?, ?)",
                ('version', str(self.SCHEMA_VERSION))
            )

208 

209 def add( 

210 self, 

211 content: str, 

212 metadata: Optional[Dict[str, Any]] = None, 

213 source: str = "memory", 

214 item_id: Optional[str] = None, 

215 compute_embedding: bool = True, 

216 ) -> MemoryItem: 

217 """ 

218 Add a new memory item to the store. 

219 

220 Args: 

221 content: The content to store. 

222 metadata: Optional metadata dictionary. 

223 source: Source identifier for the memory. 

224 item_id: Optional custom ID. Auto-generated if not provided. 

225 compute_embedding: Whether to compute embedding for semantic search. 

226 

227 Returns: 

228 The created MemoryItem. 

229 """ 

230 now = time.time() 

231 item_id = item_id or hashlib.sha256( 

232 f"{content}:{now}".encode('utf-8') 

233 ).hexdigest()[:16] 

234 

235 embedding = None 

236 if compute_embedding and self.embedding_fn: 

237 try: 

238 embedding = self.embedding_fn(content) 

239 except Exception: 

240 pass 

241 

242 item = MemoryItem( 

243 id=item_id, 

244 content=content, 

245 metadata=metadata or {}, 

246 embedding=embedding, 

247 source=source, 

248 created_at=now, 

249 updated_at=now, 

250 ) 

251 

252 conn = self._ensure_connection() 

253 with self._lock: 

254 conn.execute( 

255 """ 

256 INSERT OR REPLACE INTO memory_items 

257 (id, content, metadata, embedding, source, created_at, updated_at, hash) 

258 VALUES (?, ?, ?, ?, ?, ?, ?, ?) 

259 """, 

260 ( 

261 item.id, 

262 item.content, 

263 json.dumps(item.metadata), 

264 json.dumps(item.embedding) if item.embedding else None, 

265 item.source, 

266 item.created_at, 

267 item.updated_at, 

268 item.hash, 

269 ) 

270 ) 

271 

272 # Broadcast memory addition to EventBus 

273 try: 

274 from core.platform.events import emit_event 

275 emit_event('memory.item_added', { 

276 'id': item.id, 'source': item.source, 

277 'content_length': len(item.content), 

278 }) 

279 except Exception: 

280 pass 

281 

282 return item 

283 

284 def add_batch( 

285 self, 

286 items: List[Tuple[str, Optional[Dict[str, Any]]]], 

287 source: str = "memory", 

288 compute_embedding: bool = True, 

289 ) -> List[MemoryItem]: 

290 """ 

291 Add multiple memory items in a batch. 

292 

293 Args: 

294 items: List of (content, metadata) tuples. 

295 source: Source identifier for all items. 

296 compute_embedding: Whether to compute embeddings. 

297 

298 Returns: 

299 List of created MemoryItems. 

300 """ 

301 results = [] 

302 conn = self._ensure_connection() 

303 

304 with self._lock: 

305 conn.execute("BEGIN") 

306 try: 

307 for content, metadata in items: 

308 item = self.add( 

309 content=content, 

310 metadata=metadata, 

311 source=source, 

312 compute_embedding=compute_embedding, 

313 ) 

314 results.append(item) 

315 conn.execute("COMMIT") 

316 except Exception: 

317 conn.execute("ROLLBACK") 

318 raise 

319 

320 return results 

321 

322 def get(self, item_id: str) -> Optional[MemoryItem]: 

323 """ 

324 Get a memory item by ID. 

325 

326 Args: 

327 item_id: The item ID. 

328 

329 Returns: 

330 The MemoryItem if found, None otherwise. 

331 """ 

332 conn = self._ensure_connection() 

333 with self._lock: 

334 row = conn.execute( 

335 "SELECT * FROM memory_items WHERE id = ?", 

336 (item_id,) 

337 ).fetchone() 

338 

339 if not row: 

340 return None 

341 

342 return self._row_to_item(row) 

343 

    def search(
        self,
        query: str,
        max_results: int = 10,
        min_score: float = 0.0,
        source_filter: Optional[str] = None,
    ) -> List[SearchResult]:
        """
        Search memories using FTS5 full-text search.

        Falls back to a LIKE substring scan when FTS5 is unavailable.

        Args:
            query: The search query.
            max_results: Maximum number of results.
            min_score: Minimum score threshold.
            source_filter: Optional source to filter by.

        Returns:
            List of SearchResult objects sorted by score.
        """
        if not query.strip():
            return []

        conn = self._ensure_connection()
        results = []

        with self._lock:
            if self._fts_available:
                # Build FTS5 query (tokens quoted and ANDed; None when the
                # input contains no word characters at all)
                fts_query = self._build_fts_query(query)
                if not fts_query:
                    return []

                # bm25() returns negative values (more negative = better), so
                # ORDER BY score ascending yields the best matches first.
                sql = """
                    SELECT m.*, bm25(memory_fts) as score,
                           snippet(memory_fts, 0, '<b>', '</b>', '...', 32) as snippet
                    FROM memory_fts f
                    JOIN memory_items m ON f.id = m.id
                    WHERE memory_fts MATCH ?
                """
                params: List[Any] = [fts_query]

                if source_filter:
                    sql += " AND m.source = ?"
                    params.append(source_filter)

                sql += " ORDER BY score LIMIT ?"
                params.append(max_results * 2)  # Get extra for filtering

                rows = conn.execute(sql, params).fetchall()
            else:
                # Fallback to LIKE search: constant score, no ranking.
                # NOTE(review): LIKE is case-insensitive for ASCII only.
                sql = """
                    SELECT *, 1.0 as score, '' as snippet
                    FROM memory_items
                    WHERE content LIKE ?
                """
                params = [f"%{query}%"]

                if source_filter:
                    sql += " AND source = ?"
                    params.append(source_filter)

                sql += " LIMIT ?"
                params.append(max_results * 2)

                rows = conn.execute(sql, params).fetchall()

            for row in rows:
                # Convert BM25 rank to score (higher is better); a NULL/zero
                # rank maps to 1.0 inside _bm25_to_score.
                raw_score = abs(row['score']) if row['score'] else 0
                score = self._bm25_to_score(raw_score)

                if score >= min_score:
                    item = self._row_to_item(row)
                    results.append(SearchResult(
                        item=item,
                        score=score,
                        match_type="fts",
                        snippet=row['snippet'] if row['snippet'] else "",
                    ))

        # Trim the over-fetched candidate set down to max_results.
        return sorted(results, key=lambda x: x.score, reverse=True)[:max_results]

426 

427 def search_semantic( 

428 self, 

429 query: str, 

430 max_results: int = 10, 

431 min_score: float = 0.0, 

432 source_filter: Optional[str] = None, 

433 ) -> List[SearchResult]: 

434 """ 

435 Search memories using semantic similarity with embeddings. 

436 

437 Args: 

438 query: The search query. 

439 max_results: Maximum number of results. 

440 min_score: Minimum similarity score threshold. 

441 source_filter: Optional source to filter by. 

442 

443 Returns: 

444 List of SearchResult objects sorted by similarity score. 

445 """ 

446 if not query.strip() or not self.embedding_fn: 

447 return [] 

448 

449 try: 

450 query_embedding = self.embedding_fn(query) 

451 except Exception: 

452 return [] 

453 

454 conn = self._ensure_connection() 

455 results = [] 

456 

457 with self._lock: 

458 sql = "SELECT * FROM memory_items WHERE embedding IS NOT NULL" 

459 params: List[Any] = [] 

460 

461 if source_filter: 

462 sql += " AND source = ?" 

463 params.append(source_filter) 

464 

465 rows = conn.execute(sql, params).fetchall() 

466 

467 for row in rows: 

468 item = self._row_to_item(row) 

469 if item.embedding: 

470 score = self._cosine_similarity(query_embedding, item.embedding) 

471 if score >= min_score: 

472 snippet = item.content[:200] + "..." if len(item.content) > 200 else item.content 

473 results.append(SearchResult( 

474 item=item, 

475 score=score, 

476 match_type="semantic", 

477 snippet=snippet, 

478 )) 

479 

480 return sorted(results, key=lambda x: x.score, reverse=True)[:max_results] 

481 

482 def search_hybrid( 

483 self, 

484 query: str, 

485 max_results: int = 10, 

486 min_score: float = 0.0, 

487 source_filter: Optional[str] = None, 

488 fts_weight: float = 0.3, 

489 semantic_weight: float = 0.7, 

490 ) -> List[SearchResult]: 

491 """ 

492 Hybrid search combining FTS5 and semantic search. 

493 

494 Args: 

495 query: The search query. 

496 max_results: Maximum number of results. 

497 min_score: Minimum combined score threshold. 

498 source_filter: Optional source to filter by. 

499 fts_weight: Weight for FTS5 scores (0-1). 

500 semantic_weight: Weight for semantic scores (0-1). 

501 

502 Returns: 

503 List of SearchResult objects sorted by combined score. 

504 """ 

505 # Get results from both search methods 

506 candidates = max_results * 3 

507 fts_results = self.search(query, candidates, 0.0, source_filter) 

508 semantic_results = self.search_semantic(query, candidates, 0.0, source_filter) 

509 

510 # Merge results 

511 scores_by_id: Dict[str, Dict[str, Any]] = {} 

512 

513 for result in fts_results: 

514 scores_by_id[result.item.id] = { 

515 'item': result.item, 

516 'fts_score': result.score, 

517 'semantic_score': 0.0, 

518 'snippet': result.snippet, 

519 } 

520 

521 for result in semantic_results: 

522 if result.item.id in scores_by_id: 

523 scores_by_id[result.item.id]['semantic_score'] = result.score 

524 if result.snippet and not scores_by_id[result.item.id]['snippet']: 

525 scores_by_id[result.item.id]['snippet'] = result.snippet 

526 else: 

527 scores_by_id[result.item.id] = { 

528 'item': result.item, 

529 'fts_score': 0.0, 

530 'semantic_score': result.score, 

531 'snippet': result.snippet, 

532 } 

533 

534 # Compute combined scores 

535 results = [] 

536 for data in scores_by_id.values(): 

537 combined_score = ( 

538 fts_weight * data['fts_score'] + 

539 semantic_weight * data['semantic_score'] 

540 ) 

541 if combined_score >= min_score: 

542 results.append(SearchResult( 

543 item=data['item'], 

544 score=combined_score, 

545 match_type="hybrid", 

546 snippet=data['snippet'], 

547 )) 

548 

549 return sorted(results, key=lambda x: x.score, reverse=True)[:max_results] 

550 

551 def delete(self, item_id: str) -> bool: 

552 """ 

553 Delete a memory item by ID. 

554 

555 Args: 

556 item_id: The item ID to delete. 

557 

558 Returns: 

559 True if item was deleted, False if not found. 

560 """ 

561 conn = self._ensure_connection() 

562 with self._lock: 

563 cursor = conn.execute( 

564 "DELETE FROM memory_items WHERE id = ?", 

565 (item_id,) 

566 ) 

567 deleted = cursor.rowcount > 0 

568 

569 if deleted: 

570 try: 

571 from core.platform.events import emit_event 

572 emit_event('memory.item_deleted', {'id': item_id}) 

573 except Exception: 

574 pass 

575 

576 return deleted 

577 

578 def delete_by_source(self, source: str) -> int: 

579 """ 

580 Delete all memory items from a specific source. 

581 

582 Args: 

583 source: The source identifier. 

584 

585 Returns: 

586 Number of items deleted. 

587 """ 

588 conn = self._ensure_connection() 

589 with self._lock: 

590 cursor = conn.execute( 

591 "DELETE FROM memory_items WHERE source = ?", 

592 (source,) 

593 ) 

594 return cursor.rowcount 

595 

596 def clear(self) -> int: 

597 """ 

598 Clear all memory items from the store. 

599 

600 Returns: 

601 Number of items deleted. 

602 """ 

603 conn = self._ensure_connection() 

604 with self._lock: 

605 cursor = conn.execute("DELETE FROM memory_items") 

606 return cursor.rowcount 

607 

608 def count(self, source: Optional[str] = None) -> int: 

609 """ 

610 Count memory items in the store. 

611 

612 Args: 

613 source: Optional source to filter by. 

614 

615 Returns: 

616 Number of items. 

617 """ 

618 conn = self._ensure_connection() 

619 with self._lock: 

620 if source: 

621 row = conn.execute( 

622 "SELECT COUNT(*) as cnt FROM memory_items WHERE source = ?", 

623 (source,) 

624 ).fetchone() 

625 else: 

626 row = conn.execute("SELECT COUNT(*) as cnt FROM memory_items").fetchone() 

627 return row['cnt'] if row else 0 

628 

629 def list_sources(self) -> List[str]: 

630 """ 

631 List all unique sources in the store. 

632 

633 Returns: 

634 List of source identifiers. 

635 """ 

636 conn = self._ensure_connection() 

637 with self._lock: 

638 rows = conn.execute( 

639 "SELECT DISTINCT source FROM memory_items ORDER BY source" 

640 ).fetchall() 

641 return [row['source'] for row in rows] 

642 

643 def update_embedding(self, item_id: str) -> bool: 

644 """ 

645 Recompute and update the embedding for an item. 

646 

647 Args: 

648 item_id: The item ID. 

649 

650 Returns: 

651 True if updated, False if item not found or no embedding function. 

652 """ 

653 if not self.embedding_fn: 

654 return False 

655 

656 item = self.get(item_id) 

657 if not item: 

658 return False 

659 

660 try: 

661 embedding = self.embedding_fn(item.content) 

662 except Exception: 

663 return False 

664 

665 conn = self._ensure_connection() 

666 with self._lock: 

667 conn.execute( 

668 "UPDATE memory_items SET embedding = ?, updated_at = ? WHERE id = ?", 

669 (json.dumps(embedding), time.time(), item_id) 

670 ) 

671 

672 return True 

673 

674 def close(self): 

675 """Close the database connection.""" 

676 with self._lock: 

677 if self._conn: 

678 self._conn.close() 

679 self._conn = None 

680 

681 def _row_to_item(self, row: sqlite3.Row) -> MemoryItem: 

682 """Convert a database row to a MemoryItem.""" 

683 embedding = None 

684 if row['embedding']: 

685 try: 

686 embedding = json.loads(row['embedding']) 

687 except (json.JSONDecodeError, TypeError): 

688 pass 

689 

690 metadata = {} 

691 if row['metadata']: 

692 try: 

693 metadata = json.loads(row['metadata']) 

694 except (json.JSONDecodeError, TypeError): 

695 pass 

696 

697 return MemoryItem( 

698 id=row['id'], 

699 content=row['content'], 

700 metadata=metadata, 

701 embedding=embedding, 

702 source=row['source'] or 'memory', 

703 created_at=row['created_at'] or time.time(), 

704 updated_at=row['updated_at'] or time.time(), 

705 hash=row['hash'] or '', 

706 ) 

707 

708 def _build_fts_query(self, raw: str) -> Optional[str]: 

709 """Build an FTS5 query from raw input.""" 

710 import re 

711 tokens = re.findall(r'[A-Za-z0-9_]+', raw) 

712 if not tokens: 

713 return None 

714 # Quote each token and join with AND 

715 quoted = [f'"{t}"' for t in tokens if t] 

716 return " AND ".join(quoted) 

717 

718 def _bm25_to_score(self, rank: float) -> float: 

719 """Convert BM25 rank to a 0-1 score (higher is better).""" 

720 # BM25 returns negative scores, more negative = better match 

721 normalized = max(0, rank) if rank >= 0 else abs(rank) 

722 return 1 / (1 + normalized) if normalized > 0 else 1.0 

723 

724 def _cosine_similarity(self, a: List[float], b: List[float]) -> float: 

725 """Compute cosine similarity between two vectors.""" 

726 if not a or not b: 

727 return 0.0 

728 

729 length = min(len(a), len(b)) 

730 dot = sum(a[i] * b[i] for i in range(length)) 

731 norm_a = sum(x * x for x in a[:length]) ** 0.5 

732 norm_b = sum(x * x for x in b[:length]) ** 0.5 

733 

734 if norm_a == 0 or norm_b == 0: 

735 return 0.0 

736 

737 return dot / (norm_a * norm_b) 

738 

    def __enter__(self):
        # Context-manager support: `with MemoryStore(...) as store:`.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always close the connection; returning False propagates exceptions.
        self.close()
        return False