Coverage for integrations / channels / memory / simplemem_store.py: 50.6%

77 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2SimpleMem Store - Lifelong memory integration using SimpleMem. 

3 

4Wraps the SimpleMem library to provide advanced memory capabilities: 

5- Semantic Structured Compression (atomic fact extraction) 

6- Structured Multi-View Indexing (LanceDB vectors) 

7- Complexity-Aware Adaptive Retrieval 

8 

9Implements the MemorySource interface for seamless integration 

10with the existing MemorySearch system. 

11""" 

12 

13from __future__ import annotations 

14 

15import logging 

16import os 

17import uuid 

18from dataclasses import dataclass, field 

19from datetime import datetime 

20from typing import Any, Dict, List, Optional, Tuple 

21 

22from .search import MemorySource, SearchMatch 

23 

24logger = logging.getLogger(__name__) 

25 

# simplemem is an optional dependency: record whether it could be imported so
# that the failure surfaces only when a SimpleMemStore is actually constructed.
try:
    from simplemem import SimpleMemSystem
except ImportError:
    HAS_SIMPLEMEM = False
else:
    HAS_SIMPLEMEM = True

31 

32 

# Spellings (compared case-insensitively, after stripping) that enable a flag.
_TRUTHY_ENV_VALUES = frozenset({"1", "true", "yes", "on"})


def _env_flag(name: str, default: bool) -> bool:
    """Return the boolean environment flag *name*, or *default* when unset."""
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in _TRUTHY_ENV_VALUES


def _env_int(name: str, default: int) -> int:
    """Return the integer environment variable *name*.

    An unset, blank, or non-numeric value yields *default* instead of raising,
    so one malformed variable cannot prevent the configuration from loading.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        logging.getLogger(__name__).warning(
            "Ignoring invalid integer %r for %s; using default %d",
            raw,
            name,
            default,
        )
        return default


@dataclass
class SimpleMemConfig:
    """Configuration for SimpleMem integration.

    Every field has a default, so ``SimpleMemConfig()`` is a valid
    configuration; ``from_env`` overrides individual fields from
    ``SIMPLEMEM_*`` environment variables.
    """

    enabled: bool = True
    api_key: str = ""
    base_url: Optional[str] = None
    model: str = "gpt-4.1-mini"
    embedding_model: str = "Qwen/Qwen3-Embedding-0.6B"
    db_path: str = ""  # Resolved at runtime via platform_paths
    window_size: int = 40
    overlap_size: int = 2
    parallel_workers: int = 8
    retrieval_workers: int = 4
    auto_finalize_interval: int = 40  # Finalize every N dialogues

    @classmethod
    def from_env(cls) -> "SimpleMemConfig":
        """Create configuration from environment variables.

        Unset variables fall back to the field defaults declared on the
        class, so the defaults live in exactly one place.

        * ``SIMPLEMEM_ENABLED`` is true for ``1``/``true``/``yes``/``on``
          (case-insensitive, surrounding whitespace ignored); any other
          value disables the integration.
        * ``SIMPLEMEM_API_KEY`` falls back to ``OPENAI_API_KEY`` when it is
          unset *or empty*, mirroring how an empty ``SIMPLEMEM_BASE_URL`` is
          treated as unset.
        * Integer settings that are blank or malformed are replaced by
          their defaults (with a warning) rather than raising ``ValueError``.

        Returns:
            A new ``SimpleMemConfig`` populated from the environment.
        """
        return cls(
            enabled=_env_flag("SIMPLEMEM_ENABLED", cls.enabled),
            api_key=(
                os.getenv("SIMPLEMEM_API_KEY")
                or os.getenv("OPENAI_API_KEY", cls.api_key)
            ),
            base_url=os.getenv("SIMPLEMEM_BASE_URL") or None,
            model=os.getenv("SIMPLEMEM_MODEL", cls.model),
            embedding_model=os.getenv(
                "SIMPLEMEM_EMBEDDING_MODEL", cls.embedding_model
            ),
            db_path=os.getenv("SIMPLEMEM_DB_PATH", cls.db_path),
            window_size=_env_int("SIMPLEMEM_WINDOW_SIZE", cls.window_size),
            overlap_size=_env_int("SIMPLEMEM_OVERLAP_SIZE", cls.overlap_size),
            parallel_workers=_env_int(
                "SIMPLEMEM_PARALLEL_WORKERS", cls.parallel_workers
            ),
            retrieval_workers=_env_int(
                "SIMPLEMEM_RETRIEVAL_WORKERS", cls.retrieval_workers
            ),
            auto_finalize_interval=_env_int(
                "SIMPLEMEM_AUTO_FINALIZE_INTERVAL", cls.auto_finalize_interval
            ),
        )

69 

70 

class SimpleMemStore(MemorySource):
    """
    SimpleMem wrapper implementing MemorySource interface.

    Provides lifelong memory with:
    - 43.24% F1 score on memory retrieval
    - 30x token efficiency through semantic compression
    - Three-stage pipeline: Compression -> Indexing -> Adaptive Retrieval

    NOTE(review): the coroutine methods below call SimpleMemSystem
    synchronously. If those calls block (LLM requests, vector indexing) the
    event loop stalls for their duration -- confirm, and consider offloading
    them to a worker thread once SimpleMemSystem's thread-safety is known.
    """

    # The adaptive pipeline yields one synthesized answer with no similarity
    # score, so every match carries this fixed score.
    _MATCH_SCORE = 1.0
    # Maximum number of characters of the answer exposed as the snippet.
    _SNIPPET_LENGTH = 200

    def __init__(self, config: Optional[SimpleMemConfig] = None):
        """
        Build the underlying SimpleMemSystem.

        Args:
            config: Settings to use; when omitted they are read from the
                environment via ``SimpleMemConfig.from_env()``. If its
                ``db_path`` is empty, the resolved path is written back onto
                this same object.

        Raises:
            ImportError: If the optional ``simplemem`` package is missing.
        """
        if not HAS_SIMPLEMEM:
            raise ImportError(
                "simplemem is required for SimpleMemStore. "
                "Install it with: pip install simplemem"
            )

        self._config = config or SimpleMemConfig.from_env()
        # Resolve db_path to a writable directory (not CWD which may be read-only)
        if not self._config.db_path:
            try:
                from core.platform_paths import get_simplemem_dir
                self._config.db_path = get_simplemem_dir()
            except ImportError:
                # Last resort when the platform helper is unavailable.
                self._config.db_path = os.path.join('.', 'simplemem_db')
        self._dialogue_count = 0

        # Build kwargs for SimpleMemSystem; credentials are only forwarded
        # when configured so the library can apply its own defaults.
        system_kwargs: Dict[str, Any] = {
            "model": self._config.model,
            "embedding_model": self._config.embedding_model,
            "db_path": self._config.db_path,
        }

        if self._config.api_key:
            system_kwargs["api_key"] = self._config.api_key
        if self._config.base_url:
            system_kwargs["base_url"] = self._config.base_url

        self._system = SimpleMemSystem(**system_kwargs)

    @property
    def name(self) -> str:
        """Identifier of this memory source (``"simplemem"``)."""
        return "simplemem"

    async def add(
        self,
        content: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> str:
        """
        Add a dialogue entry to SimpleMem.

        Args:
            content: The message text.
            metadata: Dict with sender_name, timestamp, channel, chat_id, etc.
                Only ``sender_name`` and ``timestamp`` are read here. A
                missing/empty sender becomes ``"User"``; a missing or ``None``
                timestamp becomes the current time, and a ``datetime`` is
                converted to its ISO-8601 string.

        Returns:
            A generated ID for the entry (a fresh UUID4 string; it is not
            stored in SimpleMem).
        """
        metadata = metadata or {}
        speaker = metadata.get("sender_name") or "User"

        # The default timestamp is an ISO string, so normalise datetime
        # objects (and explicit None) to the same representation.
        timestamp = metadata.get("timestamp")
        if timestamp is None:
            timestamp = datetime.now().isoformat()
        elif isinstance(timestamp, datetime):
            timestamp = timestamp.isoformat()

        self._system.add_dialogue(speaker, content, timestamp)
        self._dialogue_count += 1

        # Auto-finalize every `auto_finalize_interval` dialogues (0 disables).
        interval = self._config.auto_finalize_interval
        if interval > 0 and self._dialogue_count % interval == 0:
            logger.info(
                "Auto-finalizing SimpleMem after %d dialogues",
                self._dialogue_count,
            )
            await self.finalize()

        return str(uuid.uuid4())

    async def finalize(self) -> None:
        """
        Process buffered dialogues into compressed atomic memories.

        This triggers SimpleMem's compression pipeline:
        1. Windowed dialogue grouping
        2. Atomic fact extraction via LLM
        3. Vector indexing into LanceDB

        Finalization is best-effort: any failure is logged (with traceback)
        and swallowed so that callers such as ``add`` are not interrupted.
        """
        try:
            self._system.finalize()
        except Exception:
            logger.exception("SimpleMem finalization failed")
        else:
            logger.info("SimpleMem finalization complete")

    async def search(
        self,
        query: str,
        max_results: int = 10,
        min_score: float = 0.0,
        filters: Optional[Dict[str, Any]] = None,
    ) -> List[SearchMatch]:
        """
        Search using SimpleMem's adaptive retrieval.

        SimpleMem uses complexity-aware retrieval that automatically
        adjusts search depth based on query complexity.

        Args:
            query: Search query.
            max_results: Maximum results to return. At most one match is ever
                produced; a non-positive value yields no results.
            min_score: Minimum score threshold. The single match always has
                score 1.0, so a threshold above that yields no results.
            filters: Optional filters (not used by SimpleMem).

        Returns:
            List of SearchMatch objects: empty, or one match wrapping the
            answer. Errors are logged and reported as an empty list.
        """
        # Skip the retrieval call entirely when the sole possible match
        # could not satisfy the caller's limits.
        if max_results <= 0 or min_score > self._MATCH_SCORE:
            return []

        try:
            answer = self._system.ask(query)

            if not answer or not answer.strip():
                return []

            return [
                SearchMatch(
                    source=self.name,
                    content=answer,
                    score=self._MATCH_SCORE,
                    match_type="simplemem_adaptive",
                    snippet=answer[: self._SNIPPET_LENGTH],
                    metadata={"query": query, "retrieval_type": "adaptive"},
                    timestamp=datetime.now(),
                    item_id=str(uuid.uuid4()),
                )
            ]
        except Exception:
            logger.exception("SimpleMem search failed")
            return []

    async def search_semantic(
        self,
        query: str,
        embedding: List[float],
        max_results: int = 10,
        min_score: float = 0.0,
    ) -> List[SearchMatch]:
        """
        Semantic search delegates to SimpleMem's built-in retrieval.

        SimpleMem already uses its own embedding model (Qwen3) internally,
        so we ignore the provided embedding and use the native pipeline.

        Args:
            query: Original query text.
            embedding: Query embedding (ignored - SimpleMem uses its own).
            max_results: Maximum results to return.
            min_score: Minimum similarity threshold.

        Returns:
            List of SearchMatch objects.
        """
        return await self.search(query, max_results, min_score)

    async def get_context(
        self,
        item_id: str,
        window: int = 5,
    ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
        """
        Get context around an item.

        SimpleMem's atomic facts don't have sequential context,
        so this returns empty lists.
        """
        return [], []

    @property
    def dialogue_count(self) -> int:
        """Get the total number of dialogues added through this store.

        The counter starts at zero on construction and is never reset, not
        even by ``finalize``.
        """
        return self._dialogue_count

    @property
    def config(self) -> SimpleMemConfig:
        """Get the current configuration."""
        return self._config