Coverage for integrations / social / region_service.py: 26.1%
134 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial - Region Service
3Regional governance, membership, auto-promotion, feeds, leaderboards.
4"""
5import logging
6from datetime import datetime, timedelta
7from typing import Optional, Dict, List
9from sqlalchemy import desc, func
10from sqlalchemy.orm import Session
12from .models import (
13 User, Post, Region, RegionMembership, ResonanceWallet,
14)
logger = logging.getLogger('hevolve_social')

# Minimum requirements for advancing INTO each role, keyed by the target role.
# As consumed by RegionService.check_promotion_eligibility:
#   signal -> compared against the user's ResonanceWallet.signal
#   posts  -> compared against User.post_count
#   score  -> compared against RegionMembership.contribution_score
#   days   -> compared against whole days since RegionMembership.created_at
ROLE_THRESHOLDS = dict(
    contributor=dict(signal=1.0, posts=5, days=7),
    moderator=dict(signal=5.0, score=100, days=30),
    admin=dict(signal=15.0, score=500, days=90),
    steward=dict(signal=50.0, score=2000, days=180),
)
class RegionService:
    """Stateless helpers for regions: membership, roles, feeds, governance.

    Every public method is a ``@staticmethod`` taking the SQLAlchemy session
    as its first argument.  Methods add to / flush the session but never
    commit; committing is left to the caller.
    """

    # Roles from lowest to highest privilege.
    ROLE_ORDER = ('member', 'contributor', 'moderator', 'admin', 'steward')
    # role -> the role directly above it (the top role has no entry).
    _NEXT_ROLE = dict(zip(ROLE_ORDER, ROLE_ORDER[1:]))
    # Roles listed as the "council" in governance info.
    _COUNCIL_ROLES = ('moderator', 'admin', 'steward')
    # Rough number of kilometres per degree of latitude.
    _KM_PER_DEGREE = 111.0

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _role_rank(role) -> int:
        """Return the index of *role* in ``ROLE_ORDER``, or -1 if unknown."""
        try:
            return RegionService.ROLE_ORDER.index(role)
        except ValueError:
            return -1

    @staticmethod
    def _bounding_box(lat: float, lon: float, radius_km: float):
        """Return ``(lat_min, lat_max, lon_min, lon_max)`` around a point.

        The latitude half-span is ``radius_km / 111``.  The longitude
        half-span is widened by ``1 / cos(lat)`` because a degree of
        longitude covers less ground away from the equator; it is capped at
        180 degrees so the box stays finite near the poles.  The box is not
        wrapped across the +/-180 degree meridian.
        """
        import math

        lat_delta = radius_km / RegionService._KM_PER_DEGREE
        cos_lat = math.cos(math.radians(lat))
        if cos_lat > 0:
            lon_delta = min(180.0, lat_delta / cos_lat)
        else:
            lon_delta = 180.0
        return (lat - lat_delta, lat + lat_delta,
                lon - lon_delta, lon + lon_delta)

    # ------------------------------------------------------------------
    # Regions
    # ------------------------------------------------------------------

    @staticmethod
    def create_region(db: Session, creator_id: str, name: str,
                      display_name: str = '', description: str = '',
                      region_type: str = 'thematic',
                      lat: Optional[float] = None,
                      lon: Optional[float] = None,
                      radius_km: Optional[float] = None) -> Dict:
        """Create a region and make the creator its first member (steward).

        ``display_name`` falls back to ``name`` when empty.  If the creator
        has no primary region yet, the new region becomes it.

        Returns:
            ``region.to_dict()`` for the newly created region.
        """
        region = Region(
            name=name,
            display_name=display_name or name,
            description=description,
            region_type=region_type,
            lat=lat, lon=lon, radius_km=radius_km,
            member_count=1,  # the creator
        )
        db.add(region)
        db.flush()  # flushed before region.id is used below

        # Creator becomes steward
        db.add(RegionMembership(
            user_id=creator_id,
            region_id=region.id,
            role='steward',
            contribution_score=0.0,
        ))
        db.flush()

        # Update user's region_id only if they have no primary region yet
        user = db.query(User).filter_by(id=creator_id).first()
        if user and not user.region_id:
            user.region_id = region.id

        return region.to_dict()

    @staticmethod
    def get_region(db: Session, region_id: str) -> Optional[Dict]:
        """Return ``region.to_dict()`` for *region_id*, or ``None`` if absent."""
        region = db.query(Region).filter_by(id=region_id).first()
        return region.to_dict() if region else None

    @staticmethod
    def list_regions(db: Session, limit: int = 50, offset: int = 0,
                     region_type: Optional[str] = None) -> List[Dict]:
        """List regions ordered by ``member_count`` descending.

        Args:
            limit / offset: pagination window.
            region_type: when truthy, only regions of this type are returned.
        """
        q = db.query(Region)
        if region_type:
            q = q.filter_by(region_type=region_type)
        regions = (
            q.order_by(desc(Region.member_count))
            .offset(offset)
            .limit(limit)
            .all()
        )
        return [r.to_dict() for r in regions]

    # ------------------------------------------------------------------
    # Membership
    # ------------------------------------------------------------------

    @staticmethod
    def join_region(db: Session, user_id: str, region_id: str) -> Dict:
        """Join a region as a plain ``member``.

        Returns:
            ``{'already_member': True, 'role': ...}`` if a membership exists,
            ``{'joined': False, 'reason': 'Region not found'}`` if the region
            does not exist, otherwise ``{'joined': True, 'role': 'member'}``.
        """
        existing = db.query(RegionMembership).filter_by(
            user_id=user_id, region_id=region_id
        ).first()
        if existing:
            return {'already_member': True, 'role': existing.role}

        # Verify the region first so no membership row is created for a
        # region that does not exist.
        region = db.query(Region).filter_by(id=region_id).first()
        if not region:
            return {'joined': False, 'reason': 'Region not found'}

        db.add(RegionMembership(
            user_id=user_id,
            region_id=region_id,
            role='member',
        ))
        region.member_count = (region.member_count or 0) + 1

        # Set as user's primary region if they don't have one
        user = db.query(User).filter_by(id=user_id).first()
        if user and not user.region_id:
            user.region_id = region_id

        db.flush()
        return {'joined': True, 'role': 'member'}

    @staticmethod
    def leave_region(db: Session, user_id: str, region_id: str) -> bool:
        """Remove the user's membership in a region.

        Decrements the region's ``member_count`` (never below 0) and clears
        the user's primary region if it was this one.

        Returns:
            ``False`` if the user was not a member, ``True`` otherwise.
        """
        membership = db.query(RegionMembership).filter_by(
            user_id=user_id, region_id=region_id
        ).first()
        if not membership:
            return False

        db.delete(membership)

        region = db.query(Region).filter_by(id=region_id).first()
        if region:
            region.member_count = max(0, (region.member_count or 0) - 1)

        user = db.query(User).filter_by(id=user_id).first()
        if user and user.region_id == region_id:
            user.region_id = None

        # Flush like create_region / join_region do.
        db.flush()
        return True

    @staticmethod
    def get_members(db: Session, region_id: str,
                    limit: int = 50, offset: int = 0) -> List[Dict]:
        """List a region's members, highest ``contribution_score`` first.

        Each entry carries the user's id, username, display name and avatar
        URL plus the membership's role, contribution score and
        ``promoted_at`` (ISO-8601 string, or ``None`` if never promoted).
        """
        rows = db.query(RegionMembership, User).join(
            User, User.id == RegionMembership.user_id
        ).filter(
            RegionMembership.region_id == region_id
        ).order_by(
            desc(RegionMembership.contribution_score)
        ).offset(offset).limit(limit).all()

        return [
            {
                'user_id': user.id,
                'username': user.username,
                'display_name': user.display_name,
                'avatar_url': user.avatar_url,
                'role': membership.role,
                'contribution_score': membership.contribution_score,
                'promoted_at': (membership.promoted_at.isoformat()
                                if membership.promoted_at else None),
            }
            for membership, user in rows
        ]

    # ------------------------------------------------------------------
    # Feeds and leaderboards
    # ------------------------------------------------------------------

    @staticmethod
    def get_regional_feed(db: Session, region_id: str,
                          limit: int = 25, offset: int = 0) -> List[Dict]:
        """Return non-removed posts of a region, newest first."""
        posts = db.query(Post).filter_by(
            region_id=region_id, is_removed=False
        ).order_by(
            desc(Post.created_at)
        ).offset(offset).limit(limit).all()
        return [p.to_dict(include_author=True) for p in posts]

    @staticmethod
    def get_regional_leaderboard(db: Session, region_id: str,
                                 currency: str = 'pulse',
                                 limit: int = 50, offset: int = 0) -> List[Dict]:
        """Delegate to ``ResonanceService.get_leaderboard`` for one region."""
        # Imported at call time, as in the original code.
        from .resonance_engine import ResonanceService
        return ResonanceService.get_leaderboard(
            db, currency=currency, limit=limit, offset=offset, region_id=region_id
        )

    # ------------------------------------------------------------------
    # Roles
    # ------------------------------------------------------------------

    @staticmethod
    def check_promotion_eligibility(db: Session, user_id: str,
                                    region_id: str) -> Dict:
        """Check whether a member meets the thresholds for the next role.

        The next role is the one directly above the current role in
        ``ROLE_ORDER``; its requirements come from ``ROLE_THRESHOLDS``.

        Returns:
            ``{'eligible': False, 'reason': ...}`` when the user is not a
            member or has no higher role available; otherwise a dict with
            ``eligible``, ``current_role``, ``next_role``, the per-criterion
            ``checks`` and a copy of the ``requirements``.
        """
        membership = db.query(RegionMembership).filter_by(
            user_id=user_id, region_id=region_id
        ).first()
        if not membership:
            return {'eligible': False, 'reason': 'Not a member'}

        wallet = db.query(ResonanceWallet).filter_by(user_id=user_id).first()
        user = db.query(User).filter_by(id=user_id).first()

        current_role = membership.role
        next_role = RegionService._NEXT_ROLE.get(current_role)
        if not next_role:
            return {'eligible': False, 'reason': 'Already at max role',
                    'role': current_role}

        reqs = ROLE_THRESHOLDS.get(next_role, {})

        # A missing created_at counts as "joined just now" (0 days).
        now = datetime.utcnow()
        days_member = (now - (membership.created_at or now)).days

        # Missing rows and NULL columns both count as 0 rather than raising
        # on a None-vs-number comparison.
        signal = (wallet.signal if wallet else 0) or 0
        post_count = (user.post_count if user else 0) or 0
        score = membership.contribution_score or 0

        checks = {
            'signal': signal >= reqs.get('signal', 0),
            'days': days_member >= reqs.get('days', 0),
        }
        if 'posts' in reqs:
            checks['posts'] = post_count >= reqs['posts']
        if 'score' in reqs:
            checks['score'] = score >= reqs['score']

        return {
            'eligible': all(checks.values()),
            'current_role': current_role,
            'next_role': next_role,
            'checks': checks,
            # Copy so callers cannot mutate the module-level thresholds.
            'requirements': dict(reqs),
        }

    @staticmethod
    def promote_member(db: Session, user_id: str, region_id: str,
                       new_role: str,
                       promoter_id: Optional[str] = None) -> Optional[Dict]:
        """Promote a member to a strictly higher role.

        A stored role that is not in ``ROLE_ORDER`` is treated as ranking
        below ``'member'`` instead of raising.

        Args:
            promoter_id: optional id of the acting user; only logged.

        Returns:
            ``{'user_id': ..., 'new_role': ...}`` on success; ``None`` if the
            user is not a member, *new_role* is unknown, or *new_role* is not
            higher than the current role.
        """
        membership = db.query(RegionMembership).filter_by(
            user_id=user_id, region_id=region_id
        ).first()
        if not membership:
            return None

        new_rank = RegionService._role_rank(new_role)
        if new_rank < 0:
            return None
        if new_rank <= RegionService._role_rank(membership.role):
            return None

        membership.role = new_role
        membership.promoted_at = datetime.utcnow()
        logger.info("Promoted user %s to %s in region %s (promoter=%s)",
                    user_id, new_role, region_id, promoter_id)
        return {'user_id': user_id, 'new_role': new_role}

    @staticmethod
    def demote_member(db: Session, user_id: str, region_id: str,
                      new_role: str) -> Optional[Dict]:
        """Demote a member to a strictly lower role.

        Returns:
            ``{'user_id': ..., 'new_role': ...}`` on success; ``None`` if the
            user is not a member, *new_role* is unknown, or *new_role* is not
            lower than the current role (so this method cannot be used to
            raise a role).
        """
        membership = db.query(RegionMembership).filter_by(
            user_id=user_id, region_id=region_id
        ).first()
        if not membership:
            return None

        new_rank = RegionService._role_rank(new_role)
        if new_rank < 0:
            return None
        if new_rank >= RegionService._role_rank(membership.role):
            return None

        membership.role = new_role
        return {'user_id': user_id, 'new_role': new_role}

    # ------------------------------------------------------------------
    # Geography and governance
    # ------------------------------------------------------------------

    @staticmethod
    def nearby_regions(db: Session, lat: float, lon: float,
                       radius_km: float = 50.0) -> List[Dict]:
        """Find regions whose centre lies in a bounding box around a point.

        The box is computed by ``_bounding_box``: it is a square-ish
        approximation of the radius, with the longitude span corrected for
        latitude.  Regions without coordinates are excluded.
        """
        lat_min, lat_max, lon_min, lon_max = RegionService._bounding_box(
            lat, lon, radius_km
        )
        regions = db.query(Region).filter(
            Region.lat.isnot(None),
            Region.lon.isnot(None),
            Region.lat.between(lat_min, lat_max),
            Region.lon.between(lon_min, lon_max),
        ).all()
        return [r.to_dict() for r in regions]

    @staticmethod
    def get_governance_info(db: Session, region_id: str) -> Dict:
        """Summarise a region's governance.

        Returns:
            ``{}`` if the region does not exist; otherwise a dict with the
            region id, its ``member_count``, the number of memberships per
            role, the council (moderators, admins and stewards) and a copy of
            the role thresholds.
        """
        region = db.query(Region).filter_by(id=region_id).first()
        if not region:
            return {}

        # Count by role
        role_counts = db.query(
            RegionMembership.role, func.count(RegionMembership.id)
        ).filter_by(region_id=region_id).group_by(RegionMembership.role).all()
        roles = dict(role_counts)

        # Get council (moderators and above)
        council = db.query(RegionMembership, User).join(
            User, User.id == RegionMembership.user_id
        ).filter(
            RegionMembership.region_id == region_id,
            RegionMembership.role.in_(list(RegionService._COUNCIL_ROLES)),
        ).all()

        council_list = [{
            'user_id': u.id,
            'username': u.username,
            'display_name': u.display_name,
            'role': m.role,
            'contribution_score': m.contribution_score,
        } for m, u in council]

        return {
            'region_id': region_id,
            'member_count': region.member_count,
            'role_distribution': roles,
            'council': council_list,
            # Copy so callers cannot mutate the module-level thresholds.
            'thresholds': {role: dict(req)
                           for role, req in ROLE_THRESHOLDS.items()},
        }