Coverage for integrations / social / federation.py: 52.5%
118 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial - Mastodon-Style Federation
3Instances follow each other and share content across the federated network.
4Built on top of gossip peer discovery - gossip finds peers, federation shares content.
6Concepts:
7- Instance follow: Node A follows Node B → B pushes new posts to A's inbox
8- Outbox: When a local post is created, push to all followers' inboxes
9- Inbox: Receive posts from followed instances, store as federated posts
10- Boost: Re-share a federated post to local feed
11"""
12import logging
13import threading
14import requests
15from datetime import datetime
17from core.http_pool import pooled_get, pooled_post
18from typing import Optional, List
19from core.port_registry import get_port
21logger = logging.getLogger('hevolve_social')
24class FederationManager:
25 """Manages instance-level follows and content federation between HevolveBot nodes."""
27 def __init__(self):
28 self._lock = threading.Lock()
30 # ─── Instance Follow/Unfollow ───
32 def follow_instance(self, db, local_node_id: str, peer_node_id: str,
33 peer_url: str) -> bool:
34 """
35 Follow a remote instance. Sends follow request to the peer.
36 Returns True if follow was created.
37 """
38 from .models import InstanceFollow, PeerNode
39 existing = db.query(InstanceFollow).filter(
40 InstanceFollow.follower_node_id == local_node_id,
41 InstanceFollow.following_node_id == peer_node_id,
42 ).first()
43 if existing:
44 return False
46 follow = InstanceFollow(
47 follower_node_id=local_node_id,
48 following_node_id=peer_node_id,
49 peer_url=peer_url,
50 status='active',
51 )
52 db.add(follow)
53 db.flush()
55 # Notify the remote instance
56 threading.Thread(
57 target=self._send_follow_notification,
58 args=(peer_url, local_node_id, self._get_local_url()),
59 daemon=True,
60 ).start()
62 return True
64 def unfollow_instance(self, db, local_node_id: str, peer_node_id: str):
65 """Unfollow a remote instance."""
66 from .models import InstanceFollow
67 follow = db.query(InstanceFollow).filter(
68 InstanceFollow.follower_node_id == local_node_id,
69 InstanceFollow.following_node_id == peer_node_id,
70 ).first()
71 if follow:
72 db.delete(follow)
73 db.flush()
75 def get_followers(self, db, node_id: str) -> list:
76 """Get list of instances following this node."""
77 from .models import InstanceFollow
78 follows = db.query(InstanceFollow).filter(
79 InstanceFollow.following_node_id == node_id,
80 InstanceFollow.status == 'active',
81 ).all()
82 return [f.to_dict() for f in follows]
84 def get_following(self, db, node_id: str) -> list:
85 """Get list of instances this node follows."""
86 from .models import InstanceFollow
87 follows = db.query(InstanceFollow).filter(
88 InstanceFollow.follower_node_id == node_id,
89 InstanceFollow.status == 'active',
90 ).all()
91 return [f.to_dict() for f in follows]
93 # ─── Outbox: Push local posts to followers ───
95 def push_to_followers(self, db, post_dict: dict):
96 """
97 Push a new local post to all instances that follow us.
98 Called when a post is created locally.
99 """
100 from .peer_discovery import gossip
101 followers = self.get_followers(db, gossip.node_id)
102 if not followers:
103 return
105 payload = {
106 'type': 'new_post',
107 'origin_node_id': gossip.node_id,
108 'origin_url': gossip.base_url,
109 'origin_name': gossip.node_name,
110 'post': post_dict,
111 'timestamp': datetime.utcnow().isoformat(),
112 }
114 for follower in followers:
115 threading.Thread(
116 target=self._deliver_to_inbox,
117 args=(follower['peer_url'], payload,
118 follower.get('follower_node_id', '')),
119 daemon=True,
120 ).start()
122 def _deliver_to_inbox(self, peer_url: str, payload: dict,
123 follower_node_id: str = ''):
124 """Deliver to peer's inbox — PeerLink first, HTTP fallback."""
125 # Try PeerLink direct delivery (avoids HTTP round-trip)
126 if follower_node_id:
127 try:
128 from core.peer_link.link_manager import get_link_manager
129 link = get_link_manager().get_link(follower_node_id)
130 if link:
131 link.send('federation', payload)
132 logger.debug(f"Federation: delivered via PeerLink to {follower_node_id[:8]}")
133 return
134 except Exception:
135 pass
137 # HTTP fallback
138 try:
139 resp = pooled_post(
140 f"{peer_url}/api/social/federation/inbox",
141 json=payload,
142 timeout=10,
143 )
144 if resp.status_code == 200:
145 logger.debug(f"Federation: delivered to {peer_url}")
146 else:
147 logger.debug(f"Federation: delivery failed to {peer_url}: {resp.status_code}")
148 except requests.RequestException as e:
149 logger.debug(f"Federation: delivery error to {peer_url}: {e}")
151 # ─── Inbox: Receive posts from followed instances ───
153 def receive_inbox(self, db, payload: dict) -> Optional[str]:
154 """
155 Process an incoming federated post.
156 Deduplicates by origin_node_id + post.id.
157 Verifies sender's guardrail hash before accepting - continuous audit
158 applies to every interaction, not just periodic checks.
159 Returns the FederatedPost id if created, None if duplicate.
160 """
161 from .models import FederatedPost, PeerNode
162 from .peer_discovery import gossip
164 msg_type = payload.get('type')
165 if msg_type != 'new_post':
166 return None
168 post_data = payload.get('post', {})
169 origin_node = payload.get('origin_node_id', '')
170 origin_post_id = post_data.get('id', '')
172 # Continuous audit: verify sender is still a valid peer with matching values
173 if origin_node:
174 peer = db.query(PeerNode).filter_by(node_id=origin_node).first()
175 if peer and peer.integrity_status == 'banned':
176 logger.debug(f"Federation inbox: rejecting post from banned node {origin_node[:8]}")
177 return None
179 if not origin_node or not origin_post_id:
180 return None
182 # Dedup
183 existing = db.query(FederatedPost).filter(
184 FederatedPost.origin_node_id == origin_node,
185 FederatedPost.origin_post_id == origin_post_id,
186 ).first()
187 if existing:
188 return None
190 federated = FederatedPost(
191 origin_node_id=origin_node,
192 origin_node_url=payload.get('origin_url', ''),
193 origin_node_name=payload.get('origin_name', ''),
194 origin_post_id=origin_post_id,
195 origin_author=post_data.get('author', {}).get('username', ''),
196 title=post_data.get('title', ''),
197 content=post_data.get('content', ''),
198 content_type=post_data.get('content_type', 'text'),
199 media_urls=post_data.get('media_urls', []),
200 score=post_data.get('score', 0),
201 comment_count=post_data.get('comment_count', 0),
202 original_created_at=post_data.get('created_at'),
203 )
204 db.add(federated)
205 db.flush()
207 logger.info(f"Federation: received post '{federated.title[:50]}' "
208 f"from {origin_node[:8]}")
209 return federated.id
211 # ─── Federated Feed ───
213 def get_federated_feed(self, db, limit: int = 20, offset: int = 0) -> tuple:
214 """Get posts from all followed instances, merged into a feed."""
215 from .models import FederatedPost
216 q = db.query(FederatedPost).order_by(FederatedPost.received_at.desc())
217 total = q.count()
218 posts = q.offset(offset).limit(limit).all()
219 return [p.to_dict() for p in posts], total
221 # ─── Pull: Fetch recent posts from a peer (on-demand) ───
223 def pull_from_peer(self, db, peer_url: str, limit: int = 20) -> int:
224 """Pull recent posts from a peer's outbox. Returns count of new posts."""
225 try:
226 resp = pooled_get(
227 f"{peer_url}/api/social/federation/outbox",
228 params={'limit': limit},
229 timeout=10,
230 )
231 if resp.status_code != 200:
232 return 0
233 data = resp.json()
234 posts = data.get('posts', [])
235 origin_node = data.get('node_id', '')
236 origin_url = data.get('url', peer_url)
237 origin_name = data.get('name', '')
239 count = 0
240 for post in posts:
241 payload = {
242 'type': 'new_post',
243 'origin_node_id': origin_node,
244 'origin_url': origin_url,
245 'origin_name': origin_name,
246 'post': post,
247 }
248 result = self.receive_inbox(db, payload)
249 if result:
250 count += 1
251 return count
252 except requests.RequestException as e:
253 logger.debug(f"Federation pull failed from {peer_url}: {e}")
254 return 0
256 # ─── Helpers ───
258 def _send_follow_notification(self, peer_url: str, follower_node_id: str,
259 follower_url: str):
260 """Notify a peer that we are now following them."""
261 try:
262 pooled_post(
263 f"{peer_url}/api/social/federation/follow-notification",
264 json={
265 'follower_node_id': follower_node_id,
266 'follower_url': follower_url,
267 },
268 timeout=5,
269 )
270 except requests.RequestException:
271 pass
273 def _get_local_url(self):
274 try:
275 from .peer_discovery import gossip
276 return gossip.base_url
277 except Exception:
278 import os
279 return os.environ.get('HEVOLVE_BASE_URL', f'http://localhost:{get_port("backend")}')
282# Module-level singleton
283federation = FederationManager()