Coverage for integrations / channels / memory / shared_history.py: 42.6%
61 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
Shared conversation history bridge — single buffer for both LangChain and AutoGen.

Eliminates redundancy: both frameworks read from and write to the same
PersistentChatHistory (buffer.json + SimpleMem). AutoGen's GroupChat starts
seeded with recent messages, and its new messages are written back.

Usage in reuse_recipe.py / create_recipe.py:

    from integrations.channels.memory.shared_history import (
        seed_autogen_from_shared_history,
        create_autogen_history_hook,
    )

    # Before GroupChat creation:
    seed_messages = seed_autogen_from_shared_history(user_id, max_messages=8)
    group_chat = autogen.GroupChat(agents=[...], messages=seed_messages, ...)

    # After GroupChat creation:
    hook = create_autogen_history_hook(user_id)
    if hook:
        group_chat.messages.append = hook(group_chat.messages.append)
23"""
25import logging
26import os
27from datetime import datetime
28from typing import List, Dict, Any, Optional, Callable
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Canonical buffer root — same as simplemem_langchain.py
SIMPLEMEM_DB_ROOT = os.path.join(
    os.path.expanduser('~'),
    *('Documents', 'Nunba', 'data', 'simplemem'),
)
def _get_persistent_history(user_id: int):
    """Return the shared chat-history object for ``user_id``, or ``None``.

    Loads the user's ``SimpleMemChatMemory`` via ``load_or_create`` and
    returns its ``chat_memory`` attribute (described by the original author
    as the same PersistentChatHistory instance LangChain uses).

    Returns ``None`` when the loaded memory has no ``chat_memory`` attribute
    or when anything in the load path raises; the failure is logged at
    debug level rather than propagated.
    """
    try:
        # Imported inside the function so an import failure is caught below
        # and reported as "no history" rather than breaking module import.
        from integrations.channels.memory.simplemem_langchain import (
            SimpleMemChatMemory,
        )

        user_memory = SimpleMemChatMemory.load_or_create(user_id)
        return getattr(user_memory, 'chat_memory', None)
    except Exception as err:
        logger.debug("Could not load PersistentChatHistory for user %s: %s", user_id, err)
    return None
def seed_autogen_from_shared_history(
    user_id: int,
    max_messages: int = 8,
) -> List[Dict[str, Any]]:
    """Build AutoGen GroupChat seed messages from the shared history buffer.

    Takes the last ``max_messages`` entries of the user's shared history and
    converts every ``HumanMessage`` / ``AIMessage`` among them into an
    AutoGen-style dict. Entries of any other message type are skipped.

    Args:
        user_id: User whose history is loaded (via ``_get_persistent_history``).
        max_messages: Upper bound on how many trailing history entries are
            examined. Zero or a negative value yields an empty list.

    Returns:
        A list of dicts of the form::

            {"role": "user" | "assistant", "name": "User" | "assistant",
             "content": ..., "_ts": <timestamp or None>, "_from_shared": True}

        The list is empty when no history is available, when
        ``max_messages <= 0``, or when conversion fails (logged as a warning).

    Deduplication: every seeded dict carries ``_from_shared=True``. That is
    the marker the wrapper built by ``create_autogen_history_hook`` checks
    to avoid writing seeded messages back to the buffer. ``_ts`` carries the
    original timestamp when one is available; nothing in this module reads it.
    """
    # Guard before slicing: ``msgs[-0:]`` is ``msgs[0:]`` (the WHOLE list) and
    # a negative count would drop the oldest entries instead of limiting.
    if max_messages <= 0:
        return []

    history = _get_persistent_history(user_id)
    if history is None:
        return []

    try:
        from langchain_core.messages import HumanMessage, AIMessage

        # Original author's note: ``messages`` is a thread-safe property copy.
        raw_msgs = history.messages
        timestamps = getattr(history, '_timestamps', [])

        # Take the last N messages and the last N timestamps.
        # NOTE(review): the two tails are sliced independently, so the pairing
        # presumably relies on ``_timestamps`` being the same length as
        # ``messages`` — confirm against PersistentChatHistory.
        recent = raw_msgs[-max_messages:]
        recent_ts = timestamps[-max_messages:] if timestamps else []

        seed = []
        for i, msg in enumerate(recent):
            if isinstance(msg, HumanMessage):
                role = "user"
                name = "User"
            elif isinstance(msg, AIMessage):
                role = "assistant"
                name = "assistant"
            else:
                # Neither a human nor an AI message: not seeded.
                continue

            ts = recent_ts[i] if i < len(recent_ts) else None
            seed.append({
                "role": role,
                "name": name,
                "content": msg.content,
                "_ts": ts,             # original timestamp, if any
                "_from_shared": True,  # marker so the hook skips re-writing
            })

        logger.info("Seeded autogen with %d messages from shared history (user %s)",
                    len(seed), user_id)
        return seed
    except Exception as e:
        # Best-effort: a failed seed must not prevent the GroupChat from starting.
        logger.warning("Failed to seed autogen from shared history: %s", e)
        return []
def create_autogen_history_hook(
    user_id: int,
    simplemem_store=None,
) -> Optional[Callable]:
    """Create a hook that writes autogen messages back to the shared buffer.

    Args:
        user_id: User whose shared history receives the new messages.
        simplemem_store: Accepted for interface compatibility; not used by
            this function.

    Returns:
        ``None`` when no shared history could be loaded. Otherwise a factory
        that takes the original ``append`` callable and returns a wrapper.
        The wrapper always calls the original ``append`` first, then writes
        the message to the shared history unless the message is:

        * a dict flagged ``_from_shared`` (seeded, already in the buffer),
        * empty, or exactly ``'TERMINATE'``,
        * identical in content to one of the last 5 history entries.

        A dict with ``role == 'user'`` is stored as a ``HumanMessage``;
        everything else (including non-dict messages, stringified) is stored
        as an ``AIMessage``. Write failures are logged at debug level and
        never propagate to the caller.

    Usage:
        hook = create_autogen_history_hook(user_id)
        if hook:
            original_append = group_chat.messages.append
            group_chat.messages.append = hook(original_append)

    Note: a plain built-in ``list`` rejects assignment to ``.append`` with
    ``AttributeError``, so the pattern above needs ``group_chat.messages`` to
    be an object whose ``append`` attribute is writable (e.g. a list
    subclass) — TODO confirm at the call sites.
    """
    history = _get_persistent_history(user_id)
    # Compare against None explicitly: a valid history object that happens to
    # be falsy (e.g. empty) must still get a hook, or nothing is ever written.
    if history is None:
        return None

    def _make_hook(orig_append):
        def hooked_append(msg):
            # Call original append first so the GroupChat is never affected
            # by anything that goes wrong below.
            orig_append(msg)

            is_dict = isinstance(msg, dict)

            # Skip seeded messages (already in buffer)
            if is_dict and msg.get('_from_shared'):
                return

            # Write new autogen messages to the shared buffer
            try:
                content = msg.get('content', '') if is_dict else str(msg)
                role = msg.get('role', 'assistant') if is_dict else 'assistant'

                if not content or content == 'TERMINATE':
                    return

                from langchain_core.messages import HumanMessage, AIMessage
                if role == 'user':
                    lc_msg = HumanMessage(content=content)
                else:
                    lc_msg = AIMessage(content=content)

                # Dedup on content only, within the last 5 messages to keep
                # it fast. ``messages`` is read once per append.
                recent = (history.messages or [])[-5:]
                if any(ex.content == content for ex in recent):
                    return  # already in buffer — skip

                # NOTE(review): timestamp is naive local time; presumably this
                # matches what the rest of the buffer stores — confirm.
                history.add_message(lc_msg, metadata={
                    'timestamp': datetime.now().isoformat(),
                    'source': 'autogen',
                })
            except Exception as e:
                logger.debug("Autogen→shared history write failed: %s", e)

        return hooked_append

    return _make_hook