Coverage for integrations / social / services.py: 64.4%
447 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial - Service Layer
3All business logic for posts, comments, votes, users, communities, follows, notifications.
4"""
5import re
6import uuid
7import logging
8import threading
9from datetime import datetime, timedelta
10from typing import Optional, List, Tuple
12logger = logging.getLogger('hevolve_social')
14from sqlalchemy import desc, asc, func, event
15from sqlalchemy.orm import Session, joinedload
17from .models import (
18 User, Post, Comment, Vote, Follow, Community, CommunityMembership,
19 Notification, Report, TaskRequest, RecipeShare, AgentSkillBadge
20)
21from .auth import hash_password, verify_password, generate_api_token, generate_jwt
24def _uuid():
25 return str(uuid.uuid4())
28# ─── User Service ───
30class UserService:
32 @staticmethod
33 def register(db: Session, username: str, password: str, email: str = None,
34 display_name: str = None, user_type: str = 'human') -> User:
35 if db.query(User).filter(User.username == username).first():
36 raise ValueError("Registration failed - username or email may already be in use")
37 if email:
38 # Basic email format validation
39 if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
40 raise ValueError("Invalid email address format")
41 if db.query(User).filter(User.email == email).first():
42 raise ValueError("Registration failed - username or email may already be in use")
44 user = User(
45 id=_uuid(), username=username, display_name=display_name or username,
46 email=email, password_hash=hash_password(password),
47 user_type=user_type, api_token=generate_api_token(),
48 is_verified=True,
49 )
50 db.add(user)
51 db.flush()
52 return user
54 @staticmethod
55 def register_agent(db: Session, name: str, description: str = '',
56 agent_id: str = None, owner_id: str = None,
57 skip_name_validation: bool = False) -> User:
58 name = name.strip().lower()
60 # Validate 3-word name format (skip for legacy/internal callers)
61 if not skip_name_validation:
62 from .agent_naming import validate_agent_name
63 valid, error = validate_agent_name(name)
64 if not valid:
65 raise ValueError(error)
67 existing = db.query(User).filter(User.username == name).first()
68 if existing:
69 raise ValueError(f"Name '{name}' is already taken globally")
71 user = User(
72 id=_uuid(), username=name, display_name=name,
73 bio=description, user_type='agent', agent_id=agent_id,
74 owner_id=owner_id,
75 api_token=generate_api_token(), is_verified=True,
76 )
77 db.add(user)
78 db.flush()
79 return user
81 @staticmethod
82 def get_owned_agents(db: Session, owner_id: str) -> list:
83 """Get all agents owned by a user."""
84 return db.query(User).filter(
85 User.owner_id == owner_id,
86 User.user_type == 'agent',
87 ).all()
89 # Account lockout: track failed attempts per username
90 _login_attempts = {} # username -> (count, first_attempt_at)
91 _login_lock = threading.Lock()
92 _MAX_ATTEMPTS = 5
93 _LOCKOUT_MINUTES = 15
94 _LOGIN_ATTEMPT_TTL = timedelta(minutes=30)
96 @staticmethod
97 def _cleanup_login_attempts():
98 """Remove expired login attempt entries (called under _login_lock)."""
99 now = datetime.utcnow()
100 expired = [k for k, (_, first_at) in UserService._login_attempts.items()
101 if now - first_at > UserService._LOGIN_ATTEMPT_TTL]
102 for k in expired:
103 del UserService._login_attempts[k]
105 @staticmethod
106 def login(db: Session, username: str, password: str) -> Tuple[User, str]:
107 # Periodically purge stale login attempt records
108 with UserService._login_lock:
109 UserService._cleanup_login_attempts()
110 # Check lockout
111 with UserService._login_lock:
112 entry = UserService._login_attempts.get(username)
113 if entry:
114 count, first_at = entry
115 elapsed = (datetime.utcnow() - first_at).total_seconds()
116 if count >= UserService._MAX_ATTEMPTS and elapsed < UserService._LOCKOUT_MINUTES * 60:
117 remaining = int(UserService._LOCKOUT_MINUTES - elapsed / 60)
118 raise ValueError(f"Account temporarily locked. Try again in {remaining} minutes")
119 if elapsed >= UserService._LOCKOUT_MINUTES * 60:
120 del UserService._login_attempts[username]
122 user = db.query(User).filter(User.username == username).first()
123 # Always run password verification to prevent timing-based user enumeration
124 _dummy_hash = "0" * 32 + ":" + "0" * 64
125 if not verify_password(password, user.password_hash if user else _dummy_hash):
126 # Record failed attempt
127 with UserService._login_lock:
128 entry = UserService._login_attempts.get(username)
129 if entry:
130 UserService._login_attempts[username] = (entry[0] + 1, entry[1])
131 else:
132 UserService._login_attempts[username] = (1, datetime.utcnow())
133 raise ValueError("Invalid username or password")
134 if not user:
135 raise ValueError("Invalid username or password")
136 if user.is_banned:
137 raise ValueError("Account is banned")
138 # Clear lockout on successful login
139 with UserService._login_lock:
140 UserService._login_attempts.pop(username, None)
141 token = generate_jwt(user.id, user.username, getattr(user, 'role', None) or 'flat')
142 user.last_active_at = datetime.utcnow()
143 db.flush()
144 return user, token
146 @staticmethod
147 def set_user_role(db: Session, user, role: str):
148 """Set user role and sync legacy boolean flags."""
149 user.role = role
150 user.is_admin = (role == 'central')
151 user.is_moderator = (role in ('central', 'regional'))
152 db.flush()
154 @staticmethod
155 def get_by_id(db: Session, user_id: str) -> Optional[User]:
156 return db.query(User).filter(User.id == user_id).first()
158 @staticmethod
159 def get_by_username(db: Session, username: str) -> Optional[User]:
160 return db.query(User).filter(User.username == username).first()
162 @staticmethod
163 def list_users(db: Session, user_type: str = None, limit: int = 25,
164 offset: int = 0) -> Tuple[List[User], int]:
165 q = db.query(User).filter(User.is_banned == False)
166 if user_type:
167 q = q.filter(User.user_type == user_type)
168 total = q.count()
169 users = q.order_by(desc(User.karma_score)).offset(offset).limit(limit).all()
170 return users, total
172 @staticmethod
173 def set_handle(db: Session, user: User, handle: str) -> User:
174 """Set a user's unique creator handle (used as suffix in global agent names)."""
175 from .agent_naming import validate_handle, is_handle_available
176 handle = handle.strip().lower()
177 valid, error = validate_handle(handle)
178 if not valid:
179 raise ValueError(error)
180 if not is_handle_available(db, handle):
181 raise ValueError(f"Handle '{handle}' is already taken")
182 user.handle = handle
183 user.updated_at = datetime.utcnow()
184 db.flush()
185 return user
187 @staticmethod
188 def register_agent_local(db: Session, local_name: str, description: str = '',
189 agent_id: str = None, owner: User = None) -> User:
190 """
191 Register an agent using a 2-word local name + owner's handle.
192 The global username becomes: {local_name}.{handle} (e.g. swift.falcon.sathi).
193 """
194 from .agent_naming import (validate_local_name, compose_global_name,
195 check_global_availability)
197 if not owner:
198 raise ValueError("Owner is required")
199 if not owner.handle:
200 raise ValueError("You need to set a handle before creating agents")
202 local_name = local_name.strip().lower()
203 valid, error = validate_local_name(local_name)
204 if not valid:
205 raise ValueError(error)
207 # Check local uniqueness (same owner can't have two agents with same local name)
208 existing_local = db.query(User).filter(
209 User.owner_id == owner.id,
210 User.local_name == local_name,
211 ).first()
212 if existing_local:
213 raise ValueError(f"You already have an agent named '{local_name}'")
215 # Check global availability
216 available, global_name, err = check_global_availability(db, local_name, owner.handle)
217 if not available:
218 raise ValueError(
219 f"'{global_name}' is already taken globally. "
220 f"Please choose a different name for your agent."
221 )
223 user = User(
224 id=_uuid(), username=global_name, display_name=local_name,
225 local_name=local_name, bio=description, user_type='agent',
226 agent_id=agent_id, owner_id=owner.id,
227 api_token=generate_api_token(), is_verified=True,
228 )
229 db.add(user)
230 db.flush()
231 return user
233 @staticmethod
234 def update_profile(db: Session, user: User, display_name: str = None,
235 bio: str = None, avatar_url: str = None,
236 handle: str = None) -> User:
237 if display_name is not None:
238 user.display_name = display_name
239 if bio is not None:
240 user.bio = bio
241 if avatar_url is not None:
242 user.avatar_url = avatar_url
243 if handle is not None and user.user_type == 'human':
244 UserService.set_handle(db, user, handle)
245 user.updated_at = datetime.utcnow()
246 db.flush()
247 return user
250# ─── Post Service ───
252class PostService:
254 @staticmethod
255 def create(db: Session, author: User, title: str, content: str = '',
256 content_type: str = 'text', community_name: str = None,
257 code_language: str = None, media_urls: list = None,
258 link_url: str = None, source_channel: str = None,
259 source_message_id: str = None,
260 intent_category: str = None, hypothesis: str = None,
261 expected_outcome: str = None, is_thought_experiment: bool = False,
262 dynamic_layout: dict = None) -> Post:
263 community_id = None
264 if community_name:
265 community = db.query(Community).filter(Community.name == community_name).first()
266 if community:
267 community_id = community.id
268 community.post_count += 1
270 post = Post(
271 id=_uuid(), author_id=author.id, community_id=community_id,
272 title=title, content=content, content_type=content_type,
273 code_language=code_language, media_urls=media_urls or [],
274 link_url=link_url, source_channel=source_channel,
275 source_message_id=source_message_id,
276 intent_category=intent_category, hypothesis=hypothesis,
277 expected_outcome=expected_outcome,
278 is_thought_experiment=is_thought_experiment or False,
279 dynamic_layout=dynamic_layout,
280 )
281 db.add(post)
282 author.post_count += 1
283 db.flush()
285 # Resonance: award Spark + Signal + XP for creating a post
286 try:
287 from .resonance_engine import ResonanceService
288 ResonanceService.award_action(db, author.id, 'create_post', post.id)
289 except Exception:
290 pass
292 try:
293 from .gamification_service import GamificationService
294 GamificationService.check_achievements(db, author.id)
295 except Exception:
296 pass
298 try:
299 from .onboarding_service import OnboardingService
300 OnboardingService.auto_advance(db, author.id, 'post')
301 except Exception:
302 pass
304 return post
306 @staticmethod
307 def get_by_id(db: Session, post_id: str) -> Optional[Post]:
308 return db.query(Post).options(
309 joinedload(Post.author)
310 ).filter(Post.id == post_id, Post.is_deleted == False).first()
312 @staticmethod
313 def list_posts(db: Session, sort: str = 'new', community_name: str = None,
314 author_id: str = None, limit: int = 25, offset: int = 0,
315 viewer_user=None, apply_privacy: bool = False
316 ) -> Tuple[List[Post], int]:
317 """List posts. When `apply_privacy` is True, the privacy gate
318 from integrations.social.privacy is AND'd into the query.
320 `apply_privacy` is opt-in (default False) so callers in flag-off
321 deploys never load the privacy module or run the EXISTS
322 subqueries. api.py reads g.feature_flags['post_privacy'] and
323 passes it through.
324 """
325 q = db.query(Post).options(joinedload(Post.author)).filter(
326 Post.is_deleted == False, Post.is_hidden == False
327 )
328 if community_name:
329 community = db.query(Community).filter(Community.name == community_name).first()
330 if community:
331 q = q.filter(Post.community_id == community.id)
332 if author_id:
333 q = q.filter(Post.author_id == author_id)
335 if apply_privacy:
336 from .privacy import visible_posts_filter
337 q = q.filter(visible_posts_filter(viewer_user))
339 if sort == 'top':
340 q = q.order_by(desc(Post.score), desc(Post.created_at))
341 elif sort == 'hot':
342 # Hot = score weighted by recency
343 q = q.order_by(desc(Post.score + Post.comment_count), desc(Post.created_at))
344 elif sort == 'discussed':
345 q = q.order_by(desc(Post.comment_count), desc(Post.created_at))
346 else: # 'new'
347 q = q.order_by(desc(Post.created_at))
349 total = q.count()
350 posts = q.offset(offset).limit(limit).all()
351 return posts, total
353 @staticmethod
354 def update(db: Session, post: Post, title: str = None, content: str = None,
355 intent_category: str = None, hypothesis: str = None,
356 expected_outcome: str = None, is_thought_experiment: bool = None,
357 dynamic_layout: dict = None) -> Post:
358 if title is not None:
359 post.title = title
360 if content is not None:
361 post.content = content
362 if intent_category is not None:
363 post.intent_category = intent_category
364 if hypothesis is not None:
365 post.hypothesis = hypothesis
366 if expected_outcome is not None:
367 post.expected_outcome = expected_outcome
368 if is_thought_experiment is not None:
369 post.is_thought_experiment = is_thought_experiment
370 if dynamic_layout is not None:
371 post.dynamic_layout = dynamic_layout
372 post.updated_at = datetime.utcnow()
373 db.flush()
374 return post
376 @staticmethod
377 def delete(db: Session, post: Post):
378 post.is_deleted = True
379 post.author.post_count = max(0, post.author.post_count - 1)
380 if post.community_id:
381 community = db.query(Community).filter_by(id=post.community_id).first()
382 if community:
383 community.post_count = max(0, (community.post_count or 0) - 1)
384 db.flush()
386 @staticmethod
387 def increment_view(db: Session, post: Post):
388 post.view_count += 1
389 db.flush()
392# ─── Comment Service ───
394class CommentService:
396 @staticmethod
397 def create(db: Session, post: Post, author: User, content: str,
398 parent_id: str = None) -> Comment:
399 depth = 0
400 if parent_id:
401 parent = db.query(Comment).filter(Comment.id == parent_id).first()
402 if parent:
403 depth = parent.depth + 1
405 comment = Comment(
406 id=_uuid(), post_id=post.id, author_id=author.id,
407 parent_id=parent_id, content=content, depth=depth,
408 )
409 db.add(comment)
410 post.comment_count += 1
411 author.comment_count += 1
412 db.flush()
414 # Notify post author
415 if post.author_id != author.id:
416 NotificationService.create(
417 db, post.author_id, 'comment', author.id, 'post', post.id,
418 f"{author.display_name} commented on your post"
419 )
420 # Notify parent comment author
421 if parent_id:
422 parent = db.query(Comment).filter(Comment.id == parent_id).first()
423 if parent and parent.author_id != author.id:
424 NotificationService.create(
425 db, parent.author_id, 'reply', author.id, 'comment', parent.id,
426 f"{author.display_name} replied to your comment"
427 )
429 # Resonance: award Spark + Signal + XP for creating a comment
430 try:
431 from .resonance_engine import ResonanceService
432 ResonanceService.award_action(db, author.id, 'create_comment', comment.id)
433 except Exception:
434 pass
436 try:
437 from .gamification_service import GamificationService
438 GamificationService.check_achievements(db, author.id)
439 except Exception:
440 pass
442 try:
443 from .onboarding_service import OnboardingService
444 OnboardingService.auto_advance(db, author.id, 'comment')
445 except Exception:
446 pass
448 try:
449 from .encounter_service import EncounterService
450 # Record encounter between commenter and post author
451 if post and post.author_id and post.author_id != author.id:
452 EncounterService.record_encounter(
453 db, author.id, post.author_id,
454 'post', post.id)
455 except Exception:
456 pass
458 return comment
460 @staticmethod
461 def get_by_post(db: Session, post_id: str, sort: str = 'new'
462 ) -> List[Comment]:
463 q = db.query(Comment).options(joinedload(Comment.author)).filter(
464 Comment.post_id == post_id, Comment.is_deleted == False,
465 Comment.is_hidden == False
466 )
467 if sort == 'top':
468 q = q.order_by(desc(Comment.score), desc(Comment.created_at))
469 else:
470 q = q.order_by(asc(Comment.created_at))
471 return q.all()
473 @staticmethod
474 def delete(db: Session, comment: Comment):
475 comment.is_deleted = True
476 comment.content = '[deleted]'
477 db.flush()
480# ─── Vote Service ───
482# Note: with_for_update() is a no-op on SQLite (no SELECT ... FOR UPDATE support).
483# Use Python-level lock as SQLite workaround for concurrent vote operations.
484# The with_for_update() calls are kept for PostgreSQL compatibility.
485_vote_lock = threading.Lock()
488class VoteService:
490 @staticmethod
491 def vote(db: Session, user: User, target_type: str, target_id: str,
492 value: int) -> dict:
493 """Cast or change a vote. value: +1 (upvote) or -1 (downvote)."""
494 with _vote_lock:
495 return VoteService._vote_inner(db, user, target_type, target_id, value)
497 @staticmethod
498 def _vote_inner(db: Session, user: User, target_type: str, target_id: str,
499 value: int) -> dict:
500 """Inner vote logic, must be called under _vote_lock."""
501 existing = db.query(Vote).filter(
502 Vote.user_id == user.id, Vote.target_type == target_type,
503 Vote.target_id == target_id
504 ).first()
506 # Get target object (with_for_update prevents concurrent vote race on PostgreSQL;
507 # no-op on SQLite — Python _vote_lock provides safety there)
508 if target_type == 'post':
509 target = db.query(Post).filter(Post.id == target_id).with_for_update().first()
510 else:
511 target = db.query(Comment).filter(Comment.id == target_id).with_for_update().first()
512 if not target:
513 raise ValueError(f"{target_type} not found")
515 if existing:
516 if existing.value == value:
517 # Remove vote (toggle off)
518 if value == 1:
519 target.upvotes -= 1
520 else:
521 target.downvotes -= 1
522 target.score = target.upvotes - target.downvotes
523 db.delete(existing)
524 db.flush()
525 return {'action': 'removed', 'score': target.score}
526 else:
527 # Change vote direction
528 if existing.value == 1:
529 target.upvotes -= 1
530 else:
531 target.downvotes -= 1
532 existing.value = value
533 if value == 1:
534 target.upvotes += 1
535 else:
536 target.downvotes += 1
537 target.score = target.upvotes - target.downvotes
538 db.flush()
539 return {'action': 'changed', 'score': target.score}
540 else:
541 vote = Vote(id=_uuid(), user_id=user.id, target_type=target_type,
542 target_id=target_id, value=value)
543 db.add(vote)
544 if value == 1:
545 target.upvotes += 1
546 else:
547 target.downvotes += 1
548 target.score = target.upvotes - target.downvotes
549 db.flush()
551 # Notify author on upvote
552 if value == 1:
553 author_id = target.author_id
554 if author_id != user.id:
555 NotificationService.create(
556 db, author_id, 'upvote', user.id, target_type, target_id,
557 f"{user.display_name} upvoted your {target_type}"
558 )
559 # Resonance: award Pulse to author for receiving upvote
560 try:
561 from .resonance_engine import ResonanceService
562 ResonanceService.award_action(db, author_id, 'post_upvote', target_id)
563 except Exception:
564 pass
566 try:
567 from .gamification_service import GamificationService
568 GamificationService.check_achievements(db, user.id)
569 except Exception:
570 pass
572 try:
573 from .onboarding_service import OnboardingService
574 OnboardingService.auto_advance(db, user.id, 'vote')
575 except Exception:
576 pass
578 return {'action': 'voted', 'score': target.score}
580 @staticmethod
581 def remove_vote(db: Session, user: User, target_type: str, target_id: str):
582 existing = db.query(Vote).filter(
583 Vote.user_id == user.id, Vote.target_type == target_type,
584 Vote.target_id == target_id
585 ).first()
586 if existing:
587 if target_type == 'post':
588 target = db.query(Post).filter(Post.id == target_id).first()
589 else:
590 target = db.query(Comment).filter(Comment.id == target_id).first()
591 if target:
592 if existing.value == 1:
593 target.upvotes -= 1
594 else:
595 target.downvotes -= 1
596 target.score = target.upvotes - target.downvotes
597 db.delete(existing)
598 db.flush()
600 @staticmethod
601 def get_voters(db: Session, target_type: str, target_id: str) -> List[dict]:
602 """Get list of users who voted (for RN compatibility: like_bypost)."""
603 votes = db.query(Vote, User).join(User, Vote.user_id == User.id).filter(
604 Vote.target_type == target_type, Vote.target_id == target_id,
605 Vote.value == 1
606 ).all()
607 return [{'user_id': u.id, 'name': u.display_name,
608 'profilePic': u.avatar_url} for v, u in votes]
611# ─── Follow Service ───
613class FollowService:
615 @staticmethod
616 def follow(db: Session, follower: User, following_id: str) -> bool:
617 if follower.id == following_id:
618 raise ValueError("Cannot follow yourself")
619 target = db.query(User).filter(User.id == following_id).first()
620 if not target:
621 raise ValueError("User not found")
623 existing = db.query(Follow).filter(
624 Follow.follower_id == follower.id, Follow.following_id == following_id
625 ).first()
626 if existing:
627 return False # already following
629 follow = Follow(id=_uuid(), follower_id=follower.id, following_id=following_id)
630 db.add(follow)
631 db.flush()
632 NotificationService.create(
633 db, following_id, 'follow', follower.id, 'profile', follower.id,
634 f"{follower.display_name} started following you"
635 )
636 return True
638 @staticmethod
639 def unfollow(db: Session, follower: User, following_id: str):
640 existing = db.query(Follow).filter(
641 Follow.follower_id == follower.id, Follow.following_id == following_id
642 ).first()
643 if existing:
644 db.delete(existing)
645 db.flush()
647 @staticmethod
648 def get_followers(db: Session, user_id: str, limit: int = 50, offset: int = 0
649 ) -> Tuple[List[User], int]:
650 q = db.query(User).join(Follow, Follow.follower_id == User.id).filter(
651 Follow.following_id == user_id)
652 total = q.count()
653 users = q.offset(offset).limit(limit).all()
654 return users, total
656 @staticmethod
657 def get_following(db: Session, user_id: str, limit: int = 50, offset: int = 0
658 ) -> Tuple[List[User], int]:
659 q = db.query(User).join(Follow, Follow.following_id == User.id).filter(
660 Follow.follower_id == user_id)
661 total = q.count()
662 users = q.offset(offset).limit(limit).all()
663 return users, total
665 @staticmethod
666 def is_following(db: Session, follower_id: str, following_id: str) -> bool:
667 return db.query(Follow).filter(
668 Follow.follower_id == follower_id, Follow.following_id == following_id
669 ).first() is not None
672# ─── Community Service ───
674class CommunityService:
676 @staticmethod
677 def create(db: Session, creator: User, name: str, display_name: str = '',
678 description: str = '', rules: str = '', is_private: bool = False) -> Community:
679 if db.query(Community).filter(Community.name == name).first():
680 raise ValueError(f"Community '{name}' already exists")
682 community = Community(
683 id=_uuid(), name=name, display_name=display_name or name,
684 description=description, rules=rules, creator_id=creator.id,
685 is_private=is_private, member_count=1,
686 )
687 db.add(community)
688 db.flush()
689 # Auto-join creator as admin
690 membership = CommunityMembership(
691 id=_uuid(), user_id=creator.id, community_id=community.id, role='admin')
692 db.add(membership)
693 db.flush()
694 return community
696 @staticmethod
697 def get_by_name(db: Session, name: str) -> Optional[Community]:
698 return db.query(Community).filter(Community.name == name).first()
700 @staticmethod
701 def list_communities(db: Session, limit: int = 50, offset: int = 0
702 ) -> Tuple[List[Community], int]:
703 q = db.query(Community).order_by(desc(Community.member_count))
704 total = q.count()
705 communities = q.offset(offset).limit(limit).all()
706 return communities, total
708 @staticmethod
709 def join(db: Session, user: User, community: Community) -> bool:
710 """Join a community. Pass-4 P4-5 fix: dual-writes into BOTH
711 the legacy `community_memberships` table AND the polymorphic
712 v41 `memberships` table so downstream features that query
713 `memberships` (e.g., CallService._is_parent_member, the
714 post-privacy `community` arm) recognise the user as a
715 member without depending on the v41 fallback branch.
717 Idempotent: if a row already exists in `community_memberships`,
718 return False (no change). The polymorphic memberships INSERT
719 is also idempotent via UNIQUE(parent_kind, parent_id, member_id).
720 """
721 existing = db.query(CommunityMembership).filter(
722 CommunityMembership.user_id == user.id,
723 CommunityMembership.community_id == community.id
724 ).first()
725 if existing:
726 return False
727 membership = CommunityMembership(
728 id=_uuid(), user_id=user.id, community_id=community.id)
729 db.add(membership)
730 community.member_count += 1
731 # Polymorphic dual-write — best-effort; if it fails (table
732 # missing on some pre-v41 deploy, integrity constraint),
733 # legacy row still wins so the join itself succeeds.
734 try:
735 from sqlalchemy import text
736 db.execute(text(
737 "INSERT INTO memberships "
738 "(id, parent_kind, parent_id, member_id, agent_kind, role) "
739 "VALUES (:id, 'community', :pid, :mid, :ak, 'member')"),
740 {'id': _uuid(), 'pid': community.id,
741 'mid': user.id,
742 'ak': 'agent' if user.user_type == 'agent' else 'human'})
743 except Exception as e:
744 logger.debug(
745 "CommunityService.join polymorphic dual-write skipped: %s", e)
746 db.flush()
747 return True
749 @staticmethod
750 def leave(db: Session, user: User, community: Community):
751 """Leave a community. Dual-deletes from BOTH legacy and
752 polymorphic memberships so the v41 fallback path doesn't
753 keep stale state.
754 """
755 existing = db.query(CommunityMembership).filter(
756 CommunityMembership.user_id == user.id,
757 CommunityMembership.community_id == community.id
758 ).first()
759 if existing:
760 db.delete(existing)
761 community.member_count = max(0, community.member_count - 1)
762 try:
763 from sqlalchemy import text
764 db.execute(text(
765 "DELETE FROM memberships "
766 "WHERE parent_kind = 'community' "
767 "AND parent_id = :pid AND member_id = :mid"),
768 {'pid': community.id, 'mid': user.id})
769 except Exception as e:
770 logger.debug(
771 "CommunityService.leave polymorphic delete skipped: %s",
772 e)
773 db.flush()
775 @staticmethod
776 def get_members(db: Session, community_id: str, limit: int = 50, offset: int = 0
777 ) -> Tuple[List[dict], int]:
778 q = db.query(CommunityMembership, User).join(
779 User, CommunityMembership.user_id == User.id
780 ).filter(CommunityMembership.community_id == community_id)
781 total = q.count()
782 results = q.offset(offset).limit(limit).all()
783 return [{'user': u.to_dict(), 'role': m.role} for m, u in results], total
785 @staticmethod
786 def get_user_role(db: Session, user_id: str, community_id: str) -> Optional[str]:
787 m = db.query(CommunityMembership).filter(
788 CommunityMembership.user_id == user_id,
789 CommunityMembership.community_id == community_id
790 ).first()
791 return m.role if m else None
794# ─── Notification Service ───
796class NotificationService:
798 @staticmethod
799 def create(db: Session, user_id: str, type: str, source_user_id: str = None,
800 target_type: str = None, target_id: str = None, message: str = ''):
801 notif = Notification(
802 id=_uuid(), user_id=user_id, type=type,
803 source_user_id=source_user_id, target_type=target_type,
804 target_id=target_id, message=message,
805 )
806 db.add(notif)
807 db.flush()
808 # Push to SSE + WAMP in real-time AFTER commit (fire-and-forget)
809 # Defer notification to after_commit to ensure data consistency
810 notif_dict = notif.to_dict()
811 _uid = user_id
812 def _push_after_commit(session):
813 try:
814 from .realtime import on_notification
815 on_notification(_uid, notif_dict)
816 except Exception:
817 pass
818 event.listen(db, 'after_commit', _push_after_commit, once=True)
819 return notif
821 @staticmethod
822 def get_for_user(db: Session, user_id: str, unread_only: bool = False,
823 limit: int = 50, offset: int = 0) -> Tuple[List[Notification], int]:
824 q = db.query(Notification).filter(Notification.user_id == user_id)
825 if unread_only:
826 q = q.filter(Notification.is_read == False)
827 total = q.count()
828 notifs = q.order_by(desc(Notification.created_at)).offset(offset).limit(limit).all()
829 return notifs, total
831 @staticmethod
832 def mark_read(db: Session, notification_ids: List[str], user_id: str):
833 db.query(Notification).filter(
834 Notification.id.in_(notification_ids), Notification.user_id == user_id
835 ).update({Notification.is_read: True}, synchronize_session=False)
836 db.flush()
838 @staticmethod
839 def mark_all_read(db: Session, user_id: str):
840 db.query(Notification).filter(
841 Notification.user_id == user_id, Notification.is_read == False
842 ).update({Notification.is_read: True}, synchronize_session=False)
843 db.flush()
846# ─── Report Service ───
848class ReportService:
850 @staticmethod
851 def create(db: Session, reporter: User, target_type: str, target_id: str,
852 reason: str, details: str = '') -> Report:
853 report = Report(
854 id=_uuid(), reporter_id=reporter.id, target_type=target_type,
855 target_id=target_id, reason=reason, details=details,
856 )
857 db.add(report)
858 db.flush()
859 return report
861 @staticmethod
862 def list_reports(db: Session, status: str = None, limit: int = 50, offset: int = 0
863 ) -> Tuple[List[Report], int]:
864 q = db.query(Report)
865 if status:
866 q = q.filter(Report.status == status)
867 total = q.count()
868 reports = q.order_by(desc(Report.created_at)).offset(offset).limit(limit).all()
869 return reports, total
871 @staticmethod
872 def review(db: Session, report: Report, moderator_id: str, status: str):
873 report.status = status
874 report.moderator_id = moderator_id
875 db.flush()