Coverage for integrations / social / services.py: 64.4%

447 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial - Service Layer 

3All business logic for posts, comments, votes, users, communities, follows, notifications. 

4""" 

5import re 

6import uuid 

7import logging 

8import threading 

9from datetime import datetime, timedelta 

10from typing import Optional, List, Tuple 

11 

12logger = logging.getLogger('hevolve_social') 

13 

14from sqlalchemy import desc, asc, func, event 

15from sqlalchemy.orm import Session, joinedload 

16 

17from .models import ( 

18 User, Post, Comment, Vote, Follow, Community, CommunityMembership, 

19 Notification, Report, TaskRequest, RecipeShare, AgentSkillBadge 

20) 

21from .auth import hash_password, verify_password, generate_api_token, generate_jwt 

22 

23 

24def _uuid(): 

25 return str(uuid.uuid4()) 

26 

27 

28# ─── User Service ─── 

29 

30class UserService: 

31 

32 @staticmethod 

33 def register(db: Session, username: str, password: str, email: str = None, 

34 display_name: str = None, user_type: str = 'human') -> User: 

35 if db.query(User).filter(User.username == username).first(): 

36 raise ValueError("Registration failed - username or email may already be in use") 

37 if email: 

38 # Basic email format validation 

39 if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email): 

40 raise ValueError("Invalid email address format") 

41 if db.query(User).filter(User.email == email).first(): 

42 raise ValueError("Registration failed - username or email may already be in use") 

43 

44 user = User( 

45 id=_uuid(), username=username, display_name=display_name or username, 

46 email=email, password_hash=hash_password(password), 

47 user_type=user_type, api_token=generate_api_token(), 

48 is_verified=True, 

49 ) 

50 db.add(user) 

51 db.flush() 

52 return user 

53 

54 @staticmethod 

55 def register_agent(db: Session, name: str, description: str = '', 

56 agent_id: str = None, owner_id: str = None, 

57 skip_name_validation: bool = False) -> User: 

58 name = name.strip().lower() 

59 

60 # Validate 3-word name format (skip for legacy/internal callers) 

61 if not skip_name_validation: 

62 from .agent_naming import validate_agent_name 

63 valid, error = validate_agent_name(name) 

64 if not valid: 

65 raise ValueError(error) 

66 

67 existing = db.query(User).filter(User.username == name).first() 

68 if existing: 

69 raise ValueError(f"Name '{name}' is already taken globally") 

70 

71 user = User( 

72 id=_uuid(), username=name, display_name=name, 

73 bio=description, user_type='agent', agent_id=agent_id, 

74 owner_id=owner_id, 

75 api_token=generate_api_token(), is_verified=True, 

76 ) 

77 db.add(user) 

78 db.flush() 

79 return user 

80 

81 @staticmethod 

82 def get_owned_agents(db: Session, owner_id: str) -> list: 

83 """Get all agents owned by a user.""" 

84 return db.query(User).filter( 

85 User.owner_id == owner_id, 

86 User.user_type == 'agent', 

87 ).all() 

88 

89 # Account lockout: track failed attempts per username 

90 _login_attempts = {} # username -> (count, first_attempt_at) 

91 _login_lock = threading.Lock() 

92 _MAX_ATTEMPTS = 5 

93 _LOCKOUT_MINUTES = 15 

94 _LOGIN_ATTEMPT_TTL = timedelta(minutes=30) 

95 

96 @staticmethod 

97 def _cleanup_login_attempts(): 

98 """Remove expired login attempt entries (called under _login_lock).""" 

99 now = datetime.utcnow() 

100 expired = [k for k, (_, first_at) in UserService._login_attempts.items() 

101 if now - first_at > UserService._LOGIN_ATTEMPT_TTL] 

102 for k in expired: 

103 del UserService._login_attempts[k] 

104 

105 @staticmethod 

106 def login(db: Session, username: str, password: str) -> Tuple[User, str]: 

107 # Periodically purge stale login attempt records 

108 with UserService._login_lock: 

109 UserService._cleanup_login_attempts() 

110 # Check lockout 

111 with UserService._login_lock: 

112 entry = UserService._login_attempts.get(username) 

113 if entry: 

114 count, first_at = entry 

115 elapsed = (datetime.utcnow() - first_at).total_seconds() 

116 if count >= UserService._MAX_ATTEMPTS and elapsed < UserService._LOCKOUT_MINUTES * 60: 

117 remaining = int(UserService._LOCKOUT_MINUTES - elapsed / 60) 

118 raise ValueError(f"Account temporarily locked. Try again in {remaining} minutes") 

119 if elapsed >= UserService._LOCKOUT_MINUTES * 60: 

120 del UserService._login_attempts[username] 

121 

122 user = db.query(User).filter(User.username == username).first() 

123 # Always run password verification to prevent timing-based user enumeration 

124 _dummy_hash = "0" * 32 + ":" + "0" * 64 

125 if not verify_password(password, user.password_hash if user else _dummy_hash): 

126 # Record failed attempt 

127 with UserService._login_lock: 

128 entry = UserService._login_attempts.get(username) 

129 if entry: 

130 UserService._login_attempts[username] = (entry[0] + 1, entry[1]) 

131 else: 

132 UserService._login_attempts[username] = (1, datetime.utcnow()) 

133 raise ValueError("Invalid username or password") 

134 if not user: 

135 raise ValueError("Invalid username or password") 

136 if user.is_banned: 

137 raise ValueError("Account is banned") 

138 # Clear lockout on successful login 

139 with UserService._login_lock: 

140 UserService._login_attempts.pop(username, None) 

141 token = generate_jwt(user.id, user.username, getattr(user, 'role', None) or 'flat') 

142 user.last_active_at = datetime.utcnow() 

143 db.flush() 

144 return user, token 

145 

146 @staticmethod 

147 def set_user_role(db: Session, user, role: str): 

148 """Set user role and sync legacy boolean flags.""" 

149 user.role = role 

150 user.is_admin = (role == 'central') 

151 user.is_moderator = (role in ('central', 'regional')) 

152 db.flush() 

153 

154 @staticmethod 

155 def get_by_id(db: Session, user_id: str) -> Optional[User]: 

156 return db.query(User).filter(User.id == user_id).first() 

157 

158 @staticmethod 

159 def get_by_username(db: Session, username: str) -> Optional[User]: 

160 return db.query(User).filter(User.username == username).first() 

161 

162 @staticmethod 

163 def list_users(db: Session, user_type: str = None, limit: int = 25, 

164 offset: int = 0) -> Tuple[List[User], int]: 

165 q = db.query(User).filter(User.is_banned == False) 

166 if user_type: 

167 q = q.filter(User.user_type == user_type) 

168 total = q.count() 

169 users = q.order_by(desc(User.karma_score)).offset(offset).limit(limit).all() 

170 return users, total 

171 

172 @staticmethod 

173 def set_handle(db: Session, user: User, handle: str) -> User: 

174 """Set a user's unique creator handle (used as suffix in global agent names).""" 

175 from .agent_naming import validate_handle, is_handle_available 

176 handle = handle.strip().lower() 

177 valid, error = validate_handle(handle) 

178 if not valid: 

179 raise ValueError(error) 

180 if not is_handle_available(db, handle): 

181 raise ValueError(f"Handle '{handle}' is already taken") 

182 user.handle = handle 

183 user.updated_at = datetime.utcnow() 

184 db.flush() 

185 return user 

186 

187 @staticmethod 

188 def register_agent_local(db: Session, local_name: str, description: str = '', 

189 agent_id: str = None, owner: User = None) -> User: 

190 """ 

191 Register an agent using a 2-word local name + owner's handle. 

192 The global username becomes: {local_name}.{handle} (e.g. swift.falcon.sathi). 

193 """ 

194 from .agent_naming import (validate_local_name, compose_global_name, 

195 check_global_availability) 

196 

197 if not owner: 

198 raise ValueError("Owner is required") 

199 if not owner.handle: 

200 raise ValueError("You need to set a handle before creating agents") 

201 

202 local_name = local_name.strip().lower() 

203 valid, error = validate_local_name(local_name) 

204 if not valid: 

205 raise ValueError(error) 

206 

207 # Check local uniqueness (same owner can't have two agents with same local name) 

208 existing_local = db.query(User).filter( 

209 User.owner_id == owner.id, 

210 User.local_name == local_name, 

211 ).first() 

212 if existing_local: 

213 raise ValueError(f"You already have an agent named '{local_name}'") 

214 

215 # Check global availability 

216 available, global_name, err = check_global_availability(db, local_name, owner.handle) 

217 if not available: 

218 raise ValueError( 

219 f"'{global_name}' is already taken globally. " 

220 f"Please choose a different name for your agent." 

221 ) 

222 

223 user = User( 

224 id=_uuid(), username=global_name, display_name=local_name, 

225 local_name=local_name, bio=description, user_type='agent', 

226 agent_id=agent_id, owner_id=owner.id, 

227 api_token=generate_api_token(), is_verified=True, 

228 ) 

229 db.add(user) 

230 db.flush() 

231 return user 

232 

233 @staticmethod 

234 def update_profile(db: Session, user: User, display_name: str = None, 

235 bio: str = None, avatar_url: str = None, 

236 handle: str = None) -> User: 

237 if display_name is not None: 

238 user.display_name = display_name 

239 if bio is not None: 

240 user.bio = bio 

241 if avatar_url is not None: 

242 user.avatar_url = avatar_url 

243 if handle is not None and user.user_type == 'human': 

244 UserService.set_handle(db, user, handle) 

245 user.updated_at = datetime.utcnow() 

246 db.flush() 

247 return user 

248 

249 

250# ─── Post Service ─── 

251 

252class PostService: 

253 

254 @staticmethod 

255 def create(db: Session, author: User, title: str, content: str = '', 

256 content_type: str = 'text', community_name: str = None, 

257 code_language: str = None, media_urls: list = None, 

258 link_url: str = None, source_channel: str = None, 

259 source_message_id: str = None, 

260 intent_category: str = None, hypothesis: str = None, 

261 expected_outcome: str = None, is_thought_experiment: bool = False, 

262 dynamic_layout: dict = None) -> Post: 

263 community_id = None 

264 if community_name: 

265 community = db.query(Community).filter(Community.name == community_name).first() 

266 if community: 

267 community_id = community.id 

268 community.post_count += 1 

269 

270 post = Post( 

271 id=_uuid(), author_id=author.id, community_id=community_id, 

272 title=title, content=content, content_type=content_type, 

273 code_language=code_language, media_urls=media_urls or [], 

274 link_url=link_url, source_channel=source_channel, 

275 source_message_id=source_message_id, 

276 intent_category=intent_category, hypothesis=hypothesis, 

277 expected_outcome=expected_outcome, 

278 is_thought_experiment=is_thought_experiment or False, 

279 dynamic_layout=dynamic_layout, 

280 ) 

281 db.add(post) 

282 author.post_count += 1 

283 db.flush() 

284 

285 # Resonance: award Spark + Signal + XP for creating a post 

286 try: 

287 from .resonance_engine import ResonanceService 

288 ResonanceService.award_action(db, author.id, 'create_post', post.id) 

289 except Exception: 

290 pass 

291 

292 try: 

293 from .gamification_service import GamificationService 

294 GamificationService.check_achievements(db, author.id) 

295 except Exception: 

296 pass 

297 

298 try: 

299 from .onboarding_service import OnboardingService 

300 OnboardingService.auto_advance(db, author.id, 'post') 

301 except Exception: 

302 pass 

303 

304 return post 

305 

306 @staticmethod 

307 def get_by_id(db: Session, post_id: str) -> Optional[Post]: 

308 return db.query(Post).options( 

309 joinedload(Post.author) 

310 ).filter(Post.id == post_id, Post.is_deleted == False).first() 

311 

312 @staticmethod 

313 def list_posts(db: Session, sort: str = 'new', community_name: str = None, 

314 author_id: str = None, limit: int = 25, offset: int = 0, 

315 viewer_user=None, apply_privacy: bool = False 

316 ) -> Tuple[List[Post], int]: 

317 """List posts. When `apply_privacy` is True, the privacy gate 

318 from integrations.social.privacy is AND'd into the query. 

319 

320 `apply_privacy` is opt-in (default False) so callers in flag-off 

321 deploys never load the privacy module or run the EXISTS 

322 subqueries. api.py reads g.feature_flags['post_privacy'] and 

323 passes it through. 

324 """ 

325 q = db.query(Post).options(joinedload(Post.author)).filter( 

326 Post.is_deleted == False, Post.is_hidden == False 

327 ) 

328 if community_name: 

329 community = db.query(Community).filter(Community.name == community_name).first() 

330 if community: 

331 q = q.filter(Post.community_id == community.id) 

332 if author_id: 

333 q = q.filter(Post.author_id == author_id) 

334 

335 if apply_privacy: 

336 from .privacy import visible_posts_filter 

337 q = q.filter(visible_posts_filter(viewer_user)) 

338 

339 if sort == 'top': 

340 q = q.order_by(desc(Post.score), desc(Post.created_at)) 

341 elif sort == 'hot': 

342 # Hot = score weighted by recency 

343 q = q.order_by(desc(Post.score + Post.comment_count), desc(Post.created_at)) 

344 elif sort == 'discussed': 

345 q = q.order_by(desc(Post.comment_count), desc(Post.created_at)) 

346 else: # 'new' 

347 q = q.order_by(desc(Post.created_at)) 

348 

349 total = q.count() 

350 posts = q.offset(offset).limit(limit).all() 

351 return posts, total 

352 

353 @staticmethod 

354 def update(db: Session, post: Post, title: str = None, content: str = None, 

355 intent_category: str = None, hypothesis: str = None, 

356 expected_outcome: str = None, is_thought_experiment: bool = None, 

357 dynamic_layout: dict = None) -> Post: 

358 if title is not None: 

359 post.title = title 

360 if content is not None: 

361 post.content = content 

362 if intent_category is not None: 

363 post.intent_category = intent_category 

364 if hypothesis is not None: 

365 post.hypothesis = hypothesis 

366 if expected_outcome is not None: 

367 post.expected_outcome = expected_outcome 

368 if is_thought_experiment is not None: 

369 post.is_thought_experiment = is_thought_experiment 

370 if dynamic_layout is not None: 

371 post.dynamic_layout = dynamic_layout 

372 post.updated_at = datetime.utcnow() 

373 db.flush() 

374 return post 

375 

376 @staticmethod 

377 def delete(db: Session, post: Post): 

378 post.is_deleted = True 

379 post.author.post_count = max(0, post.author.post_count - 1) 

380 if post.community_id: 

381 community = db.query(Community).filter_by(id=post.community_id).first() 

382 if community: 

383 community.post_count = max(0, (community.post_count or 0) - 1) 

384 db.flush() 

385 

386 @staticmethod 

387 def increment_view(db: Session, post: Post): 

388 post.view_count += 1 

389 db.flush() 

390 

391 

392# ─── Comment Service ─── 

393 

394class CommentService: 

395 

396 @staticmethod 

397 def create(db: Session, post: Post, author: User, content: str, 

398 parent_id: str = None) -> Comment: 

399 depth = 0 

400 if parent_id: 

401 parent = db.query(Comment).filter(Comment.id == parent_id).first() 

402 if parent: 

403 depth = parent.depth + 1 

404 

405 comment = Comment( 

406 id=_uuid(), post_id=post.id, author_id=author.id, 

407 parent_id=parent_id, content=content, depth=depth, 

408 ) 

409 db.add(comment) 

410 post.comment_count += 1 

411 author.comment_count += 1 

412 db.flush() 

413 

414 # Notify post author 

415 if post.author_id != author.id: 

416 NotificationService.create( 

417 db, post.author_id, 'comment', author.id, 'post', post.id, 

418 f"{author.display_name} commented on your post" 

419 ) 

420 # Notify parent comment author 

421 if parent_id: 

422 parent = db.query(Comment).filter(Comment.id == parent_id).first() 

423 if parent and parent.author_id != author.id: 

424 NotificationService.create( 

425 db, parent.author_id, 'reply', author.id, 'comment', parent.id, 

426 f"{author.display_name} replied to your comment" 

427 ) 

428 

429 # Resonance: award Spark + Signal + XP for creating a comment 

430 try: 

431 from .resonance_engine import ResonanceService 

432 ResonanceService.award_action(db, author.id, 'create_comment', comment.id) 

433 except Exception: 

434 pass 

435 

436 try: 

437 from .gamification_service import GamificationService 

438 GamificationService.check_achievements(db, author.id) 

439 except Exception: 

440 pass 

441 

442 try: 

443 from .onboarding_service import OnboardingService 

444 OnboardingService.auto_advance(db, author.id, 'comment') 

445 except Exception: 

446 pass 

447 

448 try: 

449 from .encounter_service import EncounterService 

450 # Record encounter between commenter and post author 

451 if post and post.author_id and post.author_id != author.id: 

452 EncounterService.record_encounter( 

453 db, author.id, post.author_id, 

454 'post', post.id) 

455 except Exception: 

456 pass 

457 

458 return comment 

459 

460 @staticmethod 

461 def get_by_post(db: Session, post_id: str, sort: str = 'new' 

462 ) -> List[Comment]: 

463 q = db.query(Comment).options(joinedload(Comment.author)).filter( 

464 Comment.post_id == post_id, Comment.is_deleted == False, 

465 Comment.is_hidden == False 

466 ) 

467 if sort == 'top': 

468 q = q.order_by(desc(Comment.score), desc(Comment.created_at)) 

469 else: 

470 q = q.order_by(asc(Comment.created_at)) 

471 return q.all() 

472 

473 @staticmethod 

474 def delete(db: Session, comment: Comment): 

475 comment.is_deleted = True 

476 comment.content = '[deleted]' 

477 db.flush() 

478 

479 

480# ─── Vote Service ─── 

481 

482# Note: with_for_update() is a no-op on SQLite (no SELECT ... FOR UPDATE support). 

483# Use Python-level lock as SQLite workaround for concurrent vote operations. 

484# The with_for_update() calls are kept for PostgreSQL compatibility. 

485_vote_lock = threading.Lock() 

486 

487 

488class VoteService: 

489 

490 @staticmethod 

491 def vote(db: Session, user: User, target_type: str, target_id: str, 

492 value: int) -> dict: 

493 """Cast or change a vote. value: +1 (upvote) or -1 (downvote).""" 

494 with _vote_lock: 

495 return VoteService._vote_inner(db, user, target_type, target_id, value) 

496 

497 @staticmethod 

498 def _vote_inner(db: Session, user: User, target_type: str, target_id: str, 

499 value: int) -> dict: 

500 """Inner vote logic, must be called under _vote_lock.""" 

501 existing = db.query(Vote).filter( 

502 Vote.user_id == user.id, Vote.target_type == target_type, 

503 Vote.target_id == target_id 

504 ).first() 

505 

506 # Get target object (with_for_update prevents concurrent vote race on PostgreSQL; 

507 # no-op on SQLite — Python _vote_lock provides safety there) 

508 if target_type == 'post': 

509 target = db.query(Post).filter(Post.id == target_id).with_for_update().first() 

510 else: 

511 target = db.query(Comment).filter(Comment.id == target_id).with_for_update().first() 

512 if not target: 

513 raise ValueError(f"{target_type} not found") 

514 

515 if existing: 

516 if existing.value == value: 

517 # Remove vote (toggle off) 

518 if value == 1: 

519 target.upvotes -= 1 

520 else: 

521 target.downvotes -= 1 

522 target.score = target.upvotes - target.downvotes 

523 db.delete(existing) 

524 db.flush() 

525 return {'action': 'removed', 'score': target.score} 

526 else: 

527 # Change vote direction 

528 if existing.value == 1: 

529 target.upvotes -= 1 

530 else: 

531 target.downvotes -= 1 

532 existing.value = value 

533 if value == 1: 

534 target.upvotes += 1 

535 else: 

536 target.downvotes += 1 

537 target.score = target.upvotes - target.downvotes 

538 db.flush() 

539 return {'action': 'changed', 'score': target.score} 

540 else: 

541 vote = Vote(id=_uuid(), user_id=user.id, target_type=target_type, 

542 target_id=target_id, value=value) 

543 db.add(vote) 

544 if value == 1: 

545 target.upvotes += 1 

546 else: 

547 target.downvotes += 1 

548 target.score = target.upvotes - target.downvotes 

549 db.flush() 

550 

551 # Notify author on upvote 

552 if value == 1: 

553 author_id = target.author_id 

554 if author_id != user.id: 

555 NotificationService.create( 

556 db, author_id, 'upvote', user.id, target_type, target_id, 

557 f"{user.display_name} upvoted your {target_type}" 

558 ) 

559 # Resonance: award Pulse to author for receiving upvote 

560 try: 

561 from .resonance_engine import ResonanceService 

562 ResonanceService.award_action(db, author_id, 'post_upvote', target_id) 

563 except Exception: 

564 pass 

565 

566 try: 

567 from .gamification_service import GamificationService 

568 GamificationService.check_achievements(db, user.id) 

569 except Exception: 

570 pass 

571 

572 try: 

573 from .onboarding_service import OnboardingService 

574 OnboardingService.auto_advance(db, user.id, 'vote') 

575 except Exception: 

576 pass 

577 

578 return {'action': 'voted', 'score': target.score} 

579 

580 @staticmethod 

581 def remove_vote(db: Session, user: User, target_type: str, target_id: str): 

582 existing = db.query(Vote).filter( 

583 Vote.user_id == user.id, Vote.target_type == target_type, 

584 Vote.target_id == target_id 

585 ).first() 

586 if existing: 

587 if target_type == 'post': 

588 target = db.query(Post).filter(Post.id == target_id).first() 

589 else: 

590 target = db.query(Comment).filter(Comment.id == target_id).first() 

591 if target: 

592 if existing.value == 1: 

593 target.upvotes -= 1 

594 else: 

595 target.downvotes -= 1 

596 target.score = target.upvotes - target.downvotes 

597 db.delete(existing) 

598 db.flush() 

599 

600 @staticmethod 

601 def get_voters(db: Session, target_type: str, target_id: str) -> List[dict]: 

602 """Get list of users who voted (for RN compatibility: like_bypost).""" 

603 votes = db.query(Vote, User).join(User, Vote.user_id == User.id).filter( 

604 Vote.target_type == target_type, Vote.target_id == target_id, 

605 Vote.value == 1 

606 ).all() 

607 return [{'user_id': u.id, 'name': u.display_name, 

608 'profilePic': u.avatar_url} for v, u in votes] 

609 

610 

611# ─── Follow Service ─── 

612 

613class FollowService: 

614 

615 @staticmethod 

616 def follow(db: Session, follower: User, following_id: str) -> bool: 

617 if follower.id == following_id: 

618 raise ValueError("Cannot follow yourself") 

619 target = db.query(User).filter(User.id == following_id).first() 

620 if not target: 

621 raise ValueError("User not found") 

622 

623 existing = db.query(Follow).filter( 

624 Follow.follower_id == follower.id, Follow.following_id == following_id 

625 ).first() 

626 if existing: 

627 return False # already following 

628 

629 follow = Follow(id=_uuid(), follower_id=follower.id, following_id=following_id) 

630 db.add(follow) 

631 db.flush() 

632 NotificationService.create( 

633 db, following_id, 'follow', follower.id, 'profile', follower.id, 

634 f"{follower.display_name} started following you" 

635 ) 

636 return True 

637 

638 @staticmethod 

639 def unfollow(db: Session, follower: User, following_id: str): 

640 existing = db.query(Follow).filter( 

641 Follow.follower_id == follower.id, Follow.following_id == following_id 

642 ).first() 

643 if existing: 

644 db.delete(existing) 

645 db.flush() 

646 

647 @staticmethod 

648 def get_followers(db: Session, user_id: str, limit: int = 50, offset: int = 0 

649 ) -> Tuple[List[User], int]: 

650 q = db.query(User).join(Follow, Follow.follower_id == User.id).filter( 

651 Follow.following_id == user_id) 

652 total = q.count() 

653 users = q.offset(offset).limit(limit).all() 

654 return users, total 

655 

656 @staticmethod 

657 def get_following(db: Session, user_id: str, limit: int = 50, offset: int = 0 

658 ) -> Tuple[List[User], int]: 

659 q = db.query(User).join(Follow, Follow.following_id == User.id).filter( 

660 Follow.follower_id == user_id) 

661 total = q.count() 

662 users = q.offset(offset).limit(limit).all() 

663 return users, total 

664 

665 @staticmethod 

666 def is_following(db: Session, follower_id: str, following_id: str) -> bool: 

667 return db.query(Follow).filter( 

668 Follow.follower_id == follower_id, Follow.following_id == following_id 

669 ).first() is not None 

670 

671 

672# ─── Community Service ─── 

673 

674class CommunityService: 

675 

676 @staticmethod 

677 def create(db: Session, creator: User, name: str, display_name: str = '', 

678 description: str = '', rules: str = '', is_private: bool = False) -> Community: 

679 if db.query(Community).filter(Community.name == name).first(): 

680 raise ValueError(f"Community '{name}' already exists") 

681 

682 community = Community( 

683 id=_uuid(), name=name, display_name=display_name or name, 

684 description=description, rules=rules, creator_id=creator.id, 

685 is_private=is_private, member_count=1, 

686 ) 

687 db.add(community) 

688 db.flush() 

689 # Auto-join creator as admin 

690 membership = CommunityMembership( 

691 id=_uuid(), user_id=creator.id, community_id=community.id, role='admin') 

692 db.add(membership) 

693 db.flush() 

694 return community 

695 

696 @staticmethod 

697 def get_by_name(db: Session, name: str) -> Optional[Community]: 

698 return db.query(Community).filter(Community.name == name).first() 

699 

700 @staticmethod 

701 def list_communities(db: Session, limit: int = 50, offset: int = 0 

702 ) -> Tuple[List[Community], int]: 

703 q = db.query(Community).order_by(desc(Community.member_count)) 

704 total = q.count() 

705 communities = q.offset(offset).limit(limit).all() 

706 return communities, total 

707 

708 @staticmethod 

709 def join(db: Session, user: User, community: Community) -> bool: 

710 """Join a community. Pass-4 P4-5 fix: dual-writes into BOTH 

711 the legacy `community_memberships` table AND the polymorphic 

712 v41 `memberships` table so downstream features that query 

713 `memberships` (e.g., CallService._is_parent_member, the 

714 post-privacy `community` arm) recognise the user as a 

715 member without depending on the v41 fallback branch. 

716 

717 Idempotent: if a row already exists in `community_memberships`, 

718 return False (no change). The polymorphic memberships INSERT 

719 is also idempotent via UNIQUE(parent_kind, parent_id, member_id). 

720 """ 

721 existing = db.query(CommunityMembership).filter( 

722 CommunityMembership.user_id == user.id, 

723 CommunityMembership.community_id == community.id 

724 ).first() 

725 if existing: 

726 return False 

727 membership = CommunityMembership( 

728 id=_uuid(), user_id=user.id, community_id=community.id) 

729 db.add(membership) 

730 community.member_count += 1 

731 # Polymorphic dual-write — best-effort; if it fails (table 

732 # missing on some pre-v41 deploy, integrity constraint), 

733 # legacy row still wins so the join itself succeeds. 

734 try: 

735 from sqlalchemy import text 

736 db.execute(text( 

737 "INSERT INTO memberships " 

738 "(id, parent_kind, parent_id, member_id, agent_kind, role) " 

739 "VALUES (:id, 'community', :pid, :mid, :ak, 'member')"), 

740 {'id': _uuid(), 'pid': community.id, 

741 'mid': user.id, 

742 'ak': 'agent' if user.user_type == 'agent' else 'human'}) 

743 except Exception as e: 

744 logger.debug( 

745 "CommunityService.join polymorphic dual-write skipped: %s", e) 

746 db.flush() 

747 return True 

748 

749 @staticmethod 

750 def leave(db: Session, user: User, community: Community): 

751 """Leave a community. Dual-deletes from BOTH legacy and 

752 polymorphic memberships so the v41 fallback path doesn't 

753 keep stale state. 

754 """ 

755 existing = db.query(CommunityMembership).filter( 

756 CommunityMembership.user_id == user.id, 

757 CommunityMembership.community_id == community.id 

758 ).first() 

759 if existing: 

760 db.delete(existing) 

761 community.member_count = max(0, community.member_count - 1) 

762 try: 

763 from sqlalchemy import text 

764 db.execute(text( 

765 "DELETE FROM memberships " 

766 "WHERE parent_kind = 'community' " 

767 "AND parent_id = :pid AND member_id = :mid"), 

768 {'pid': community.id, 'mid': user.id}) 

769 except Exception as e: 

770 logger.debug( 

771 "CommunityService.leave polymorphic delete skipped: %s", 

772 e) 

773 db.flush() 

774 

775 @staticmethod 

776 def get_members(db: Session, community_id: str, limit: int = 50, offset: int = 0 

777 ) -> Tuple[List[dict], int]: 

778 q = db.query(CommunityMembership, User).join( 

779 User, CommunityMembership.user_id == User.id 

780 ).filter(CommunityMembership.community_id == community_id) 

781 total = q.count() 

782 results = q.offset(offset).limit(limit).all() 

783 return [{'user': u.to_dict(), 'role': m.role} for m, u in results], total 

784 

785 @staticmethod 

786 def get_user_role(db: Session, user_id: str, community_id: str) -> Optional[str]: 

787 m = db.query(CommunityMembership).filter( 

788 CommunityMembership.user_id == user_id, 

789 CommunityMembership.community_id == community_id 

790 ).first() 

791 return m.role if m else None 

792 

793 

794# ─── Notification Service ─── 

795 

796class NotificationService: 

797 

798 @staticmethod 

799 def create(db: Session, user_id: str, type: str, source_user_id: str = None, 

800 target_type: str = None, target_id: str = None, message: str = ''): 

801 notif = Notification( 

802 id=_uuid(), user_id=user_id, type=type, 

803 source_user_id=source_user_id, target_type=target_type, 

804 target_id=target_id, message=message, 

805 ) 

806 db.add(notif) 

807 db.flush() 

808 # Push to SSE + WAMP in real-time AFTER commit (fire-and-forget) 

809 # Defer notification to after_commit to ensure data consistency 

810 notif_dict = notif.to_dict() 

811 _uid = user_id 

812 def _push_after_commit(session): 

813 try: 

814 from .realtime import on_notification 

815 on_notification(_uid, notif_dict) 

816 except Exception: 

817 pass 

818 event.listen(db, 'after_commit', _push_after_commit, once=True) 

819 return notif 

820 

821 @staticmethod 

822 def get_for_user(db: Session, user_id: str, unread_only: bool = False, 

823 limit: int = 50, offset: int = 0) -> Tuple[List[Notification], int]: 

824 q = db.query(Notification).filter(Notification.user_id == user_id) 

825 if unread_only: 

826 q = q.filter(Notification.is_read == False) 

827 total = q.count() 

828 notifs = q.order_by(desc(Notification.created_at)).offset(offset).limit(limit).all() 

829 return notifs, total 

830 

831 @staticmethod 

832 def mark_read(db: Session, notification_ids: List[str], user_id: str): 

833 db.query(Notification).filter( 

834 Notification.id.in_(notification_ids), Notification.user_id == user_id 

835 ).update({Notification.is_read: True}, synchronize_session=False) 

836 db.flush() 

837 

838 @staticmethod 

839 def mark_all_read(db: Session, user_id: str): 

840 db.query(Notification).filter( 

841 Notification.user_id == user_id, Notification.is_read == False 

842 ).update({Notification.is_read: True}, synchronize_session=False) 

843 db.flush() 

844 

845 

846# ─── Report Service ─── 

847 

848class ReportService: 

849 

850 @staticmethod 

851 def create(db: Session, reporter: User, target_type: str, target_id: str, 

852 reason: str, details: str = '') -> Report: 

853 report = Report( 

854 id=_uuid(), reporter_id=reporter.id, target_type=target_type, 

855 target_id=target_id, reason=reason, details=details, 

856 ) 

857 db.add(report) 

858 db.flush() 

859 return report 

860 

861 @staticmethod 

862 def list_reports(db: Session, status: str = None, limit: int = 50, offset: int = 0 

863 ) -> Tuple[List[Report], int]: 

864 q = db.query(Report) 

865 if status: 

866 q = q.filter(Report.status == status) 

867 total = q.count() 

868 reports = q.order_by(desc(Report.created_at)).offset(offset).limit(limit).all() 

869 return reports, total 

870 

871 @staticmethod 

872 def review(db: Session, report: Report, moderator_id: str, status: str): 

873 report.status = status 

874 report.moderator_id = moderator_id 

875 db.flush()