Coverage for integrations/channels/memory/simplemem_store.py: 50.6%
77 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
SimpleMem Store - Lifelong memory integration using SimpleMem.

Wraps the SimpleMem library to provide advanced memory capabilities:
- Semantic Structured Compression (atomic fact extraction)
- Structured Multi-View Indexing (LanceDB vectors)
- Complexity-Aware Adaptive Retrieval

Implements the MemorySource interface for seamless integration
with the existing MemorySearch system.
"""
13from __future__ import annotations
15import logging
16import os
17import uuid
18from dataclasses import dataclass, field
19from datetime import datetime
20from typing import Any, Dict, List, Optional, Tuple
22from .search import MemorySource, SearchMatch
logger = logging.getLogger(__name__)

# SimpleMem is an optional dependency: record whether it could be imported
# so that SimpleMemStore can fail with a clear message at construction time
# instead of breaking the import of this module.
try:
    from simplemem import SimpleMemSystem
except ImportError:
    HAS_SIMPLEMEM = False
else:
    HAS_SIMPLEMEM = True
@dataclass
class SimpleMemConfig:
    """Configuration for SimpleMem integration.

    Every field has a default, so ``SimpleMemConfig()`` is usable as-is.
    Use :meth:`from_env` to build an instance from ``SIMPLEMEM_*``
    environment variables instead.
    """

    enabled: bool = True
    api_key: str = ""
    base_url: Optional[str] = None
    model: str = "gpt-4.1-mini"
    embedding_model: str = "Qwen/Qwen3-Embedding-0.6B"
    db_path: str = ""  # Resolved at runtime via platform_paths
    window_size: int = 40
    overlap_size: int = 2
    parallel_workers: int = 8
    retrieval_workers: int = 4
    auto_finalize_interval: int = 40  # Finalize every N dialogues

    @staticmethod
    def _env_int(name: str, default: int) -> int:
        """Read an integer environment variable.

        An unset or blank variable yields ``default``.

        Raises:
            ValueError: If the variable is set to a non-integer value. The
                message names the offending variable.
        """
        raw = os.getenv(name, "").strip()
        if not raw:
            return default
        try:
            return int(raw)
        except ValueError:
            raise ValueError(
                f"Environment variable {name} must be an integer, got {raw!r}"
            ) from None

    @classmethod
    def from_env(cls) -> "SimpleMemConfig":
        """Create configuration from environment variables.

        Variables read (all optional):
        ``SIMPLEMEM_ENABLED``, ``SIMPLEMEM_API_KEY`` (falling back to
        ``OPENAI_API_KEY``), ``SIMPLEMEM_BASE_URL``, ``SIMPLEMEM_MODEL``,
        ``SIMPLEMEM_EMBEDDING_MODEL``, ``SIMPLEMEM_DB_PATH``,
        ``SIMPLEMEM_WINDOW_SIZE``, ``SIMPLEMEM_OVERLAP_SIZE``,
        ``SIMPLEMEM_PARALLEL_WORKERS``, ``SIMPLEMEM_RETRIEVAL_WORKERS`` and
        ``SIMPLEMEM_AUTO_FINALIZE_INTERVAL``.

        A variable that is set but empty is treated as unset for the API
        key, the model names and the integer settings, so an empty
        placeholder (e.g. from a container env file) does not override the
        fallback or default.

        Raises:
            ValueError: If an integer setting holds a non-integer value.
        """
        # `os.getenv(name, default)` only uses the default when the variable
        # is absent, so `or` is needed for the empty-string case.
        api_key = os.getenv("SIMPLEMEM_API_KEY") or os.getenv("OPENAI_API_KEY", "")
        model = os.getenv("SIMPLEMEM_MODEL") or "gpt-4.1-mini"
        embedding_model = (
            os.getenv("SIMPLEMEM_EMBEDDING_MODEL") or "Qwen/Qwen3-Embedding-0.6B"
        )

        return cls(
            # Only the literal "true" (case-insensitive) enables the store.
            enabled=os.getenv("SIMPLEMEM_ENABLED", "true").lower() == "true",
            api_key=api_key,
            base_url=os.getenv("SIMPLEMEM_BASE_URL") or None,
            model=model,
            embedding_model=embedding_model,
            db_path=os.getenv("SIMPLEMEM_DB_PATH", ""),
            window_size=cls._env_int("SIMPLEMEM_WINDOW_SIZE", 40),
            overlap_size=cls._env_int("SIMPLEMEM_OVERLAP_SIZE", 2),
            parallel_workers=cls._env_int("SIMPLEMEM_PARALLEL_WORKERS", 8),
            retrieval_workers=cls._env_int("SIMPLEMEM_RETRIEVAL_WORKERS", 4),
            auto_finalize_interval=cls._env_int(
                "SIMPLEMEM_AUTO_FINALIZE_INTERVAL", 40
            ),
        )
class SimpleMemStore(MemorySource):
    """
    SimpleMem wrapper implementing MemorySource interface.

    Provides lifelong memory with:
    - 43.24% F1 score on memory retrieval
    - 30x token efficiency through semantic compression
    - Three-stage pipeline: Compression -> Indexing -> Adaptive Retrieval

    NOTE(review): the SimpleMemSystem calls below are made synchronously
    from inside ``async`` methods. If they block on LLM or database I/O
    they will stall the event loop - confirm whether that is acceptable
    for callers before relying on this in latency-sensitive paths.
    """

    # SimpleMem returns one synthesized answer with no relevance score, so
    # every match produced by this store carries this fixed score.
    _ANSWER_SCORE = 1.0

    def __init__(self, config: Optional[SimpleMemConfig] = None):
        """
        Create the store and the underlying SimpleMemSystem.

        Args:
            config: Settings to use. When omitted, they are read from the
                environment via ``SimpleMemConfig.from_env()``. If
                ``config.db_path`` is empty it is filled in on this same
                config object.

        Raises:
            ImportError: If the ``simplemem`` package is not installed.
        """
        if not HAS_SIMPLEMEM:
            raise ImportError(
                "simplemem is required for SimpleMemStore. "
                "Install it with: pip install simplemem"
            )

        self._config = config or SimpleMemConfig.from_env()
        # Prefer the platform-specific directory; the current working
        # directory is only a fallback when core.platform_paths is not
        # importable (CWD may be read-only in some deployments).
        if not self._config.db_path:
            try:
                from core.platform_paths import get_simplemem_dir
                self._config.db_path = get_simplemem_dir()
            except ImportError:
                self._config.db_path = os.path.join('.', 'simplemem_db')
        self._dialogue_count = 0

        # Build kwargs for SimpleMemSystem. api_key / base_url are only
        # passed when set, leaving the library's own defaults in effect
        # otherwise.
        system_kwargs: Dict[str, Any] = {
            "model": self._config.model,
            "embedding_model": self._config.embedding_model,
            "db_path": self._config.db_path,
        }

        if self._config.api_key:
            system_kwargs["api_key"] = self._config.api_key
        if self._config.base_url:
            system_kwargs["base_url"] = self._config.base_url

        self._system = SimpleMemSystem(**system_kwargs)

    @property
    def name(self) -> str:
        """Identifier of this memory source."""
        return "simplemem"

    async def add(
        self,
        content: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> str:
        """
        Add a dialogue entry to SimpleMem.

        Args:
            content: The message text.
            metadata: Dict with sender_name, timestamp, channel, chat_id, etc.
                Only ``sender_name`` and ``timestamp`` are used here. A
                missing or empty sender becomes ``"User"``; a missing or
                ``None`` timestamp becomes the current local time, and a
                ``datetime`` timestamp is converted to its ISO-8601 string.

        Returns:
            A freshly generated UUID string for the entry. It is not
            stored alongside the dialogue by this method.
        """
        metadata = metadata or {}
        speaker = metadata.get("sender_name") or "User"

        timestamp = metadata.get("timestamp")
        if timestamp is None:
            # Covers both a missing key and an explicit None value.
            timestamp = datetime.now().isoformat()
        elif isinstance(timestamp, datetime):
            # Keep the value in the same ISO string form as the default.
            timestamp = timestamp.isoformat()

        self._system.add_dialogue(speaker, content, timestamp)
        self._dialogue_count += 1

        # Auto-finalize every `auto_finalize_interval` dialogues; an
        # interval of zero or less disables auto-finalization.
        interval = self._config.auto_finalize_interval
        if interval > 0 and self._dialogue_count % interval == 0:
            logger.info(
                "Auto-finalizing SimpleMem after %d dialogues",
                self._dialogue_count,
            )
            await self.finalize()

        return str(uuid.uuid4())

    async def finalize(self) -> None:
        """
        Process buffered dialogues into compressed atomic memories.

        This triggers SimpleMem's compression pipeline:
        1. Windowed dialogue grouping
        2. Atomic fact extraction via LLM
        3. Vector indexing into LanceDB

        Failures are logged (with traceback) and not re-raised, so a
        failed finalization never propagates to the caller.
        """
        try:
            self._system.finalize()
            logger.info("SimpleMem finalization complete")
        except Exception as e:
            # Best-effort by design: keep going, but record the traceback.
            logger.exception("SimpleMem finalization failed: %s", e)

    async def search(
        self,
        query: str,
        max_results: int = 10,
        min_score: float = 0.0,
        filters: Optional[Dict[str, Any]] = None,
    ) -> List[SearchMatch]:
        """
        Search using SimpleMem's adaptive retrieval.

        SimpleMem uses complexity-aware retrieval that automatically
        adjusts search depth based on query complexity.

        Args:
            query: Search query.
            max_results: Maximum results to return. At most one match is
                ever produced; a value of zero or less yields no results.
            min_score: Minimum score threshold. The single match always
                has score 1.0, so a threshold above 1.0 yields no results.
            filters: Optional filters (not used by SimpleMem).

        Returns:
            List of SearchMatch objects: empty, or a single match wrapping
            SimpleMem's answer. Errors are logged and produce an empty list.
        """
        # Honour the limits before calling the backend, so a request that
        # cannot return anything does not trigger retrieval at all.
        if max_results <= 0 or min_score > self._ANSWER_SCORE:
            return []

        try:
            answer = self._system.ask(query)

            if not answer or not answer.strip():
                return []

            return [
                SearchMatch(
                    source=self.name,
                    content=answer,
                    score=self._ANSWER_SCORE,
                    match_type="simplemem_adaptive",
                    snippet=answer[:200],
                    metadata={"query": query, "retrieval_type": "adaptive"},
                    timestamp=datetime.now(),
                    item_id=str(uuid.uuid4()),
                )
            ]
        except Exception as e:
            # Search is best-effort: log with traceback and report no hits.
            logger.exception("SimpleMem search failed: %s", e)
            return []

    async def search_semantic(
        self,
        query: str,
        embedding: List[float],
        max_results: int = 10,
        min_score: float = 0.0,
    ) -> List[SearchMatch]:
        """
        Semantic search delegates to SimpleMem's built-in retrieval.

        SimpleMem already uses its own embedding model (Qwen3) internally,
        so we ignore the provided embedding and use the native pipeline.

        Args:
            query: Original query text.
            embedding: Query embedding (ignored - SimpleMem uses its own).
            max_results: Maximum results to return.
            min_score: Minimum similarity threshold.

        Returns:
            List of SearchMatch objects, exactly as returned by ``search``.
        """
        return await self.search(query, max_results, min_score)

    async def get_context(
        self,
        item_id: str,
        window: int = 5,
    ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
        """
        Get context around an item.

        SimpleMem's atomic facts don't have sequential context,
        so this always returns a pair of empty lists regardless of
        ``item_id`` and ``window``.
        """
        return [], []

    @property
    def dialogue_count(self) -> int:
        """Total number of dialogues added through this store instance.

        The counter is not reset by finalization.
        """
        return self._dialogue_count

    @property
    def config(self) -> SimpleMemConfig:
        """Get the current configuration."""
        return self._config