Coverage for integrations / channels / memory / shared_history.py: 42.6%

61 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Shared conversation history bridge — single buffer for both LangChain and AutoGen. 

3 

4Eliminates redundancy: both frameworks read from and write to the same 

5PersistentChatHistory (buffer.json + SimpleMem). AutoGen's GroupChat starts 

6seeded with recent messages, and its new messages are written back. 

7 

8Usage in reuse_recipe.py / create_recipe.py: 

9 

10 from integrations.channels.memory.shared_history import ( 

11 seed_autogen_from_shared_history, 

12 create_autogen_history_hook, 

13 ) 

14 

15 # Before GroupChat creation: 

16 seed_messages = seed_autogen_from_shared_history(user_id, max_messages=8) 

17 group_chat = autogen.GroupChat(agents=[...], messages=seed_messages, ...) 

18 

19 # After GroupChat creation: 

20 hook = create_autogen_history_hook(user_id) 

21 if hook: 

22 group_chat.messages.append = hook(group_chat.messages.append) 

23""" 

24 

25import logging 

26import os 

27from datetime import datetime 

28from typing import List, Dict, Any, Optional, Callable 

29 

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Root directory of the SimpleMem buffer, located under the current user's
# home directory. Per the original note, simplemem_langchain.py uses the
# same canonical location.
SIMPLEMEM_DB_ROOT = os.path.join(
    os.path.expanduser('~'),
    'Documents',
    'Nunba',
    'data',
    'simplemem',
)

35 

36 

def _get_persistent_history(user_id: int):
    """Return the chat-history object backing ``user_id``'s SimpleMem memory.

    Loads (or creates) the user's ``SimpleMemChatMemory`` and returns its
    ``chat_memory`` attribute. According to the original author's note this
    is the same PersistentChatHistory instance the LangChain side uses.

    Args:
        user_id: Identifier passed straight to
            ``SimpleMemChatMemory.load_or_create``.

    Returns:
        The ``chat_memory`` attribute of the loaded memory, or ``None`` when
        the memory has no such attribute or when importing/loading fails.
        Failures are logged at debug level and never raised.
    """
    try:
        # Imported lazily so that a missing or broken dependency only
        # disables the shared history instead of breaking module import.
        from integrations.channels.memory.simplemem_langchain import SimpleMemChatMemory

        user_memory = SimpleMemChatMemory.load_or_create(user_id)
        return getattr(user_memory, 'chat_memory', None)
    except Exception as err:
        logger.debug("Could not load PersistentChatHistory for user %s: %s", user_id, err)
        return None

47 

48 

def seed_autogen_from_shared_history(
    user_id: int,
    max_messages: int = 8,
) -> List[Dict[str, Any]]:
    """Load recent messages from the shared buffer as autogen seed messages.

    Args:
        user_id: User whose shared history should be read.
        max_messages: Maximum number of trailing history messages to
            consider. Values <= 0 yield an empty seed.

    Returns:
        A list of autogen-style message dicts, oldest first, e.g.
        ``[{"role": "user", "name": "User", "content": "...", ...}, ...]``.
        ``HumanMessage`` maps to role ``"user"``, ``AIMessage`` maps to role
        ``"assistant"``; any other message type is skipped (so the result
        may contain fewer than ``max_messages`` entries).

        Each dict also carries two bookkeeping keys:
          * ``_from_shared``: always ``True``; this is the marker that
            ``create_autogen_history_hook`` checks to avoid writing seeded
            messages back to the buffer.
          * ``_ts``: the timestamp recorded for the message, or ``None``
            when no timestamp is available at that position.

        An empty list is returned when the history cannot be loaded or any
        error occurs while reading it (the error is logged as a warning).
    """
    # Guard the slice below: ``seq[-0:]`` is the *whole* sequence, so a limit
    # of 0 would otherwise seed every stored message instead of none, and a
    # negative limit would drop messages from the front instead.
    if max_messages <= 0:
        return []

    history = _get_persistent_history(user_id)
    # Compare against None explicitly so that a history object which happens
    # to be falsy (e.g. defines __len__ and is empty) is still read normally.
    if history is None:
        return []

    try:
        from langchain_core.messages import HumanMessage, AIMessage

        # Per the original author's note, ``messages`` is a property that
        # returns a thread-safe copy.
        raw_msgs = history.messages
        timestamps = getattr(history, '_timestamps', None) or []

        # Keep only the trailing window of messages and timestamps.
        recent = raw_msgs[-max_messages:]
        recent_ts = timestamps[-max_messages:]

        seed = []
        for i, msg in enumerate(recent):
            if isinstance(msg, HumanMessage):
                role, name = "user", "User"
            elif isinstance(msg, AIMessage):
                role, name = "assistant", "assistant"
            else:
                # System/tool/other message types are not forwarded.
                continue

            # NOTE(review): timestamps are paired with messages by position
            # within the two trailing windows; this assumes ``_timestamps``
            # runs parallel to ``messages`` — confirm against the history
            # implementation.
            ts = recent_ts[i] if i < len(recent_ts) else None
            seed.append({
                "role": role,
                "name": name,
                "content": msg.content,
                "_ts": ts,  # original timestamp, kept for reference
                "_from_shared": True,  # marker the write-back hook checks
            })

        logger.info("Seeded autogen with %d messages from shared history (user %s)",
                    len(seed), user_id)
        return seed
    except Exception as e:
        # Best effort: seeding is optional, so never propagate failures.
        logger.warning("Failed to seed autogen from shared history: %s", e)
        return []

100 

101 

def create_autogen_history_hook(
    user_id: int,
    simplemem_store=None,
) -> Optional[Callable]:
    """Create a hook that writes autogen messages back to the shared buffer.

    The returned callable takes the original ``append`` function and returns
    a replacement that first calls the original and then mirrors the message
    into the user's shared history via ``history.add_message`` — only for
    NEW messages.

    A message is NOT written back when:
      * it is a dict carrying a truthy ``_from_shared`` marker (seeded from
        the shared buffer, so already stored);
      * its content is empty, or is ``TERMINATE`` (ignoring surrounding
        whitespace);
      * one of the last 5 stored messages has identical content.

    Dict messages with role ``"user"`` are stored as ``HumanMessage``; all
    other messages (including non-dict values, which are stringified) are
    stored as ``AIMessage``. Write failures are logged at debug level and
    never propagate to the caller of ``append``.

    Args:
        user_id: User whose shared history receives the messages.
        simplemem_store: Currently unused by this function; kept so existing
            callers that pass it keep working.

    Returns:
        A function ``hook(orig_append) -> hooked_append``, or ``None`` when
        the shared history cannot be loaded.

    Usage:
        hook = create_autogen_history_hook(user_id)
        if hook:
            original_append = group_chat.messages.append
            group_chat.messages.append = hook(original_append)

    NOTE(review): instances of the built-in ``list`` reject assignment to
    ``.append`` (AttributeError: attribute is read-only). The usage above
    only works if ``group_chat.messages`` is a list subclass or another
    object with writable attributes — confirm against the callers.
    """
    history = _get_persistent_history(user_id)
    # Explicit None check: a history object that is falsy when empty (e.g.
    # one defining __len__) must still get a hook, otherwise a brand-new
    # conversation would never be written back.
    if history is None:
        return None

    def _make_hook(orig_append):
        def hooked_append(msg):
            # Always perform the real append first so that a failure in the
            # mirroring below can never lose the message for autogen.
            orig_append(msg)

            is_dict = isinstance(msg, dict)

            # Skip seeded messages (already in buffer).
            if is_dict and msg.get('_from_shared'):
                return

            # Write new autogen messages to the shared buffer.
            try:
                content = msg.get('content', '') if is_dict else str(msg)
                role = msg.get('role', 'assistant') if is_dict else 'assistant'

                if not content:
                    return
                # Treat "TERMINATE" with stray whitespace/newlines the same
                # as the bare sentinel; it is control flow, not conversation.
                if isinstance(content, str) and content.strip() == 'TERMINATE':
                    return

                from langchain_core.messages import HumanMessage, AIMessage
                if role == 'user':
                    lc_msg = HumanMessage(content=content)
                else:
                    lc_msg = AIMessage(content=content)

                # Dedup by content only, against the last 5 stored messages
                # (kept small to stay fast). ``messages`` is read once here
                # instead of twice; ``or []`` covers a None/empty result.
                recent = (history.messages or [])[-5:]
                if any(ex.content == content for ex in recent):
                    return  # already in buffer — skip

                history.add_message(lc_msg, metadata={
                    'timestamp': datetime.now().isoformat(),
                    'source': 'autogen',
                })
            except Exception as e:
                # Best effort: mirroring must never break the group chat.
                logger.debug("Autogen→shared history write failed: %s", e)

        return hooked_append

    return _make_hook