Coverage for integrations / channels / memory / memory_store.py: 37.8%
262 statements
« prev ^ index » next — coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Memory Store using SQLite FTS5 + embeddings for semantic search.
4Provides persistent storage with full-text search and vector similarity capabilities.
5"""
7import hashlib
8import json
9import sqlite3
10import threading
11import time
12from dataclasses import dataclass, field
13from pathlib import Path
14from typing import Any, Callable, Dict, List, Optional, Tuple, Union
@dataclass
class MemoryItem:
    """A single record held by the memory store.

    Carries the stored text, an optional embedding vector, arbitrary
    metadata, a provenance tag, creation/update timestamps, and a SHA256
    content hash (auto-derived when not supplied).
    """

    id: str
    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    embedding: Optional[List[float]] = None
    source: str = "memory"
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)
    hash: str = ""

    def __post_init__(self):
        # Derive the content hash only when the caller did not supply one.
        self.hash = self.hash or self._compute_hash()

    def _compute_hash(self) -> str:
        """Return the SHA256 hex digest of the content."""
        digest = hashlib.sha256(self.content.encode('utf-8'))
        return digest.hexdigest()

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this item into a plain dictionary."""
        return dict(
            id=self.id,
            content=self.content,
            metadata=self.metadata,
            embedding=self.embedding,
            source=self.source,
            created_at=self.created_at,
            updated_at=self.updated_at,
            hash=self.hash,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'MemoryItem':
        """Build a MemoryItem from a dictionary as produced by to_dict().

        Missing optional keys fall back to the same defaults used by the
        dataclass itself (timestamps default to "now").
        """
        return cls(
            id=data['id'],
            content=data['content'],
            metadata=data.get('metadata', {}),
            embedding=data.get('embedding'),
            source=data.get('source', 'memory'),
            created_at=data.get('created_at', time.time()),
            updated_at=data.get('updated_at', time.time()),
            hash=data.get('hash', ''),
        )
@dataclass
class SearchResult:
    """A single hit returned by a memory-store query."""

    item: MemoryItem        # the matched memory record
    score: float            # relevance score; higher is better
    match_type: str = "fts"  # "fts", "semantic", or "hybrid"
    snippet: str = ""        # optional highlighted excerpt of the match
class MemoryStore:
    """
    Memory store with SQLite FTS5 for full-text search and embedding support
    for semantic search.

    Features:
    - Full-text search using SQLite FTS5 (with a LIKE-based fallback)
    - Semantic search using embeddings with cosine similarity
    - Hybrid search combining FTS5 and semantic scores
    - Thread-safe operations (public methods serialize on a re-entrant lock)
    - Automatic schema management
    """

    # Bump when the on-disk schema changes; persisted in schema_meta.
    SCHEMA_VERSION = 1

    def __init__(
        self,
        db_path: Optional[Union[str, Path]] = None,
        embedding_fn: Optional[Callable[[str], List[float]]] = None,
        embedding_dims: int = 384,
    ):
        """
        Initialize the memory store.

        Args:
            db_path: Path to SQLite database file. Uses in-memory if None.
            embedding_fn: Optional function to compute embeddings.
            embedding_dims: Dimension of embedding vectors.
        """
        self.db_path = str(db_path) if db_path else ":memory:"
        self.embedding_fn = embedding_fn
        self.embedding_dims = embedding_dims
        # RLock: public methods hold the lock and may call other locking
        # methods (e.g. add_batch -> add), so re-entrancy is required.
        self._lock = threading.RLock()
        self._conn: Optional[sqlite3.Connection] = None
        self._ensure_connection()
        self._ensure_schema()

    def _ensure_connection(self) -> sqlite3.Connection:
        """Ensure the database connection is open, creating it on first use."""
        if self._conn is None:
            self._conn = sqlite3.connect(
                self.db_path,
                check_same_thread=False,
                isolation_level=None,  # autocommit; transactions are explicit
            )
            self._conn.row_factory = sqlite3.Row
            # BUGFIX: add() uses INSERT OR REPLACE, whose implicit delete of a
            # conflicting row fires the AFTER DELETE FTS-sync trigger only when
            # recursive triggers are enabled (SQLite default is OFF). Without
            # this pragma, replacing an existing id left a stale entry in the
            # FTS index.
            self._conn.execute("PRAGMA recursive_triggers = ON")
            # Probe for FTS5; the result is unused — schema creation does its
            # own try/except — so a failure here is simply ignored.
            try:
                self._conn.execute("SELECT fts5()")
            except sqlite3.OperationalError:
                pass  # FTS5 not available, will use fallback
        # NOTE(review): after close(), a reconnect here on an in-memory store
        # yields a fresh empty DB without the schema — confirm callers never
        # reuse a store after close().
        return self._conn

    def _ensure_schema(self):
        """Create the database schema if it does not exist.

        Sets self._fts_available depending on whether the FTS5 extension
        could create its virtual table.
        """
        conn = self._ensure_connection()
        with self._lock:
            # Main memory items table; metadata/embedding are stored as JSON.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS memory_items (
                    id TEXT PRIMARY KEY,
                    content TEXT NOT NULL,
                    metadata TEXT DEFAULT '{}',
                    embedding TEXT,
                    source TEXT DEFAULT 'memory',
                    created_at REAL,
                    updated_at REAL,
                    hash TEXT
                )
            """)

            # FTS5 virtual table for full-text search. External-content
            # (content=memory_items) so the text itself is stored only once.
            try:
                conn.execute("""
                    CREATE VIRTUAL TABLE IF NOT EXISTS memory_fts USING fts5(
                        content,
                        id UNINDEXED,
                        source UNINDEXED,
                        content=memory_items,
                        content_rowid=rowid
                    )
                """)
                self._fts_available = True
            except sqlite3.OperationalError:
                # FTS5 not available, create fallback table
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS memory_fts_fallback (
                        rowid INTEGER PRIMARY KEY,
                        content TEXT,
                        id TEXT,
                        source TEXT
                    )
                """)
                self._fts_available = False

            # Triggers keep the external-content FTS index in sync with
            # memory_items on insert / delete / update. The 'delete' command
            # row is the documented way to remove an entry from an
            # external-content FTS5 table.
            if self._fts_available:
                conn.executescript("""
                    CREATE TRIGGER IF NOT EXISTS memory_items_ai AFTER INSERT ON memory_items BEGIN
                        INSERT INTO memory_fts(rowid, content, id, source)
                        VALUES (NEW.rowid, NEW.content, NEW.id, NEW.source);
                    END;

                    CREATE TRIGGER IF NOT EXISTS memory_items_ad AFTER DELETE ON memory_items BEGIN
                        INSERT INTO memory_fts(memory_fts, rowid, content, id, source)
                        VALUES('delete', OLD.rowid, OLD.content, OLD.id, OLD.source);
                    END;

                    CREATE TRIGGER IF NOT EXISTS memory_items_au AFTER UPDATE ON memory_items BEGIN
                        INSERT INTO memory_fts(memory_fts, rowid, content, id, source)
                        VALUES('delete', OLD.rowid, OLD.content, OLD.id, OLD.source);
                        INSERT INTO memory_fts(rowid, content, id, source)
                        VALUES (NEW.rowid, NEW.content, NEW.id, NEW.source);
                    END;
                """)

            # Secondary indexes for the common filter columns.
            conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_source ON memory_items(source)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_hash ON memory_items(hash)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_updated ON memory_items(updated_at)")

            # Schema version bookkeeping.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS schema_meta (
                    key TEXT PRIMARY KEY,
                    value TEXT
                )
            """)
            conn.execute(
                "INSERT OR REPLACE INTO schema_meta (key, value) VALUES (?, ?)",
                ('version', str(self.SCHEMA_VERSION))
            )

    def add(
        self,
        content: str,
        metadata: Optional[Dict[str, Any]] = None,
        source: str = "memory",
        item_id: Optional[str] = None,
        compute_embedding: bool = True,
    ) -> MemoryItem:
        """
        Add a new memory item to the store.

        An existing item with the same id is replaced (INSERT OR REPLACE).

        Args:
            content: The content to store.
            metadata: Optional metadata dictionary.
            source: Source identifier for the memory.
            item_id: Optional custom ID. Auto-generated if not provided.
            compute_embedding: Whether to compute embedding for semantic search.

        Returns:
            The created MemoryItem.
        """
        now = time.time()
        # Auto-id: content + timestamp hashed, truncated to 16 hex chars.
        item_id = item_id or hashlib.sha256(
            f"{content}:{now}".encode('utf-8')
        ).hexdigest()[:16]

        embedding = None
        if compute_embedding and self.embedding_fn:
            # Embeddings are best-effort: a failing embedding_fn must not
            # prevent the item from being stored.
            try:
                embedding = self.embedding_fn(content)
            except Exception:
                pass

        item = MemoryItem(
            id=item_id,
            content=content,
            metadata=metadata or {},
            embedding=embedding,
            source=source,
            created_at=now,
            updated_at=now,
        )

        conn = self._ensure_connection()
        with self._lock:
            conn.execute(
                """
                INSERT OR REPLACE INTO memory_items
                (id, content, metadata, embedding, source, created_at, updated_at, hash)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    item.id,
                    item.content,
                    json.dumps(item.metadata),
                    json.dumps(item.embedding) if item.embedding else None,
                    item.source,
                    item.created_at,
                    item.updated_at,
                    item.hash,
                )
            )

        # Broadcast memory addition to EventBus (best-effort; the events
        # module may not be importable in all deployments).
        try:
            from core.platform.events import emit_event
            emit_event('memory.item_added', {
                'id': item.id, 'source': item.source,
                'content_length': len(item.content),
            })
        except Exception:
            pass

        return item

    def add_batch(
        self,
        items: List[Tuple[str, Optional[Dict[str, Any]]]],
        source: str = "memory",
        compute_embedding: bool = True,
    ) -> List[MemoryItem]:
        """
        Add multiple memory items in a single transaction.

        Args:
            items: List of (content, metadata) tuples.
            source: Source identifier for all items.
            compute_embedding: Whether to compute embeddings.

        Returns:
            List of created MemoryItems.

        Raises:
            Any exception raised while adding; the transaction is rolled
            back before re-raising.
        """
        results = []
        conn = self._ensure_connection()

        with self._lock:
            # Explicit transaction (connection is in autocommit mode). The
            # nested add() calls re-acquire the RLock, which is re-entrant.
            conn.execute("BEGIN")
            try:
                for content, metadata in items:
                    item = self.add(
                        content=content,
                        metadata=metadata,
                        source=source,
                        compute_embedding=compute_embedding,
                    )
                    results.append(item)
                conn.execute("COMMIT")
            except Exception:
                conn.execute("ROLLBACK")
                raise
        # NOTE(review): add() emits 'memory.item_added' events per item even
        # if this transaction later rolls back — confirm subscribers tolerate
        # events for rows that were never committed.

        return results

    def get(self, item_id: str) -> Optional[MemoryItem]:
        """
        Get a memory item by ID.

        Args:
            item_id: The item ID.

        Returns:
            The MemoryItem if found, None otherwise.
        """
        conn = self._ensure_connection()
        with self._lock:
            row = conn.execute(
                "SELECT * FROM memory_items WHERE id = ?",
                (item_id,)
            ).fetchone()

        if not row:
            return None

        return self._row_to_item(row)

    def search(
        self,
        query: str,
        max_results: int = 10,
        min_score: float = 0.0,
        source_filter: Optional[str] = None,
    ) -> List[SearchResult]:
        """
        Search memories using FTS5 full-text search.

        Falls back to a LIKE substring scan when FTS5 is unavailable.

        Args:
            query: The search query.
            max_results: Maximum number of results.
            min_score: Minimum score threshold.
            source_filter: Optional source to filter by.

        Returns:
            List of SearchResult objects sorted by score (descending).
        """
        if not query.strip():
            return []

        conn = self._ensure_connection()
        results = []

        with self._lock:
            if self._fts_available:
                # Build FTS5 query (quoted tokens joined with AND)
                fts_query = self._build_fts_query(query)
                if not fts_query:
                    return []

                # bm25() is more negative for better matches, so ascending
                # ORDER BY score puts the best matches first. The bare
                # `memory_fts` references resolve to the FTS5 hidden column
                # even though the table is aliased `f`.
                sql = """
                    SELECT m.*, bm25(memory_fts) as score,
                           snippet(memory_fts, 0, '<b>', '</b>', '...', 32) as snippet
                    FROM memory_fts f
                    JOIN memory_items m ON f.id = m.id
                    WHERE memory_fts MATCH ?
                """
                params: List[Any] = [fts_query]

                if source_filter:
                    sql += " AND m.source = ?"
                    params.append(source_filter)

                sql += " ORDER BY score LIMIT ?"
                params.append(max_results * 2)  # Get extra for filtering

                rows = conn.execute(sql, params).fetchall()
            else:
                # Fallback to LIKE search; every hit gets a flat score of 1.0.
                sql = """
                    SELECT *, 1.0 as score, '' as snippet
                    FROM memory_items
                    WHERE content LIKE ?
                """
                params = [f"%{query}%"]

                if source_filter:
                    sql += " AND source = ?"
                    params.append(source_filter)

                sql += " LIMIT ?"
                params.append(max_results * 2)

                rows = conn.execute(sql, params).fetchall()

        for row in rows:
            # Convert BM25 rank to a 0-1 score (higher is better).
            raw_score = abs(row['score']) if row['score'] else 0
            score = self._bm25_to_score(raw_score)

            if score >= min_score:
                item = self._row_to_item(row)
                results.append(SearchResult(
                    item=item,
                    score=score,
                    match_type="fts",
                    snippet=row['snippet'] if row['snippet'] else "",
                ))

        return sorted(results, key=lambda x: x.score, reverse=True)[:max_results]

    def search_semantic(
        self,
        query: str,
        max_results: int = 10,
        min_score: float = 0.0,
        source_filter: Optional[str] = None,
    ) -> List[SearchResult]:
        """
        Search memories using semantic similarity with embeddings.

        Performs a full scan over all rows that have a stored embedding and
        ranks them by cosine similarity against the query embedding.

        Args:
            query: The search query.
            max_results: Maximum number of results.
            min_score: Minimum similarity score threshold.
            source_filter: Optional source to filter by.

        Returns:
            List of SearchResult objects sorted by similarity score.
            Empty when no embedding function is configured or it fails.
        """
        if not query.strip() or not self.embedding_fn:
            return []

        try:
            query_embedding = self.embedding_fn(query)
        except Exception:
            return []

        conn = self._ensure_connection()
        results = []

        with self._lock:
            sql = "SELECT * FROM memory_items WHERE embedding IS NOT NULL"
            params: List[Any] = []

            if source_filter:
                sql += " AND source = ?"
                params.append(source_filter)

            rows = conn.execute(sql, params).fetchall()

        for row in rows:
            item = self._row_to_item(row)
            if item.embedding:
                score = self._cosine_similarity(query_embedding, item.embedding)
                if score >= min_score:
                    # No FTS snippet here; use a content prefix instead.
                    snippet = item.content[:200] + "..." if len(item.content) > 200 else item.content
                    results.append(SearchResult(
                        item=item,
                        score=score,
                        match_type="semantic",
                        snippet=snippet,
                    ))

        return sorted(results, key=lambda x: x.score, reverse=True)[:max_results]

    def search_hybrid(
        self,
        query: str,
        max_results: int = 10,
        min_score: float = 0.0,
        source_filter: Optional[str] = None,
        fts_weight: float = 0.3,
        semantic_weight: float = 0.7,
    ) -> List[SearchResult]:
        """
        Hybrid search combining FTS5 and semantic search.

        Each candidate's combined score is
        fts_weight * fts_score + semantic_weight * semantic_score,
        with a missing score from either method counting as 0.

        Args:
            query: The search query.
            max_results: Maximum number of results.
            min_score: Minimum combined score threshold.
            source_filter: Optional source to filter by.
            fts_weight: Weight for FTS5 scores (0-1).
            semantic_weight: Weight for semantic scores (0-1).

        Returns:
            List of SearchResult objects sorted by combined score.
        """
        # Over-fetch candidates so the merged ranking has enough to pick from.
        candidates = max_results * 3
        fts_results = self.search(query, candidates, 0.0, source_filter)
        semantic_results = self.search_semantic(query, candidates, 0.0, source_filter)

        # Merge the two result lists by item id.
        scores_by_id: Dict[str, Dict[str, Any]] = {}

        for result in fts_results:
            scores_by_id[result.item.id] = {
                'item': result.item,
                'fts_score': result.score,
                'semantic_score': 0.0,
                'snippet': result.snippet,
            }

        for result in semantic_results:
            if result.item.id in scores_by_id:
                scores_by_id[result.item.id]['semantic_score'] = result.score
                # Prefer the FTS snippet (highlighted); only fill in the
                # semantic snippet when FTS produced none.
                if result.snippet and not scores_by_id[result.item.id]['snippet']:
                    scores_by_id[result.item.id]['snippet'] = result.snippet
            else:
                scores_by_id[result.item.id] = {
                    'item': result.item,
                    'fts_score': 0.0,
                    'semantic_score': result.score,
                    'snippet': result.snippet,
                }

        # Compute weighted combined scores.
        results = []
        for data in scores_by_id.values():
            combined_score = (
                fts_weight * data['fts_score'] +
                semantic_weight * data['semantic_score']
            )
            if combined_score >= min_score:
                results.append(SearchResult(
                    item=data['item'],
                    score=combined_score,
                    match_type="hybrid",
                    snippet=data['snippet'],
                ))

        return sorted(results, key=lambda x: x.score, reverse=True)[:max_results]

    def delete(self, item_id: str) -> bool:
        """
        Delete a memory item by ID.

        Args:
            item_id: The item ID to delete.

        Returns:
            True if item was deleted, False if not found.
        """
        conn = self._ensure_connection()
        with self._lock:
            cursor = conn.execute(
                "DELETE FROM memory_items WHERE id = ?",
                (item_id,)
            )
            deleted = cursor.rowcount > 0

        if deleted:
            # Best-effort event broadcast, mirroring add().
            try:
                from core.platform.events import emit_event
                emit_event('memory.item_deleted', {'id': item_id})
            except Exception:
                pass

        return deleted

    def delete_by_source(self, source: str) -> int:
        """
        Delete all memory items from a specific source.

        Args:
            source: The source identifier.

        Returns:
            Number of items deleted.
        """
        conn = self._ensure_connection()
        with self._lock:
            cursor = conn.execute(
                "DELETE FROM memory_items WHERE source = ?",
                (source,)
            )
            return cursor.rowcount

    def clear(self) -> int:
        """
        Clear all memory items from the store.

        Returns:
            Number of items deleted.
        """
        conn = self._ensure_connection()
        with self._lock:
            cursor = conn.execute("DELETE FROM memory_items")
            return cursor.rowcount

    def count(self, source: Optional[str] = None) -> int:
        """
        Count memory items in the store.

        Args:
            source: Optional source to filter by.

        Returns:
            Number of items.
        """
        conn = self._ensure_connection()
        with self._lock:
            if source:
                row = conn.execute(
                    "SELECT COUNT(*) as cnt FROM memory_items WHERE source = ?",
                    (source,)
                ).fetchone()
            else:
                row = conn.execute("SELECT COUNT(*) as cnt FROM memory_items").fetchone()
            return row['cnt'] if row else 0

    def list_sources(self) -> List[str]:
        """
        List all unique sources in the store.

        Returns:
            List of source identifiers, sorted alphabetically.
        """
        conn = self._ensure_connection()
        with self._lock:
            rows = conn.execute(
                "SELECT DISTINCT source FROM memory_items ORDER BY source"
            ).fetchall()
            return [row['source'] for row in rows]

    def update_embedding(self, item_id: str) -> bool:
        """
        Recompute and update the embedding for an item.

        Args:
            item_id: The item ID.

        Returns:
            True if updated, False if item not found, no embedding function
            is configured, or the embedding function raised.
        """
        if not self.embedding_fn:
            return False

        item = self.get(item_id)
        if not item:
            return False

        try:
            embedding = self.embedding_fn(item.content)
        except Exception:
            return False

        conn = self._ensure_connection()
        with self._lock:
            conn.execute(
                "UPDATE memory_items SET embedding = ?, updated_at = ? WHERE id = ?",
                (json.dumps(embedding), time.time(), item_id)
            )

        return True

    def close(self):
        """Close the database connection."""
        with self._lock:
            if self._conn:
                self._conn.close()
                self._conn = None

    def _row_to_item(self, row: sqlite3.Row) -> MemoryItem:
        """Convert a database row to a MemoryItem.

        Malformed JSON in the metadata/embedding columns is tolerated and
        silently replaced by the empty default.
        """
        embedding = None
        if row['embedding']:
            try:
                embedding = json.loads(row['embedding'])
            except (json.JSONDecodeError, TypeError):
                pass

        metadata = {}
        if row['metadata']:
            try:
                metadata = json.loads(row['metadata'])
            except (json.JSONDecodeError, TypeError):
                pass

        return MemoryItem(
            id=row['id'],
            content=row['content'],
            metadata=metadata,
            embedding=embedding,
            source=row['source'] or 'memory',
            created_at=row['created_at'] or time.time(),
            updated_at=row['updated_at'] or time.time(),
            hash=row['hash'] or '',
        )

    def _build_fts_query(self, raw: str) -> Optional[str]:
        """Build an FTS5 query from raw input.

        Tokenizes on word characters, quotes each token to neutralize FTS5
        query syntax, and requires all tokens to match (AND).
        """
        import re
        tokens = re.findall(r'[A-Za-z0-9_]+', raw)
        if not tokens:
            return None
        # Quote each token and join with AND
        quoted = [f'"{t}"' for t in tokens if t]
        return " AND ".join(quoted)

    def _bm25_to_score(self, rank: float) -> float:
        """Convert a BM25 rank magnitude to a 0-1 score (higher is better).

        Callers pass abs(bm25()) — BM25 itself is negative, more negative
        meaning a better match — so a larger `rank` maps to a lower score
        via 1 / (1 + rank).
        """
        normalized = max(0, rank) if rank >= 0 else abs(rank)
        return 1 / (1 + normalized) if normalized > 0 else 1.0

    def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
        """Compute cosine similarity between two vectors.

        Tolerates mismatched dimensions by truncating both vectors to the
        shorter length; returns 0.0 for empty or zero-norm inputs.
        """
        if not a or not b:
            return 0.0

        length = min(len(a), len(b))
        dot = sum(a[i] * b[i] for i in range(length))
        norm_a = sum(x * x for x in a[:length]) ** 0.5
        norm_b = sum(x * x for x in b[:length]) ** 0.5

        if norm_a == 0 or norm_b == 0:
            return 0.0

        return dot / (norm_a * norm_b)

    def __enter__(self):
        """Context-manager entry: return the store itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context-manager exit: close the connection, never suppress."""
        self.close()
        return False