Coverage for integrations / social / federation.py: 52.5%

118 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial - Mastodon-Style Federation 

3Instances follow each other and share content across the federated network. 

4Built on top of gossip peer discovery - gossip finds peers, federation shares content. 

5 

6Concepts: 

7- Instance follow: Node A follows Node B → B pushes new posts to A's inbox 

8- Outbox: When a local post is created, push to all followers' inboxes 

9- Inbox: Receive posts from followed instances, store as federated posts 

10- Boost: Re-share a federated post to local feed 

11""" 

12import logging 

13import threading 

14import requests 

15from datetime import datetime 

16 

17from core.http_pool import pooled_get, pooled_post 

18from typing import Optional, List 

19from core.port_registry import get_port 

20 

21logger = logging.getLogger('hevolve_social') 

22 

23 

24class FederationManager: 

25 """Manages instance-level follows and content federation between HevolveBot nodes.""" 

26 

27 def __init__(self): 

28 self._lock = threading.Lock() 

29 

30 # ─── Instance Follow/Unfollow ─── 

31 

32 def follow_instance(self, db, local_node_id: str, peer_node_id: str, 

33 peer_url: str) -> bool: 

34 """ 

35 Follow a remote instance. Sends follow request to the peer. 

36 Returns True if follow was created. 

37 """ 

38 from .models import InstanceFollow, PeerNode 

39 existing = db.query(InstanceFollow).filter( 

40 InstanceFollow.follower_node_id == local_node_id, 

41 InstanceFollow.following_node_id == peer_node_id, 

42 ).first() 

43 if existing: 

44 return False 

45 

46 follow = InstanceFollow( 

47 follower_node_id=local_node_id, 

48 following_node_id=peer_node_id, 

49 peer_url=peer_url, 

50 status='active', 

51 ) 

52 db.add(follow) 

53 db.flush() 

54 

55 # Notify the remote instance 

56 threading.Thread( 

57 target=self._send_follow_notification, 

58 args=(peer_url, local_node_id, self._get_local_url()), 

59 daemon=True, 

60 ).start() 

61 

62 return True 

63 

64 def unfollow_instance(self, db, local_node_id: str, peer_node_id: str): 

65 """Unfollow a remote instance.""" 

66 from .models import InstanceFollow 

67 follow = db.query(InstanceFollow).filter( 

68 InstanceFollow.follower_node_id == local_node_id, 

69 InstanceFollow.following_node_id == peer_node_id, 

70 ).first() 

71 if follow: 

72 db.delete(follow) 

73 db.flush() 

74 

75 def get_followers(self, db, node_id: str) -> list: 

76 """Get list of instances following this node.""" 

77 from .models import InstanceFollow 

78 follows = db.query(InstanceFollow).filter( 

79 InstanceFollow.following_node_id == node_id, 

80 InstanceFollow.status == 'active', 

81 ).all() 

82 return [f.to_dict() for f in follows] 

83 

84 def get_following(self, db, node_id: str) -> list: 

85 """Get list of instances this node follows.""" 

86 from .models import InstanceFollow 

87 follows = db.query(InstanceFollow).filter( 

88 InstanceFollow.follower_node_id == node_id, 

89 InstanceFollow.status == 'active', 

90 ).all() 

91 return [f.to_dict() for f in follows] 

92 

93 # ─── Outbox: Push local posts to followers ─── 

94 

95 def push_to_followers(self, db, post_dict: dict): 

96 """ 

97 Push a new local post to all instances that follow us. 

98 Called when a post is created locally. 

99 """ 

100 from .peer_discovery import gossip 

101 followers = self.get_followers(db, gossip.node_id) 

102 if not followers: 

103 return 

104 

105 payload = { 

106 'type': 'new_post', 

107 'origin_node_id': gossip.node_id, 

108 'origin_url': gossip.base_url, 

109 'origin_name': gossip.node_name, 

110 'post': post_dict, 

111 'timestamp': datetime.utcnow().isoformat(), 

112 } 

113 

114 for follower in followers: 

115 threading.Thread( 

116 target=self._deliver_to_inbox, 

117 args=(follower['peer_url'], payload, 

118 follower.get('follower_node_id', '')), 

119 daemon=True, 

120 ).start() 

121 

122 def _deliver_to_inbox(self, peer_url: str, payload: dict, 

123 follower_node_id: str = ''): 

124 """Deliver to peer's inbox — PeerLink first, HTTP fallback.""" 

125 # Try PeerLink direct delivery (avoids HTTP round-trip) 

126 if follower_node_id: 

127 try: 

128 from core.peer_link.link_manager import get_link_manager 

129 link = get_link_manager().get_link(follower_node_id) 

130 if link: 

131 link.send('federation', payload) 

132 logger.debug(f"Federation: delivered via PeerLink to {follower_node_id[:8]}") 

133 return 

134 except Exception: 

135 pass 

136 

137 # HTTP fallback 

138 try: 

139 resp = pooled_post( 

140 f"{peer_url}/api/social/federation/inbox", 

141 json=payload, 

142 timeout=10, 

143 ) 

144 if resp.status_code == 200: 

145 logger.debug(f"Federation: delivered to {peer_url}") 

146 else: 

147 logger.debug(f"Federation: delivery failed to {peer_url}: {resp.status_code}") 

148 except requests.RequestException as e: 

149 logger.debug(f"Federation: delivery error to {peer_url}: {e}") 

150 

151 # ─── Inbox: Receive posts from followed instances ─── 

152 

153 def receive_inbox(self, db, payload: dict) -> Optional[str]: 

154 """ 

155 Process an incoming federated post. 

156 Deduplicates by origin_node_id + post.id. 

157 Verifies sender's guardrail hash before accepting - continuous audit 

158 applies to every interaction, not just periodic checks. 

159 Returns the FederatedPost id if created, None if duplicate. 

160 """ 

161 from .models import FederatedPost, PeerNode 

162 from .peer_discovery import gossip 

163 

164 msg_type = payload.get('type') 

165 if msg_type != 'new_post': 

166 return None 

167 

168 post_data = payload.get('post', {}) 

169 origin_node = payload.get('origin_node_id', '') 

170 origin_post_id = post_data.get('id', '') 

171 

172 # Continuous audit: verify sender is still a valid peer with matching values 

173 if origin_node: 

174 peer = db.query(PeerNode).filter_by(node_id=origin_node).first() 

175 if peer and peer.integrity_status == 'banned': 

176 logger.debug(f"Federation inbox: rejecting post from banned node {origin_node[:8]}") 

177 return None 

178 

179 if not origin_node or not origin_post_id: 

180 return None 

181 

182 # Dedup 

183 existing = db.query(FederatedPost).filter( 

184 FederatedPost.origin_node_id == origin_node, 

185 FederatedPost.origin_post_id == origin_post_id, 

186 ).first() 

187 if existing: 

188 return None 

189 

190 federated = FederatedPost( 

191 origin_node_id=origin_node, 

192 origin_node_url=payload.get('origin_url', ''), 

193 origin_node_name=payload.get('origin_name', ''), 

194 origin_post_id=origin_post_id, 

195 origin_author=post_data.get('author', {}).get('username', ''), 

196 title=post_data.get('title', ''), 

197 content=post_data.get('content', ''), 

198 content_type=post_data.get('content_type', 'text'), 

199 media_urls=post_data.get('media_urls', []), 

200 score=post_data.get('score', 0), 

201 comment_count=post_data.get('comment_count', 0), 

202 original_created_at=post_data.get('created_at'), 

203 ) 

204 db.add(federated) 

205 db.flush() 

206 

207 logger.info(f"Federation: received post '{federated.title[:50]}' " 

208 f"from {origin_node[:8]}") 

209 return federated.id 

210 

211 # ─── Federated Feed ─── 

212 

213 def get_federated_feed(self, db, limit: int = 20, offset: int = 0) -> tuple: 

214 """Get posts from all followed instances, merged into a feed.""" 

215 from .models import FederatedPost 

216 q = db.query(FederatedPost).order_by(FederatedPost.received_at.desc()) 

217 total = q.count() 

218 posts = q.offset(offset).limit(limit).all() 

219 return [p.to_dict() for p in posts], total 

220 

221 # ─── Pull: Fetch recent posts from a peer (on-demand) ─── 

222 

223 def pull_from_peer(self, db, peer_url: str, limit: int = 20) -> int: 

224 """Pull recent posts from a peer's outbox. Returns count of new posts.""" 

225 try: 

226 resp = pooled_get( 

227 f"{peer_url}/api/social/federation/outbox", 

228 params={'limit': limit}, 

229 timeout=10, 

230 ) 

231 if resp.status_code != 200: 

232 return 0 

233 data = resp.json() 

234 posts = data.get('posts', []) 

235 origin_node = data.get('node_id', '') 

236 origin_url = data.get('url', peer_url) 

237 origin_name = data.get('name', '') 

238 

239 count = 0 

240 for post in posts: 

241 payload = { 

242 'type': 'new_post', 

243 'origin_node_id': origin_node, 

244 'origin_url': origin_url, 

245 'origin_name': origin_name, 

246 'post': post, 

247 } 

248 result = self.receive_inbox(db, payload) 

249 if result: 

250 count += 1 

251 return count 

252 except requests.RequestException as e: 

253 logger.debug(f"Federation pull failed from {peer_url}: {e}") 

254 return 0 

255 

256 # ─── Helpers ─── 

257 

258 def _send_follow_notification(self, peer_url: str, follower_node_id: str, 

259 follower_url: str): 

260 """Notify a peer that we are now following them.""" 

261 try: 

262 pooled_post( 

263 f"{peer_url}/api/social/federation/follow-notification", 

264 json={ 

265 'follower_node_id': follower_node_id, 

266 'follower_url': follower_url, 

267 }, 

268 timeout=5, 

269 ) 

270 except requests.RequestException: 

271 pass 

272 

273 def _get_local_url(self): 

274 try: 

275 from .peer_discovery import gossip 

276 return gossip.base_url 

277 except Exception: 

278 import os 

279 return os.environ.get('HEVOLVE_BASE_URL', f'http://localhost:{get_port("backend")}') 

280 

281 

282# Module-level singleton 

283federation = FederationManager()