Coverage for integrations / channels / memory / agent_memory_tools.py: 12.6%
103 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Agent Memory Tools — Framework-agnostic tool functions + framework adapters.
4Core tools (plain Python functions, no framework dependencies):
5- remember(): Register a memory with provenance tracking
6- recall_memory(): Search memories with semantic/text/hybrid
7- backtrace_memory(): Trace memory chains back to origin
8- get_memory_context(): Auto-recall from current context
9- record_lifecycle_event(): Record agent lifecycle transitions
11Framework adapters (thin wrappers):
12- register_autogen_tools(): Registers tools on autogen agents
13- create_langchain_tools(): Creates LangChain StructuredTool instances
14"""
16import json
17import logging
18import re
19import time
20from datetime import datetime
21from typing import Any, Callable, Dict, List, Optional, Tuple
23logger = logging.getLogger(__name__)
25# UUID hex pattern (16 chars) — used to detect direct vs semantic backtrace
26_UUID_PATTERN = re.compile(r'^[a-f0-9]{16}$')
28# Shortcut like "2h" / "30m" / "7d" → seconds offset from now
29_REL_TIME_PATTERN = re.compile(r'^\s*(\d+)\s*([smhd])\s*$', re.IGNORECASE)
32def _parse_time_arg(arg: 'str | None') -> 'float | None':
33 """Accept ISO-8601, bare date, or relative shortcut and return a
34 UNIX epoch timestamp (seconds). Returns None on empty / unparseable
35 input so callers pass it straight through to the store filter."""
36 if arg is None:
37 return None
38 s = str(arg).strip()
39 if not s or s.lower() in ('none', 'null', ''):
40 return None
41 # Relative: "1h", "30m", "7d", "45s" → now - offset
42 m = _REL_TIME_PATTERN.match(s)
43 if m:
44 amount = int(m.group(1))
45 unit = m.group(2).lower()
46 scale = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[unit]
47 return time.time() - amount * scale
48 # ISO-8601 + common variants
49 s2 = s.rstrip('Z').rstrip('z')
50 for fmt in (
51 '%Y-%m-%dT%H:%M:%S.%f',
52 '%Y-%m-%dT%H:%M:%S',
53 '%Y-%m-%d %H:%M:%S',
54 '%Y-%m-%d',
55 ):
56 try:
57 return datetime.strptime(s2, fmt).timestamp()
58 except ValueError:
59 continue
60 return None
63def create_memory_tools(
64 graph: Any, # MemoryGraph — typed as Any to avoid circular import
65 user_id: str,
66 session_id: str,
67) -> Dict[str, Tuple[Callable, str]]:
68 """
69 Create framework-agnostic memory tool functions.
71 Args:
72 graph: MemoryGraph instance.
73 user_id: Current user ID.
74 session_id: Current session scope (e.g. user_id_prompt_id).
76 Returns:
77 Dict of {tool_name: (function, description)}.
78 Any framework can wrap these — they're plain Python functions.
79 """
81 def remember(
82 content: str,
83 memory_type: str = "fact",
84 context: str = "",
85 ) -> str:
86 """Register a memory with provenance tracking. Automatically links to recent memories in the same session."""
87 try:
88 # Auto-find recent memories in this session as parents
89 recent = graph._get_latest_session_memory(session_id)
90 parent_ids = [recent.id] if recent else []
92 memory_id = graph.register(
93 content=content,
94 metadata={
95 "memory_type": memory_type,
96 "source_agent": "agent",
97 "session_id": session_id,
98 },
99 parent_ids=parent_ids,
100 context_snapshot=context or f"Remembered during session {session_id}",
101 )
102 return f"Remembered (id={memory_id}). Use backtrace_memory('{memory_id}') to trace its origin chain."
103 except Exception as e:
104 logger.warning(f"Remember failed: {e}")
105 return f"Failed to remember: {e}"
107 def recall_memory(
108 query: str,
109 mode: str = "hybrid",
110 since: 'str | None' = None,
111 until: 'str | None' = None,
112 ) -> str:
113 """Search all memories using natural language.
115 Returns matching memories with IDs for backtrace. Optional
116 ``since`` / ``until`` restrict the result to a time window —
117 use ISO-8601 ('2026-04-10T15:00') or a bare date ('2026-04-10')
118 or a relative shortcut ('1h', '24h', '7d'). Pass both for a
119 bounded range, either for a one-sided filter.
120 """
121 try:
122 _since_ts = _parse_time_arg(since)
123 _until_ts = _parse_time_arg(until)
124 nodes = graph.recall(
125 query, mode=mode, top_k=5,
126 since=_since_ts, until=_until_ts,
127 )
128 if not nodes:
129 return "No matching memories found."
131 lines = []
132 for i, node in enumerate(nodes, 1):
133 created = ""
134 try:
135 from datetime import datetime
136 created = datetime.fromtimestamp(node.created_at).strftime("%Y-%m-%d %H:%M")
137 except Exception:
138 pass
139 lines.append(
140 f"{i}. [id={node.id}] {node.content[:200]} "
141 f"(type={node.memory_type}, agent={node.source_agent}, created={created})"
142 )
143 return "\n".join(lines)
144 except Exception as e:
145 logger.warning(f"Recall failed: {e}")
146 return f"Memory recall failed: {e}"
148 def backtrace_memory(
149 memory_id_or_query: str,
150 ) -> str:
151 """Trace a memory back to its origin. Pass a memory ID for direct backtrace, or a query for semantic backtrace."""
152 try:
153 if _UUID_PATTERN.match(memory_id_or_query):
154 # Direct backtrace by ID
155 chain = graph.backtrace(memory_id_or_query, depth=10)
156 if not chain:
157 return f"No memory found with id={memory_id_or_query}"
159 lines = ["Memory chain (origin → current):"]
160 for i, node in enumerate(chain):
161 arrow = " " if i == 0 else "→ "
162 lines.append(
163 f" {arrow}[{node.id}] {node.memory_type} by {node.source_agent}: "
164 f"{node.content[:150]}"
165 )
166 return "\n".join(lines)
167 else:
168 # Semantic backtrace by query
169 chains = graph.backtrace_semantic(memory_id_or_query, depth=5, top_k=3)
170 if not chains:
171 return f"No memories found matching '{memory_id_or_query}'"
173 lines = []
174 for ci, chain in enumerate(chains, 1):
175 lines.append(f"\nChain {ci}:")
176 for i, node in enumerate(chain):
177 arrow = " " if i == 0 else "→ "
178 lines.append(
179 f" {arrow}[{node.id}] {node.memory_type} by {node.source_agent}: "
180 f"{node.content[:150]}"
181 )
182 return "\n".join(lines)
183 except Exception as e:
184 logger.warning(f"Backtrace failed: {e}")
185 return f"Memory backtrace failed: {e}"
187 def get_memory_context() -> str:
188 """Get automatically recalled memories relevant to the current conversation context."""
189 try:
190 # Get recent session memories as context source
191 recent_memories = graph.get_session_memories(session_id, limit=5)
192 if not recent_memories:
193 return "No session memories available for context recall."
195 recent_texts = [m.content for m in recent_memories[-3:]]
196 relevant = graph.context_recall(recent_texts, top_k=3)
198 if not relevant:
199 return "No relevant memories found for current context."
201 lines = ["Relevant memories from past sessions:"]
202 for node in relevant:
203 lines.append(
204 f"- [{node.memory_type}] {node.content[:200]} (by {node.source_agent})"
205 )
206 return "\n".join(lines)
207 except Exception as e:
208 logger.warning(f"Context recall failed: {e}")
209 return f"Context recall failed: {e}"
211 def record_lifecycle_event(
212 event: str,
213 details: str = "",
214 ) -> str:
215 """Record an agent lifecycle event (e.g. 'Creation Mode', 'completed', 'Reuse Mode')."""
216 try:
217 memory_id = graph.register_lifecycle(
218 event=event,
219 agent_id=user_id,
220 session_id=session_id,
221 details=details,
222 )
223 return f"Lifecycle event recorded: {event} (id={memory_id})"
224 except Exception as e:
225 logger.warning(f"Lifecycle recording failed: {e}")
226 return f"Failed to record lifecycle event: {e}"
228 return {
229 "remember": (
230 remember,
231 "Save important facts, decisions, or insights to persistent memory with provenance tracking. "
232 "Automatically links to recent memories for backtrace.",
233 ),
234 "recall_memory": (
235 recall_memory,
236 "Search all memories using natural language query. Returns matching memories with IDs "
237 "that can be used with backtrace_memory to trace their origin chain.",
238 ),
239 "backtrace_memory": (
240 backtrace_memory,
241 "Trace a memory back to its origin. Pass a memory ID (from recall_memory) for direct "
242 "backtrace, or a natural language query for semantic backtrace. Shows the chain of "
243 "memories that led to the current one.",
244 ),
245 "get_memory_context": (
246 get_memory_context,
247 "Get relevant memories from past sessions based on the current conversation context. "
248 "Useful for recalling related information without an explicit query.",
249 ),
250 "record_lifecycle_event": (
251 record_lifecycle_event,
252 "Record an agent lifecycle event such as 'Creation Mode', 'Review Mode', 'completed', "
253 "'Evaluation Mode', or 'Reuse Mode'.",
254 ),
255 }
258# =============================================================================
259# Framework Adapters
260# =============================================================================
263def register_autogen_tools(
264 tools_dict: Dict[str, Tuple[Callable, str]],
265 assistant,
266 helper,
267):
268 """
269 Autogen adapter: register memory tools on autogen agents.
271 Uses assistant.register_for_execution() and helper.register_for_llm()
272 following the pattern in reuse_recipe.py:1547.
274 Args:
275 tools_dict: Output of create_memory_tools().
276 assistant: Autogen AssistantAgent (executor).
277 helper: Autogen AssistantAgent (LLM-callable).
278 """
279 for name, (func, desc) in tools_dict.items():
280 helper.register_for_llm(
281 name=name, api_style="function", description=desc
282 )(func)
283 assistant.register_for_execution(name=name)(func)
285 logger.info(f"Registered {len(tools_dict)} memory tools on autogen agents")
288def create_langchain_tools(
289 tools_dict: Dict[str, Tuple[Callable, str]],
290) -> list:
291 """
292 LangChain adapter: wrap memory tools as StructuredTool instances.
294 Args:
295 tools_dict: Output of create_memory_tools().
297 Returns:
298 List of LangChain Tool instances.
299 """
300 try:
301 from langchain.tools import StructuredTool
302 except ImportError:
303 # In bundled mode, LangChain may use langchain_classic which
304 # has StructuredTool under a different path. This is expected
305 # noise in the frozen build — log at DEBUG, not WARNING.
306 logger.debug("LangChain StructuredTool not available — memory tools skipped")
307 return []
309 tools = []
310 for name, (func, desc) in tools_dict.items():
311 tool = StructuredTool.from_function(
312 func=func,
313 name=name,
314 description=desc,
315 )
316 tools.append(tool)
318 logger.info(f"Created {len(tools)} LangChain memory tools")
319 return tools