Coverage for integrations / social / experiment_discovery_service.py: 33.1%
121 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Experiment Discovery Service — Interest-based recommendations and live metrics.
4Recommendation scoring:
5 score = intent_match * 3.0
6 + recency_decay (10 pts at 0h → 0 at 7d)
7 + log(contributor_count + 1) * 2.0
8 + log(total_votes + 1) * 1.5
9 + log(funding_total + 1) * 0.5
10 + bond_boost (5 if creator bonded)
11 + status_weight
13Service Pattern: static methods, db: Session, db.flush() not db.commit().
14"""
15import math
16import logging
17from datetime import datetime
18from typing import Dict, List, Optional
20from sqlalchemy import desc, func, or_
21from sqlalchemy.orm import Session
23logger = logging.getLogger('hevolve_social')
class ExperimentDiscoveryService:
    """Interest-based experiment discovery, live metrics, and contributions.

    Service pattern: static methods taking ``db: Session``; state changes use
    ``db.flush()`` (never ``db.commit()``) so the caller owns the transaction.
    """

    @staticmethod
    def discover(db: Session, user_id: Optional[str] = None,
                 intent_filter: Optional[str] = None,
                 experiment_type: Optional[str] = None,
                 status_filter: Optional[str] = None,
                 limit: int = 25, offset: int = 0) -> Dict:
        """Interest-based experiment discovery with personalised ranking.

        Args:
            db: Active SQLAlchemy session.
            user_id: When given, ranking is personalised from this user's
                past votes and authored posts, and bonded creators get a boost.
            intent_filter: Restrict to one intent category.
            experiment_type: Restrict to one experiment type.
            status_filter: Restrict to one status ('archived' always excluded).
            limit: Page size.
            offset: Page start within the scored result set.

        Returns:
            Dict with 'experiments' (scored, enriched dicts) and 'meta'
            (pagination info plus the interest profile used).
        """
        from .models import ThoughtExperiment, ExperimentVote, Post, Encounter

        # 1. Build the user's interest profile: intent_category -> activity
        #    count, aggregated from past experiment votes and authored posts.
        user_intents: Dict[str, int] = {}
        if user_id:
            vote_counts = db.query(
                ThoughtExperiment.intent_category,
                func.count(ExperimentVote.id)
            ).join(
                ExperimentVote,
                ExperimentVote.experiment_id == ThoughtExperiment.id
            ).filter(
                ExperimentVote.voter_id == user_id
            ).group_by(ThoughtExperiment.intent_category).all()

            for cat, cnt in vote_counts:
                if cat:
                    user_intents[cat] = cnt

            post_counts = db.query(
                Post.intent_category, func.count(Post.id)
            ).filter(
                Post.author_id == user_id,
                Post.intent_category.isnot(None)
            ).group_by(Post.intent_category).all()

            for cat, cnt in post_counts:
                if cat:
                    user_intents[cat] = user_intents.get(cat, 0) + cnt

        # 2. Candidate experiments: over-fetch the 200 newest matching rows,
        #    then re-rank them in Python below.
        q = db.query(ThoughtExperiment).filter(
            ThoughtExperiment.status != 'archived'
        )
        if intent_filter:
            q = q.filter(ThoughtExperiment.intent_category == intent_filter)
        if experiment_type:
            q = q.filter(ThoughtExperiment.experiment_type == experiment_type)
        if status_filter:
            q = q.filter(ThoughtExperiment.status == status_filter)

        experiments = q.order_by(desc(ThoughtExperiment.created_at)).limit(200).all()

        # 3. Creators the user is strongly bonded with (bond_level >= 3)
        #    receive a flat score boost in step 4.
        bond_user_ids: set = set()
        if user_id:
            bonds = db.query(Encounter).filter(
                or_(Encounter.user_a_id == user_id,
                    Encounter.user_b_id == user_id),
                Encounter.bond_level >= 3
            ).all()
            for b in bonds:
                other = b.user_b_id if b.user_a_id == user_id else b.user_a_id
                bond_user_ids.add(other)

        # 4. Score and rank.
        # NOTE(review): utcnow() yields a naive datetime; assumes created_at
        # is also stored as naive UTC — confirm before making this tz-aware.
        now = datetime.utcnow()
        status_weights = {
            'voting': 3.0, 'discussing': 2.0, 'proposed': 1.0,
            'evaluating': 2.5, 'decided': 0.5,
        }

        scored: List = []
        for exp in experiments:
            score = 0.0

            # Intent match: weight scales with how active the user has been
            # in this category (log-damped activity count).
            if exp.intent_category and exp.intent_category in user_intents:
                score += 3.0 * math.log1p(user_intents[exp.intent_category])

            # Recency decay: 10 points at age 0, linearly to 0 at 7 days
            # (168h / 16.8 == 10).
            if exp.created_at:
                age_hours = (now - exp.created_at).total_seconds() / 3600
                score += max(0.0, 10.0 - (age_hours / 16.8))

            # Popularity, engagement, and funding signals — all log-damped
            # so large counts don't dominate the ranking.
            score += math.log1p(exp.contributor_count or 0) * 2.0
            score += math.log1p(exp.total_votes or 0) * 1.5
            score += math.log1p(exp.funding_total or 0) * 0.5

            # Social boost: the creator is someone the user has bonded with.
            if exp.creator_id in bond_user_ids:
                score += 5.0

            # Active lifecycle stages rank higher than settled ones.
            score += status_weights.get(exp.status, 0.0)

            scored.append((score, exp))

        scored.sort(key=lambda x: -x[0])

        # 5. Paginate over the full scored list.
        page = scored[offset:offset + limit]

        # 6. Enrich with post metrics. Batch-fetch all linked posts in a
        #    single query instead of one query per experiment (was an N+1).
        post_ids = [exp.post_id for _, exp in page if exp.post_id]
        posts_by_id: Dict = {}
        if post_ids:
            posts_by_id = {
                p.id: p
                for p in db.query(Post).filter(Post.id.in_(post_ids)).all()
            }

        results = []
        for score_val, exp in page:
            d = exp.to_dict()
            d['discovery_score'] = round(score_val, 2)

            post = posts_by_id.get(exp.post_id) if exp.post_id else None
            if post:
                d['view_count'] = post.view_count or 0
                d['comment_count'] = post.comment_count or 0
                d['upvotes'] = post.upvotes or 0
                d['downvotes'] = post.downvotes or 0
                if hasattr(post, 'author') and post.author:
                    d['author'] = {
                        'id': post.author.id,
                        'username': post.author.username,
                        'display_name': getattr(post.author, 'display_name', post.author.username),
                    }
            results.append(d)

        return {
            'experiments': results,
            'meta': {
                'total': len(scored),
                'limit': limit,
                'offset': offset,
                'has_more': offset + limit < len(scored),
                'user_intents': user_intents if user_id else {},
            }
        }

    @staticmethod
    def get_experiment_metrics(db: Session, experiment_id: str) -> Optional[Dict]:
        """Get live metrics for a specific experiment, varying by experiment_type.

        Returns:
            Metrics dict, or None when the experiment does not exist.
        """
        from .models import ThoughtExperiment, ExperimentVote, Post

        exp = db.query(ThoughtExperiment).filter_by(id=experiment_id).first()
        if not exp:
            return None

        metrics: Dict = {
            'experiment_id': experiment_id,
            'experiment_type': exp.experiment_type or 'traditional',
            'contributor_count': exp.contributor_count or 0,
            'funding_total': exp.funding_total or 0,
            'total_votes': exp.total_votes or 0,
            'status': exp.status,
        }

        # Voter breakdown by kind.
        votes = db.query(ExperimentVote).filter_by(experiment_id=experiment_id).all()
        metrics['human_voters'] = sum(1 for v in votes if v.voter_type == 'human')
        metrics['agent_voters'] = sum(1 for v in votes if v.voter_type == 'agent')

        # Vote distribution. `or 0` guards against NULL vote_value rows,
        # which would otherwise raise TypeError on comparison with an int;
        # a NULL value counts as neutral.
        support = sum(1 for v in votes if (v.vote_value or 0) > 0)
        oppose = sum(1 for v in votes if (v.vote_value or 0) < 0)
        neutral = sum(1 for v in votes if (v.vote_value or 0) == 0)
        metrics['vote_distribution'] = {
            'support': support, 'oppose': oppose, 'neutral': neutral,
        }

        # Post engagement, when the experiment has a linked post.
        if exp.post_id:
            post = db.query(Post).filter_by(id=exp.post_id).first()
            if post:
                metrics['view_count'] = post.view_count or 0
                metrics['comment_count'] = post.comment_count or 0

        # Type-specific metrics.
        if exp.experiment_type == 'physical_ai':
            metrics['camera_feed_url'] = exp.camera_feed_url
            metrics['has_camera'] = bool(exp.camera_feed_url)

        elif exp.experiment_type == 'software':
            metrics['build_stats'] = _get_build_stats(db, experiment_id)

        # Compute contribution from hive nodes (best-effort; zeros on failure).
        metrics.update(_get_compute_stats(db))

        return metrics

    @staticmethod
    def record_contribution(db: Session, experiment_id: str, user_id: str,
                            spark_amount: int = 0) -> Optional[Dict]:
        """Record a user contributing to / believing in an experiment.

        Increments the contributor count and, when ``spark_amount`` is
        positive, adds it to the funding total. Flushes but does not commit.

        Returns:
            Updated experiment dict, or None when the experiment is missing.
        """
        from .models import ThoughtExperiment

        exp = db.query(ThoughtExperiment).filter_by(id=experiment_id).first()
        if not exp:
            return None

        exp.contributor_count = (exp.contributor_count or 0) + 1
        if spark_amount > 0:
            exp.funding_total = (exp.funding_total or 0) + spark_amount

        db.flush()
        return exp.to_dict()
234# ─── Internal Helpers ───
def _get_build_stats(db: Session, experiment_id: str) -> Dict:
    """Aggregate build/merge outcomes from CodingTask rows.

    NOTE(review): *experiment_id* is accepted for the future per-experiment
    filter; for now ALL coding tasks are aggregated (original TODO).
    Best-effort: returns {} if the model is unavailable or the query fails.
    """
    try:
        from .models import CodingTask
        # CodingTasks linked to experiment via goal config
        # For now, aggregate all coding tasks (can filter by experiment later)
        rows = db.query(CodingTask).filter(
            CodingTask.status.in_(['merged', 'failed', 'in_progress', 'review', 'assigned'])
        ).limit(100).all()

        # Single pass: tally rows per status, then read the tallies out.
        tally: Dict[str, int] = {}
        for row in rows:
            tally[row.status] = tally.get(row.status, 0) + 1

        total = len(rows)
        merged = tally.get('merged', 0)
        return {
            'total_tasks': total,
            'merged': merged,
            'failed': tally.get('failed', 0),
            'in_review': tally.get('review', 0),
            'in_progress': tally.get('assigned', 0) + tally.get('in_progress', 0),
            'success_rate': round(merged / total, 3) if total else 0.0,
        }
    except Exception as e:
        logger.debug("Build stats unavailable: %s", e)
        return {}
def _get_compute_stats(db: Session) -> Dict:
    """Summarise hive compute capacity from active peer nodes that accept
    thought-experiment workloads (PeerNode joined to NodeComputeConfig).

    Best-effort: falls back to an all-zero summary on any failure.
    """
    try:
        from .models import PeerNode, NodeComputeConfig

        active_nodes = db.query(PeerNode).join(
            NodeComputeConfig, NodeComputeConfig.node_id == PeerNode.node_id
        ).filter(
            NodeComputeConfig.accept_thought_experiments == True,  # noqa: E712
            PeerNode.status == 'active',
        ).all()

        gpu_hours = sum(node.gpu_hours_served or 0 for node in active_nodes)
        inferences = sum(node.total_inferences or 0 for node in active_nodes)
        return {
            'compute_nodes': len(active_nodes),
            'total_gpu_hours': round(gpu_hours, 1),
            'total_inferences': inferences,
        }
    except Exception as e:
        logger.debug("Compute stats unavailable: %s", e)
        return {'compute_nodes': 0, 'total_gpu_hours': 0, 'total_inferences': 0}