Coverage for integrations / social / experiment_discovery_service.py: 33.1%

121 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Experiment Discovery Service — Interest-based recommendations and live metrics. 

3 

4Recommendation scoring: 

5 score = intent_match * 3.0 

6 + recency_decay (10 pts at 0h → 0 at 7d) 

7 + log(contributor_count + 1) * 2.0 

8 + log(total_votes + 1) * 1.5 

9 + log(funding_total + 1) * 0.5 

10 + bond_boost (5 if creator bonded) 

11 + status_weight 

12 

13Service Pattern: static methods, db: Session, db.flush() not db.commit(). 

14""" 

15import math 

16import logging 

17from datetime import datetime 

18from typing import Dict, List, Optional 

19 

20from sqlalchemy import desc, func, or_ 

21from sqlalchemy.orm import Session 

22 

23logger = logging.getLogger('hevolve_social') 

24 

25 

26class ExperimentDiscoveryService: 

27 

28 @staticmethod 

29 def discover(db: Session, user_id: str = None, 

30 intent_filter: str = None, 

31 experiment_type: str = None, 

32 status_filter: str = None, 

33 limit: int = 25, offset: int = 0) -> Dict: 

34 """Interest-based experiment discovery with personalised ranking.""" 

35 from .models import ThoughtExperiment, ExperimentVote, Post, Encounter 

36 

37 # 1. Build user interest profile from past votes + posts 

38 user_intents: Dict[str, int] = {} 

39 if user_id: 

40 vote_counts = db.query( 

41 ThoughtExperiment.intent_category, 

42 func.count(ExperimentVote.id) 

43 ).join( 

44 ExperimentVote, 

45 ExperimentVote.experiment_id == ThoughtExperiment.id 

46 ).filter( 

47 ExperimentVote.voter_id == user_id 

48 ).group_by(ThoughtExperiment.intent_category).all() 

49 

50 for cat, cnt in vote_counts: 

51 if cat: 

52 user_intents[cat] = cnt 

53 

54 post_counts = db.query( 

55 Post.intent_category, func.count(Post.id) 

56 ).filter( 

57 Post.author_id == user_id, 

58 Post.intent_category.isnot(None) 

59 ).group_by(Post.intent_category).all() 

60 

61 for cat, cnt in post_counts: 

62 if cat: 

63 user_intents[cat] = user_intents.get(cat, 0) + cnt 

64 

65 # 2. Query experiments (fetch more than needed for scoring) 

66 q = db.query(ThoughtExperiment).filter( 

67 ThoughtExperiment.status != 'archived' 

68 ) 

69 if intent_filter: 

70 q = q.filter(ThoughtExperiment.intent_category == intent_filter) 

71 if experiment_type: 

72 q = q.filter(ThoughtExperiment.experiment_type == experiment_type) 

73 if status_filter: 

74 q = q.filter(ThoughtExperiment.status == status_filter) 

75 

76 experiments = q.order_by(desc(ThoughtExperiment.created_at)).limit(200).all() 

77 

78 # 3. Get bonded user IDs for boost 

79 bond_user_ids: set = set() 

80 if user_id: 

81 bonds = db.query(Encounter).filter( 

82 or_(Encounter.user_a_id == user_id, 

83 Encounter.user_b_id == user_id), 

84 Encounter.bond_level >= 3 

85 ).all() 

86 for b in bonds: 

87 other = b.user_b_id if b.user_a_id == user_id else b.user_a_id 

88 bond_user_ids.add(other) 

89 

90 # 4. Score and rank 

91 now = datetime.utcnow() 

92 status_weights = { 

93 'voting': 3.0, 'discussing': 2.0, 'proposed': 1.0, 

94 'evaluating': 2.5, 'decided': 0.5, 

95 } 

96 

97 scored: List = [] 

98 for exp in experiments: 

99 score = 0.0 

100 

101 # Intent match 

102 if exp.intent_category and exp.intent_category in user_intents: 

103 score += 3.0 * math.log1p(user_intents[exp.intent_category]) 

104 

105 # Recency decay (10 points at 0h → 0 at 7 days) 

106 if exp.created_at: 

107 age_hours = (now - exp.created_at).total_seconds() / 3600 

108 score += max(0.0, 10.0 - (age_hours / 16.8)) 

109 

110 # Contributor popularity 

111 score += math.log1p(exp.contributor_count or 0) * 2.0 

112 

113 # Vote engagement 

114 score += math.log1p(exp.total_votes or 0) * 1.5 

115 

116 # Funding signal 

117 score += math.log1p(exp.funding_total or 0) * 0.5 

118 

119 # Bond boost 

120 if exp.creator_id in bond_user_ids: 

121 score += 5.0 

122 

123 # Active status boost 

124 score += status_weights.get(exp.status, 0.0) 

125 

126 scored.append((score, exp)) 

127 

128 scored.sort(key=lambda x: -x[0]) 

129 

130 # 5. Paginate 

131 page = scored[offset:offset + limit] 

132 

133 # 6. Enrich with post metrics 

134 results = [] 

135 for score_val, exp in page: 

136 d = exp.to_dict() 

137 d['discovery_score'] = round(score_val, 2) 

138 

139 if exp.post_id: 

140 post = db.query(Post).filter_by(id=exp.post_id).first() 

141 if post: 

142 d['view_count'] = post.view_count or 0 

143 d['comment_count'] = post.comment_count or 0 

144 d['upvotes'] = post.upvotes or 0 

145 d['downvotes'] = post.downvotes or 0 

146 if hasattr(post, 'author') and post.author: 

147 d['author'] = { 

148 'id': post.author.id, 

149 'username': post.author.username, 

150 'display_name': getattr(post.author, 'display_name', post.author.username), 

151 } 

152 results.append(d) 

153 

154 return { 

155 'experiments': results, 

156 'meta': { 

157 'total': len(scored), 

158 'limit': limit, 

159 'offset': offset, 

160 'has_more': offset + limit < len(scored), 

161 'user_intents': user_intents if user_id else {}, 

162 } 

163 } 

164 

165 @staticmethod 

166 def get_experiment_metrics(db: Session, experiment_id: str) -> Optional[Dict]: 

167 """Get live metrics for a specific experiment, varying by experiment_type.""" 

168 from .models import ThoughtExperiment, ExperimentVote, Post 

169 

170 exp = db.query(ThoughtExperiment).filter_by(id=experiment_id).first() 

171 if not exp: 

172 return None 

173 

174 metrics: Dict = { 

175 'experiment_id': experiment_id, 

176 'experiment_type': exp.experiment_type or 'traditional', 

177 'contributor_count': exp.contributor_count or 0, 

178 'funding_total': exp.funding_total or 0, 

179 'total_votes': exp.total_votes or 0, 

180 'status': exp.status, 

181 } 

182 

183 # Voter breakdown 

184 votes = db.query(ExperimentVote).filter_by(experiment_id=experiment_id).all() 

185 metrics['human_voters'] = sum(1 for v in votes if v.voter_type == 'human') 

186 metrics['agent_voters'] = sum(1 for v in votes if v.voter_type == 'agent') 

187 

188 # Vote distribution 

189 support = sum(1 for v in votes if v.vote_value > 0) 

190 oppose = sum(1 for v in votes if v.vote_value < 0) 

191 neutral = sum(1 for v in votes if v.vote_value == 0) 

192 metrics['vote_distribution'] = { 

193 'support': support, 'oppose': oppose, 'neutral': neutral, 

194 } 

195 

196 # Post engagement 

197 if exp.post_id: 

198 post = db.query(Post).filter_by(id=exp.post_id).first() 

199 if post: 

200 metrics['view_count'] = post.view_count or 0 

201 metrics['comment_count'] = post.comment_count or 0 

202 

203 # Type-specific metrics 

204 if exp.experiment_type == 'physical_ai': 

205 metrics['camera_feed_url'] = exp.camera_feed_url 

206 metrics['has_camera'] = bool(exp.camera_feed_url) 

207 

208 elif exp.experiment_type == 'software': 

209 metrics['build_stats'] = _get_build_stats(db, experiment_id) 

210 

211 # Compute contribution from hive nodes 

212 metrics.update(_get_compute_stats(db)) 

213 

214 return metrics 

215 

216 @staticmethod 

217 def record_contribution(db: Session, experiment_id: str, user_id: str, 

218 spark_amount: int = 0) -> Optional[Dict]: 

219 """Record a user contributing to / believing in an experiment.""" 

220 from .models import ThoughtExperiment 

221 

222 exp = db.query(ThoughtExperiment).filter_by(id=experiment_id).first() 

223 if not exp: 

224 return None 

225 

226 exp.contributor_count = (exp.contributor_count or 0) + 1 

227 if spark_amount > 0: 

228 exp.funding_total = (exp.funding_total or 0) + spark_amount 

229 

230 db.flush() 

231 return exp.to_dict() 

232 

233 

234# ─── Internal Helpers ─── 

235 

236def _get_build_stats(db: Session, experiment_id: str) -> Dict: 

237 """Get build success rates from CodingTask model linked via AgentGoal.""" 

238 try: 

239 from .models import CodingTask 

240 # CodingTasks linked to experiment via goal config 

241 # For now, aggregate all coding tasks (can filter by experiment later) 

242 tasks = db.query(CodingTask).filter( 

243 CodingTask.status.in_(['merged', 'failed', 'in_progress', 'review', 'assigned']) 

244 ).limit(100).all() 

245 

246 total = len(tasks) 

247 merged = sum(1 for t in tasks if t.status == 'merged') 

248 failed = sum(1 for t in tasks if t.status == 'failed') 

249 in_progress = sum(1 for t in tasks if t.status in ('assigned', 'in_progress')) 

250 in_review = sum(1 for t in tasks if t.status == 'review') 

251 

252 return { 

253 'total_tasks': total, 

254 'merged': merged, 

255 'failed': failed, 

256 'in_review': in_review, 

257 'in_progress': in_progress, 

258 'success_rate': round(merged / total, 3) if total else 0.0, 

259 } 

260 except Exception as e: 

261 logger.debug("Build stats unavailable: %s", e) 

262 return {} 

263 

264 

265def _get_compute_stats(db: Session) -> Dict: 

266 """Get hive compute stats from PeerNode + NodeComputeConfig.""" 

267 try: 

268 from .models import PeerNode, NodeComputeConfig 

269 nodes = db.query(PeerNode).join( 

270 NodeComputeConfig, NodeComputeConfig.node_id == PeerNode.node_id 

271 ).filter( 

272 NodeComputeConfig.accept_thought_experiments == True, # noqa: E712 

273 PeerNode.status == 'active', 

274 ).all() 

275 return { 

276 'compute_nodes': len(nodes), 

277 'total_gpu_hours': round(sum(n.gpu_hours_served or 0 for n in nodes), 1), 

278 'total_inferences': sum(n.total_inferences or 0 for n in nodes), 

279 } 

280 except Exception as e: 

281 logger.debug("Compute stats unavailable: %s", e) 

282 return {'compute_nodes': 0, 'total_gpu_hours': 0, 'total_inferences': 0}