Coverage for integrations / social / feed_engine.py: 94.9%

59 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial - Feed Engine 

3Personalized feed: followed users + joined communities + time decay + score. 

4""" 

5import math 

6from datetime import datetime, timedelta 

7from typing import List, Tuple 

8 

9from sqlalchemy import desc, or_ 

10from sqlalchemy.orm import Session, joinedload 

11 

12from .models import Post, Follow, CommunityMembership, Community, User 

13 

14 

15def _hot_score(upvotes: int, downvotes: int, created_at: datetime) -> float: 

16 """Reddit-style hot score: log(score) + age_bias.""" 

17 score = max(upvotes - downvotes, 1) 

18 age_hours = max((datetime.utcnow() - created_at).total_seconds() / 3600, 0.1) 

19 return math.log10(score) - (age_hours / 12) 

20 

21 

22def _base_post_filter(db, q, user_id=None, viewer_user=None, 

23 apply_privacy: bool = False): 

24 """Apply is_deleted + is_hidden + community privacy filters to a Post query. 

25 

26 Phase 7c.5 — when `apply_privacy` is True, additionally AND in the 

27 per-post privacy gate from integrations.social.privacy. Off by 

28 default so flag-off deploys never load the privacy module or run 

29 the EXISTS subqueries. All four feed endpoints 

30 (get_personalized_feed, get_global_feed, get_trending_feed, 

31 get_agent_feed) thread this through. 

32 """ 

33 q = q.filter(Post.is_deleted == False, Post.is_hidden == False) 

34 # Subquery: IDs of public communities 

35 public_cids = db.query(Community.id).filter(Community.is_private == False).subquery() 

36 # Exclude posts from private communities the user has not joined 

37 if user_id: 

38 member_cids = db.query(CommunityMembership.community_id).filter( 

39 CommunityMembership.user_id == user_id 

40 ).subquery() 

41 q = q.filter( 

42 or_( 

43 Post.community_id.is_(None), 

44 Post.community_id.in_(public_cids), 

45 Post.community_id.in_(member_cids), 

46 ) 

47 ) 

48 else: 

49 # Anonymous: exclude all private community posts 

50 q = q.filter( 

51 or_( 

52 Post.community_id.is_(None), 

53 Post.community_id.in_(public_cids), 

54 ) 

55 ) 

56 if apply_privacy: 

57 from .privacy import visible_posts_filter 

58 q = q.filter(visible_posts_filter(viewer_user)) 

59 return q 

60 

61 

62def get_personalized_feed(db: Session, user_id: str, limit: int = 25, 

63 offset: int = 0, viewer_user=None, 

64 apply_privacy: bool = False) -> Tuple[List[Post], int]: 

65 """Feed from followed users + subscribed communities, sorted by hot score.""" 

66 # Get followed user IDs 

67 followed_ids = [f.following_id for f in 

68 db.query(Follow.following_id).filter(Follow.follower_id == user_id).all()] 

69 

70 # Get subscribed community IDs 

71 community_ids = [m.community_id for m in 

72 db.query(CommunityMembership.community_id).filter( 

73 CommunityMembership.user_id == user_id).all()] 

74 

75 if not followed_ids and not community_ids: 

76 # Fallback to global trending 

77 return get_trending_feed(db, limit, offset, user_id=user_id, 

78 viewer_user=viewer_user, 

79 apply_privacy=apply_privacy) 

80 

81 q = db.query(Post).options(joinedload(Post.author)).filter( 

82 or_( 

83 Post.author_id.in_(followed_ids) if followed_ids else False, 

84 Post.community_id.in_(community_ids) if community_ids else False, 

85 ) 

86 ) 

87 q = _base_post_filter(db, q, user_id=user_id, 

88 viewer_user=viewer_user, 

89 apply_privacy=apply_privacy) 

90 total = q.count() 

91 posts = q.order_by(desc(Post.created_at)).offset(offset).limit(limit).all() 

92 return posts, total 

93 

94 

95def get_global_feed(db: Session, sort: str = 'new', limit: int = 25, 

96 offset: int = 0, user_id: str = None, 

97 viewer_user=None, 

98 apply_privacy: bool = False) -> Tuple[List[Post], int]: 

99 """All posts, sorted by chosen method.""" 

100 q = db.query(Post).options(joinedload(Post.author)) 

101 q = _base_post_filter(db, q, user_id=user_id, 

102 viewer_user=viewer_user, 

103 apply_privacy=apply_privacy) 

104 

105 if sort == 'top': 

106 q = q.order_by(desc(Post.score), desc(Post.created_at)) 

107 elif sort == 'hot': 

108 q = q.order_by(desc(Post.score + Post.comment_count), desc(Post.created_at)) 

109 elif sort == 'discussed': 

110 q = q.order_by(desc(Post.comment_count), desc(Post.created_at)) 

111 else: 

112 q = q.order_by(desc(Post.created_at)) 

113 

114 total = q.count() 

115 posts = q.offset(offset).limit(limit).all() 

116 return posts, total 

117 

118 

119def get_trending_feed(db: Session, limit: int = 25, offset: int = 0, 

120 user_id: str = None, viewer_user=None, 

121 apply_privacy: bool = False) -> Tuple[List[Post], int]: 

122 """Posts trending in the last 24h based on velocity (votes+comments per hour).""" 

123 cutoff = datetime.utcnow() - timedelta(hours=24) 

124 q = db.query(Post).options(joinedload(Post.author)).filter( 

125 Post.created_at >= cutoff 

126 ) 

127 q = _base_post_filter(db, q, user_id=user_id, 

128 viewer_user=viewer_user, 

129 apply_privacy=apply_privacy) 

130 q = q.order_by(desc(Post.score + Post.comment_count * 2), desc(Post.created_at)) 

131 

132 total = q.count() 

133 posts = q.offset(offset).limit(limit).all() 

134 return posts, total 

135 

136 

137def get_agent_feed(db: Session, limit: int = 25, offset: int = 0, 

138 user_id: str = None, viewer_user=None, 

139 apply_privacy: bool = False) -> Tuple[List[Post], int]: 

140 """Posts by AI agents only.""" 

141 q = db.query(Post).options(joinedload(Post.author)).join( 

142 User, Post.author_id == User.id 

143 ).filter(User.user_type == 'agent') 

144 q = _base_post_filter(db, q, user_id=user_id, 

145 viewer_user=viewer_user, 

146 apply_privacy=apply_privacy) 

147 q = q.order_by(desc(Post.created_at)) 

148 

149 total = q.count() 

150 posts = q.offset(offset).limit(limit).all() 

151 return posts, total