Coverage for integrations / social / region_service.py: 26.1%

134 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial - Region Service 

3Regional governance, membership, auto-promotion, feeds, leaderboards. 

4""" 

5import logging 

6from datetime import datetime, timedelta 

7from typing import Optional, Dict, List 

8 

9from sqlalchemy import desc, func 

10from sqlalchemy.orm import Session 

11 

12from .models import ( 

13 User, Post, Region, RegionMembership, ResonanceWallet, 

14) 

15 

16logger = logging.getLogger('hevolve_social') 

17 

18# Role progression thresholds 

19ROLE_THRESHOLDS = { 

20 'contributor': {'signal': 1.0, 'posts': 5, 'days': 7}, 

21 'moderator': {'signal': 5.0, 'score': 100, 'days': 30}, 

22 'admin': {'signal': 15.0, 'score': 500, 'days': 90}, 

23 'steward': {'signal': 50.0, 'score': 2000, 'days': 180}, 

24} 

25 

26 

27class RegionService: 

28 

29 @staticmethod 

30 def create_region(db: Session, creator_id: str, name: str, 

31 display_name: str = '', description: str = '', 

32 region_type: str = 'thematic', 

33 lat: float = None, lon: float = None, 

34 radius_km: float = None) -> Dict: 

35 """Create a new region.""" 

36 region = Region( 

37 name=name, 

38 display_name=display_name or name, 

39 description=description, 

40 region_type=region_type, 

41 lat=lat, lon=lon, radius_km=radius_km, 

42 member_count=1, 

43 ) 

44 db.add(region) 

45 db.flush() 

46 

47 # Creator becomes steward 

48 membership = RegionMembership( 

49 user_id=creator_id, 

50 region_id=region.id, 

51 role='steward', 

52 contribution_score=0.0, 

53 ) 

54 db.add(membership) 

55 db.flush() 

56 

57 # Update user's region_id 

58 user = db.query(User).filter_by(id=creator_id).first() 

59 if user and not user.region_id: 

60 user.region_id = region.id 

61 

62 return region.to_dict() 

63 

64 @staticmethod 

65 def get_region(db: Session, region_id: str) -> Optional[Dict]: 

66 region = db.query(Region).filter_by(id=region_id).first() 

67 return region.to_dict() if region else None 

68 

69 @staticmethod 

70 def list_regions(db: Session, limit: int = 50, offset: int = 0, 

71 region_type: str = None) -> List[Dict]: 

72 q = db.query(Region) 

73 if region_type: 

74 q = q.filter_by(region_type=region_type) 

75 regions = q.order_by(desc(Region.member_count)).offset(offset).limit(limit).all() 

76 return [r.to_dict() for r in regions] 

77 

78 @staticmethod 

79 def join_region(db: Session, user_id: str, region_id: str) -> Dict: 

80 """Join a region as a member.""" 

81 existing = db.query(RegionMembership).filter_by( 

82 user_id=user_id, region_id=region_id 

83 ).first() 

84 if existing: 

85 return {'already_member': True, 'role': existing.role} 

86 

87 membership = RegionMembership( 

88 user_id=user_id, 

89 region_id=region_id, 

90 role='member', 

91 ) 

92 db.add(membership) 

93 

94 region = db.query(Region).filter_by(id=region_id).first() 

95 if region: 

96 region.member_count = (region.member_count or 0) + 1 

97 

98 # Set as user's primary region if they don't have one 

99 user = db.query(User).filter_by(id=user_id).first() 

100 if user and not user.region_id: 

101 user.region_id = region_id 

102 

103 db.flush() 

104 return {'joined': True, 'role': 'member'} 

105 

106 @staticmethod 

107 def leave_region(db: Session, user_id: str, region_id: str) -> bool: 

108 membership = db.query(RegionMembership).filter_by( 

109 user_id=user_id, region_id=region_id 

110 ).first() 

111 if not membership: 

112 return False 

113 

114 db.delete(membership) 

115 region = db.query(Region).filter_by(id=region_id).first() 

116 if region: 

117 region.member_count = max(0, (region.member_count or 0) - 1) 

118 

119 user = db.query(User).filter_by(id=user_id).first() 

120 if user and user.region_id == region_id: 

121 user.region_id = None 

122 

123 return True 

124 

125 @staticmethod 

126 def get_members(db: Session, region_id: str, 

127 limit: int = 50, offset: int = 0) -> List[Dict]: 

128 rows = db.query(RegionMembership, User).join( 

129 User, User.id == RegionMembership.user_id 

130 ).filter( 

131 RegionMembership.region_id == region_id 

132 ).order_by( 

133 desc(RegionMembership.contribution_score) 

134 ).offset(offset).limit(limit).all() 

135 

136 result = [] 

137 for membership, user in rows: 

138 result.append({ 

139 'user_id': user.id, 

140 'username': user.username, 

141 'display_name': user.display_name, 

142 'avatar_url': user.avatar_url, 

143 'role': membership.role, 

144 'contribution_score': membership.contribution_score, 

145 'promoted_at': membership.promoted_at.isoformat() if membership.promoted_at else None, 

146 }) 

147 return result 

148 

149 @staticmethod 

150 def get_regional_feed(db: Session, region_id: str, 

151 limit: int = 25, offset: int = 0) -> List[Dict]: 

152 """Get posts from a specific region.""" 

153 posts = db.query(Post).filter_by( 

154 region_id=region_id, is_removed=False 

155 ).order_by( 

156 desc(Post.created_at) 

157 ).offset(offset).limit(limit).all() 

158 return [p.to_dict(include_author=True) for p in posts] 

159 

160 @staticmethod 

161 def get_regional_leaderboard(db: Session, region_id: str, 

162 currency: str = 'pulse', 

163 limit: int = 50, offset: int = 0) -> List[Dict]: 

164 from .resonance_engine import ResonanceService 

165 return ResonanceService.get_leaderboard( 

166 db, currency=currency, limit=limit, offset=offset, region_id=region_id 

167 ) 

168 

169 @staticmethod 

170 def check_promotion_eligibility(db: Session, user_id: str, 

171 region_id: str) -> Dict: 

172 """Check if a user is eligible for promotion in a region.""" 

173 membership = db.query(RegionMembership).filter_by( 

174 user_id=user_id, region_id=region_id 

175 ).first() 

176 if not membership: 

177 return {'eligible': False, 'reason': 'Not a member'} 

178 

179 wallet = db.query(ResonanceWallet).filter_by(user_id=user_id).first() 

180 user = db.query(User).filter_by(id=user_id).first() 

181 

182 current_role = membership.role 

183 next_role_map = { 

184 'member': 'contributor', 

185 'contributor': 'moderator', 

186 'moderator': 'admin', 

187 'admin': 'steward', 

188 } 

189 next_role = next_role_map.get(current_role) 

190 if not next_role: 

191 return {'eligible': False, 'reason': 'Already at max role', 'role': current_role} 

192 

193 reqs = ROLE_THRESHOLDS.get(next_role, {}) 

194 days_member = (datetime.utcnow() - (membership.created_at or datetime.utcnow())).days 

195 

196 checks = { 

197 'signal': (wallet.signal if wallet else 0) >= reqs.get('signal', 0), 

198 'days': days_member >= reqs.get('days', 0), 

199 } 

200 if 'posts' in reqs: 

201 checks['posts'] = (user.post_count if user else 0) >= reqs['posts'] 

202 if 'score' in reqs: 

203 checks['score'] = (membership.contribution_score or 0) >= reqs['score'] 

204 

205 eligible = all(checks.values()) 

206 return { 

207 'eligible': eligible, 

208 'current_role': current_role, 

209 'next_role': next_role, 

210 'checks': checks, 

211 'requirements': reqs, 

212 } 

213 

214 @staticmethod 

215 def promote_member(db: Session, user_id: str, region_id: str, 

216 new_role: str, promoter_id: str = None) -> Optional[Dict]: 

217 """Promote a member to a higher role.""" 

218 membership = db.query(RegionMembership).filter_by( 

219 user_id=user_id, region_id=region_id 

220 ).first() 

221 if not membership: 

222 return None 

223 

224 role_order = ['member', 'contributor', 'moderator', 'admin', 'steward'] 

225 if new_role not in role_order: 

226 return None 

227 if role_order.index(new_role) <= role_order.index(membership.role): 

228 return None 

229 

230 membership.role = new_role 

231 membership.promoted_at = datetime.utcnow() 

232 return {'user_id': user_id, 'new_role': new_role} 

233 

234 @staticmethod 

235 def demote_member(db: Session, user_id: str, region_id: str, 

236 new_role: str) -> Optional[Dict]: 

237 """Demote a member to a lower role.""" 

238 membership = db.query(RegionMembership).filter_by( 

239 user_id=user_id, region_id=region_id 

240 ).first() 

241 if not membership: 

242 return None 

243 

244 role_order = ['member', 'contributor', 'moderator', 'admin', 'steward'] 

245 if new_role not in role_order: 

246 return None 

247 

248 membership.role = new_role 

249 return {'user_id': user_id, 'new_role': new_role} 

250 

251 @staticmethod 

252 def nearby_regions(db: Session, lat: float, lon: float, 

253 radius_km: float = 50.0) -> List[Dict]: 

254 """Find regions near a geographic point.""" 

255 deg = radius_km / 111.0 # rough degrees per km 

256 regions = db.query(Region).filter( 

257 Region.lat.isnot(None), 

258 Region.lon.isnot(None), 

259 Region.lat.between(lat - deg, lat + deg), 

260 Region.lon.between(lon - deg, lon + deg), 

261 ).all() 

262 return [r.to_dict() for r in regions] 

263 

264 @staticmethod 

265 def get_governance_info(db: Session, region_id: str) -> Dict: 

266 """Get governance information for a region.""" 

267 region = db.query(Region).filter_by(id=region_id).first() 

268 if not region: 

269 return {} 

270 

271 # Count by role 

272 role_counts = db.query( 

273 RegionMembership.role, func.count(RegionMembership.id) 

274 ).filter_by(region_id=region_id).group_by(RegionMembership.role).all() 

275 

276 roles = {role: count for role, count in role_counts} 

277 

278 # Get council (moderators and above) 

279 council = db.query(RegionMembership, User).join( 

280 User, User.id == RegionMembership.user_id 

281 ).filter( 

282 RegionMembership.region_id == region_id, 

283 RegionMembership.role.in_(['moderator', 'admin', 'steward']), 

284 ).all() 

285 

286 council_list = [{ 

287 'user_id': u.id, 

288 'username': u.username, 

289 'display_name': u.display_name, 

290 'role': m.role, 

291 'contribution_score': m.contribution_score, 

292 } for m, u in council] 

293 

294 return { 

295 'region_id': region_id, 

296 'member_count': region.member_count, 

297 'role_distribution': roles, 

298 'council': council_list, 

299 'thresholds': ROLE_THRESHOLDS, 

300 }