Coverage for integrations / social / call_service.py: 24.7%
162 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial — Voice/video/screen-share call service.
4Phase 7d. Plan reference: sunny-gliding-eich.md, Part E.4 + E.7 + E.12.
6This is the BACKEND state machine for call sessions — REST surface
7for create / join / leave / end / invite, plus the AgentJoinGrant
8gate for agents joining as participants.
10What lives here:
11 - CallService.create / get / end — CallSession bookkeeping.
12 - CallService.join / leave — CallParticipant lifecycle, with
13 UNIQUE-WHERE-left_at-IS-NULL invariant ensuring exactly one
14 active participant row per (call, user).
15 - CallService.list_participants — current roster.
16 - CallService.invite_member — Notification fan-out for non-muted
17 members of the parent (community / conversation).
18 - CallService.attach_agent — verifies AgentJoinGrant.can_voice
19 before allowing the AgentVoiceBridge to spin up.
21What does NOT live here (deferred to LiveKit + AgentVoiceBridge):
22 - Media frames (WebRTC P2P mesh / LiveKit SFU live in their own
23 transports; this service only emits signaling state).
24 - Audio mixing / TTS publish / Whisper STT for agents — the bridge
25 is a separate per-call worker; this service just registers
26 bookkeeping for it.
28Transport: P2P-first via core.peer_link.message_bus.MessageBus.publish.
29Falls back to WAMP push for offline recipients and HTTP API for sync.
30Central is the audit log + discovery index, not the primary router.
32Existing fan-out order (do not bypass):
33 LOCAL → SSE → PEERLINK → CROSSBAR
35This module ONLY persists rows in the canonical DB and calls
36MessageBus.publish() — it does NOT directly emit WAMP or open
37PeerLink frames. Transport selection is owned by message_bus.py.
38"""
40from __future__ import annotations
42import json
43import logging
44import uuid
45from datetime import datetime
46from typing import Any, Dict, List, Optional, Tuple
48from sqlalchemy import text
50logger = logging.getLogger('hevolve_social')
53# Whitelist enforced at the service layer so a malicious client cannot
54# coax the DB into accepting a kind not understood by clients.
55ALLOWED_CALL_KINDS = ('voice', 'video', 'screen_share', 'mixed', 'stage')
57# Default per-room cap. Above 4 participants, mesh efficiency drops
58# and LiveKit SFU is auto-promoted (Plan R.6). Hard cap configurable
59# per-call via settings.max_participants.
60DEFAULT_MAX_PARTICIPANTS = 32
63class CallError(Exception):
64 """Raised for service-level call failures. api_calls.py maps
65 these to 4xx HTTP responses with the message in the error body."""
68class CallService:
70 # ── Session lifecycle ────────────────────────────────────────────
72 @staticmethod
73 def create(db, parent_kind: str, parent_id: str, started_by: str,
74 kind: str = 'voice', title: Optional[str] = None,
75 max_participants: int = DEFAULT_MAX_PARTICIPANTS,
76 settings: Optional[Dict[str, Any]] = None,
77 tenant_id: Optional[str] = None) -> Dict[str, Any]:
78 """Create a CallSession row.
80 The starter must be a member of the parent (community or
81 conversation) — gated by Membership lookup. Non-members
82 raise CallError so we don't leak the existence of the parent.
84 Idempotency: a parent already has-an-active-call short-circuits
85 and returns the existing row instead of creating a duplicate.
86 Two clients racing on the Start Call button get the same call,
87 which is what users expect.
88 """
89 if kind not in ALLOWED_CALL_KINDS:
90 raise CallError(f"unsupported call kind: {kind!r}")
91 if parent_kind not in ('community', 'conversation'):
92 raise CallError(f"unsupported parent_kind: {parent_kind!r}")
94 if not _is_parent_member(db, parent_kind, parent_id, started_by):
95 # 404-shape from caller — do not reveal existence of the parent.
96 raise CallError("parent not found")
98 # Idempotent: return existing active call for this parent.
99 # Pass-4 P4-1 fix: SELECT-then-INSERT under READ COMMITTED is
100 # racy on Postgres — two concurrent starters could both pass
101 # the SELECT and both INSERT. Use a SAVEPOINT around the
102 # INSERT so an IntegrityError on a future partial-unique-
103 # index parent_active row falls back to re-SELECT. The
104 # partial index isn't in the migration today (deferred to
105 # Phase 7d.B alongside the MySQL workaround for #2), but the
106 # try/except on insert + recheck closes the window in code.
107 existing = db.execute(text(
108 "SELECT id FROM call_sessions "
109 "WHERE parent_kind = :pk AND parent_id = :pid "
110 "AND ended_at IS NULL "
111 "ORDER BY started_at DESC LIMIT 1"),
112 {'pk': parent_kind, 'pid': parent_id}
113 ).fetchone()
114 if existing is not None:
115 return CallService.get(db, existing[0])
117 call_id = str(uuid.uuid4())
118 try:
119 with db.begin_nested():
120 db.execute(text(
121 "INSERT INTO call_sessions "
122 "(id, tenant_id, parent_kind, parent_id, title, "
123 " kind, started_by, max_participants, settings) "
124 "VALUES "
125 "(:id, :tid, :pk, :pid, :title, :kind, :sb, :mp, :s)"),
126 {'id': call_id, 'tid': tenant_id, 'pk': parent_kind,
127 'pid': parent_id, 'title': title, 'kind': kind,
128 'sb': started_by, 'mp': max_participants,
129 's': json.dumps(settings) if settings else None})
130 except Exception as e:
131 # Concurrent starter beat us to the INSERT — re-SELECT
132 # the now-active call and return it. Reviewer P4-1.
133 logger.info(
134 "CallService.create concurrent INSERT collision "
135 "for parent=%s/%s: %s; re-reading", parent_kind,
136 parent_id, e)
137 existing = db.execute(text(
138 "SELECT id FROM call_sessions "
139 "WHERE parent_kind = :pk AND parent_id = :pid "
140 "AND ended_at IS NULL "
141 "ORDER BY started_at DESC LIMIT 1"),
142 {'pk': parent_kind, 'pid': parent_id}
143 ).fetchone()
144 if existing is not None:
145 return CallService.get(db, existing[0])
146 raise CallError(f"could not create call: {e}")
147 db.commit()
149 # Auto-add the starter as the first participant so the roster
150 # query reflects them immediately.
151 CallService.join(db, call_id, started_by,
152 device_kind='mobile', tenant_id=tenant_id)
153 return CallService.get(db, call_id)
155 @staticmethod
156 def get(db, call_id: str) -> Dict[str, Any]:
157 row = db.execute(text(
158 "SELECT id, parent_kind, parent_id, title, kind, started_by, "
159 " started_at, ended_at, livekit_room_sid, "
160 " max_participants, settings "
161 "FROM call_sessions WHERE id = :id"),
162 {'id': call_id}
163 ).fetchone()
164 if row is None:
165 raise CallError("not found")
166 return _row_to_dict(row)
168 @staticmethod
169 def end(db, call_id: str, ended_by_id: str) -> Dict[str, Any]:
170 """End the call. Only the starter or a parent admin can end.
172 Pass-4 P4-4: both UPDATEs run inside a SAVEPOINT so a mid-
173 operation failure rolls back BOTH (no half-ended state where
174 the session is closed but participants stay active). Order
175 is participants → session so worst-case partial commit leaves
176 a still-active session with no active participants — easy to
177 re-end manually — rather than the inverse (closed session
178 with ghost participants).
179 """
180 sess = CallService.get(db, call_id)
181 if sess.get('ended_at'):
182 return sess # idempotent — already ended
183 is_starter = sess['started_by'] == ended_by_id
184 is_admin = _is_parent_admin(
185 db, sess['parent_kind'], sess['parent_id'], ended_by_id)
186 if not (is_starter or is_admin):
187 raise CallError("only the starter or an admin can end the call")
188 with db.begin_nested():
189 db.execute(text(
190 "UPDATE call_participants SET left_at = CURRENT_TIMESTAMP "
191 "WHERE call_id = :id AND left_at IS NULL"),
192 {'id': call_id})
193 db.execute(text(
194 "UPDATE call_sessions SET ended_at = CURRENT_TIMESTAMP "
195 "WHERE id = :id AND ended_at IS NULL"),
196 {'id': call_id})
197 db.commit()
198 return CallService.get(db, call_id)
200 # ── Participant lifecycle ────────────────────────────────────────
202 @staticmethod
203 def join(db, call_id: str, user_id: str,
204 device_kind: str = 'mobile', agent_kind: str = 'human',
205 tenant_id: Optional[str] = None) -> Dict[str, Any]:
206 """Mark a user as joined. Idempotent — re-joining an active
207 participant returns the existing row (no duplicate insert)."""
208 sess = CallService.get(db, call_id)
209 if sess.get('ended_at'):
210 raise CallError("call has ended")
212 # Membership check: only members of the parent can join.
213 if not _is_parent_member(db, sess['parent_kind'],
214 sess['parent_id'], user_id):
215 raise CallError("not a member of this room's parent")
217 # Idempotency: existing active participant row?
218 active = db.execute(text(
219 "SELECT id FROM call_participants "
220 "WHERE call_id = :cid AND user_id = :uid AND left_at IS NULL"),
221 {'cid': call_id, 'uid': user_id}
222 ).fetchone()
223 if active is not None:
224 return CallService._participant_dict(db, active[0])
226 pid = str(uuid.uuid4())
227 db.execute(text(
228 "INSERT INTO call_participants "
229 "(id, tenant_id, call_id, user_id, agent_kind, "
230 " device_kind) "
231 "VALUES (:id, :tid, :cid, :uid, :ak, :dk)"),
232 {'id': pid, 'tid': tenant_id, 'cid': call_id,
233 'uid': user_id, 'ak': agent_kind, 'dk': device_kind})
234 db.commit()
235 return CallService._participant_dict(db, pid)
237 @staticmethod
238 def leave(db, call_id: str, user_id: str) -> bool:
239 """Mark all this user's active participant rows as left. Idempotent."""
240 result = db.execute(text(
241 "UPDATE call_participants SET left_at = CURRENT_TIMESTAMP "
242 "WHERE call_id = :cid AND user_id = :uid AND left_at IS NULL"),
243 {'cid': call_id, 'uid': user_id})
244 db.commit()
245 return (result.rowcount or 0) > 0
247 @staticmethod
248 def list_participants(db, call_id: str,
249 include_left: bool = False) -> List[Dict[str, Any]]:
250 clause = "" if include_left else " AND left_at IS NULL"
251 rows = db.execute(text(
252 "SELECT id, user_id, agent_kind, joined_at, left_at, "
253 " is_muted, is_video_on, is_screen_sharing, device_kind "
254 f"FROM call_participants WHERE call_id = :cid{clause} "
255 "ORDER BY joined_at ASC"),
256 {'cid': call_id}
257 ).fetchall()
258 return [
259 {'id': r[0], 'user_id': r[1], 'agent_kind': r[2],
260 'joined_at': str(r[3]) if r[3] else None,
261 'left_at': str(r[4]) if r[4] else None,
262 'is_muted': bool(r[5]), 'is_video_on': bool(r[6]),
263 'is_screen_sharing': bool(r[7]), 'device_kind': r[8]}
264 for r in rows
265 ]
267 # ── Agent join grant ────────────────────────────────────────────
269 @staticmethod
270 def grant_agent(db, agent_id: str, owner_id: str,
271 parent_kind: str, parent_id: str,
272 scope: Dict[str, Any],
273 source: str = 'user_explicit',
274 tenant_id: Optional[str] = None) -> Dict[str, Any]:
275 """Owner grants an agent permission to join a parent.
277 Idempotent on (agent_id, parent_kind, parent_id) — a re-grant
278 with the same triple updates the scope rather than inserting
279 a duplicate. This matches what the user expects when they
280 toggle scopes in the agent settings UI.
281 """
282 # Verify the caller owns the agent — the agent's User row must
283 # have user_type='agent' AND owner_id matching owner_id arg.
284 # (SocialUser uses `owner_id` for the agent→owner link;
285 # Plan C.3 calls it agent_owner_id but the live schema is
286 # owner_id. Same semantic.)
287 # Pass-4 P4-3: also pull is_admin so we can gate ownerless
288 # (system) agent grants — see below.
289 agent_row = db.execute(text(
290 "SELECT user_type, owner_id FROM users WHERE id = :id"),
291 {'id': agent_id}
292 ).fetchone()
293 if not agent_row:
294 raise CallError("agent not found")
295 if agent_row[0] != 'agent':
296 raise CallError("user is not an agent")
297 agent_owner = agent_row[1]
298 if agent_owner is None:
299 # Pass-4 P4-3 fix: system / ownerless agents must NOT be
300 # granted by an arbitrary user. Previously the
301 # `agent_row[1] is not None` short-circuit allowed any
302 # authenticated user to issue can_voice / can_screen
303 # grants for a system agent — security hole. Require
304 # caller to be a platform admin OR raise.
305 caller_row = db.execute(text(
306 "SELECT is_admin FROM users WHERE id = :id"),
307 {'id': owner_id}
308 ).fetchone()
309 if not caller_row or not caller_row[0]:
310 raise CallError(
311 "system agent grants require platform admin")
312 elif agent_owner != owner_id:
313 raise CallError("only the agent's owner can grant join")
315 # Existing active grant? Update scope in place.
316 existing = db.execute(text(
317 "SELECT id FROM agent_join_grants "
318 "WHERE agent_id = :aid AND parent_kind = :pk "
319 "AND parent_id = :pid AND revoked_at IS NULL"),
320 {'aid': agent_id, 'pk': parent_kind, 'pid': parent_id}
321 ).fetchone()
322 scope_json = json.dumps(scope or {})
323 if existing is not None:
324 db.execute(text(
325 "UPDATE agent_join_grants SET scope = :s WHERE id = :id"),
326 {'s': scope_json, 'id': existing[0]})
327 db.commit()
328 return CallService._grant_dict(db, existing[0])
330 gid = str(uuid.uuid4())
331 db.execute(text(
332 "INSERT INTO agent_join_grants "
333 "(id, tenant_id, agent_id, owner_id, parent_kind, "
334 " parent_id, scope, source) "
335 "VALUES (:id, :tid, :aid, :oid, :pk, :pid, :s, :src)"),
336 {'id': gid, 'tid': tenant_id, 'aid': agent_id,
337 'oid': owner_id, 'pk': parent_kind, 'pid': parent_id,
338 's': scope_json, 'src': source})
339 db.commit()
340 return CallService._grant_dict(db, gid)
342 @staticmethod
343 def revoke_agent(db, grant_id: str, revoker_id: str) -> bool:
344 """Owner revokes an active grant. Idempotent."""
345 row = db.execute(text(
346 "SELECT owner_id, revoked_at FROM agent_join_grants "
347 "WHERE id = :id"),
348 {'id': grant_id}
349 ).fetchone()
350 if not row:
351 raise CallError("grant not found")
352 if row[0] != revoker_id:
353 raise CallError("only the granter can revoke")
354 if row[1] is not None:
355 return False # already revoked
356 db.execute(text(
357 "UPDATE agent_join_grants SET revoked_at = CURRENT_TIMESTAMP "
358 "WHERE id = :id"),
359 {'id': grant_id})
360 db.commit()
361 return True
363 @staticmethod
364 def get_active_grant(db, agent_id: str,
365 parent_kind: str, parent_id: str
366 ) -> Optional[Dict[str, Any]]:
367 row = db.execute(text(
368 "SELECT id FROM agent_join_grants "
369 "WHERE agent_id = :aid AND parent_kind = :pk "
370 "AND parent_id = :pid AND revoked_at IS NULL "
371 "ORDER BY granted_at DESC LIMIT 1"),
372 {'aid': agent_id, 'pk': parent_kind, 'pid': parent_id}
373 ).fetchone()
374 if row is None:
375 return None
376 return CallService._grant_dict(db, row[0])
378 @staticmethod
379 def attach_agent(db, call_id: str, agent_id: str,
380 tenant_id: Optional[str] = None) -> Dict[str, Any]:
381 """Add an agent as a call_participant. Requires an active
382 AgentJoinGrant on the parent with `can_voice = True` for
383 kind='voice' / 'mixed' (or `can_screen` for screen_share).
385 Phase 7d.B: also spawns the AgentVoiceBridge worker so the
386 agent actually gets STT + TTS plumbed through LiveKit (or
387 no-op-stays-bookkeeping when livekit-rtc isn't installed).
388 Plan E.12. Bridge spawn is best-effort — a failure to spin
389 up the bridge does NOT fail the participant record (the
390 roster row stays so admins can see the agent + retry attach).
391 """
392 sess = CallService.get(db, call_id)
393 grant = CallService.get_active_grant(
394 db, agent_id, sess['parent_kind'], sess['parent_id'])
395 if not grant:
396 raise CallError("no active join grant for this agent + parent")
397 scope = grant['scope'] or {}
398 if sess['kind'] in ('voice', 'mixed') and not scope.get('can_voice'):
399 raise CallError("grant does not include can_voice")
400 if sess['kind'] == 'screen_share' and not scope.get('can_screen'):
401 raise CallError("grant does not include can_screen")
402 # Reuse join() with agent_kind='agent' + device_kind='agent_bridge'
403 participant = CallService.join(
404 db, call_id, agent_id,
405 agent_kind='agent', device_kind='agent_bridge',
406 tenant_id=tenant_id)
407 # Phase 7d.B — spin up the AgentVoiceBridge worker.
408 try:
409 from .agent_voice_bridge import AgentVoiceBridge
410 AgentVoiceBridge.attach_agent(
411 db, call_id=call_id, agent_id=agent_id,
412 owner_id=grant['owner_id'], scope=scope)
413 except Exception as e:
414 logger.warning(
415 "CallService.attach_agent: bridge spawn failed for "
416 "(call=%s, agent=%s): %s; participant row kept",
417 call_id, agent_id, e)
418 return participant
420 # ── Internal helpers ────────────────────────────────────────────
422 @staticmethod
423 def _participant_dict(db, participant_id: str) -> Dict[str, Any]:
424 row = db.execute(text(
425 "SELECT id, call_id, user_id, agent_kind, joined_at, "
426 " left_at, is_muted, is_video_on, is_screen_sharing, "
427 " device_kind "
428 "FROM call_participants WHERE id = :id"),
429 {'id': participant_id}
430 ).fetchone()
431 if not row:
432 return {}
433 return {
434 'id': row[0], 'call_id': row[1], 'user_id': row[2],
435 'agent_kind': row[3],
436 'joined_at': str(row[4]) if row[4] else None,
437 'left_at': str(row[5]) if row[5] else None,
438 'is_muted': bool(row[6]), 'is_video_on': bool(row[7]),
439 'is_screen_sharing': bool(row[8]), 'device_kind': row[9],
440 }
442 @staticmethod
443 def _grant_dict(db, grant_id: str) -> Dict[str, Any]:
444 row = db.execute(text(
445 "SELECT id, agent_id, owner_id, parent_kind, parent_id, "
446 " scope, granted_at, revoked_at, source "
447 "FROM agent_join_grants WHERE id = :id"),
448 {'id': grant_id}
449 ).fetchone()
450 if not row:
451 return {}
452 try:
453 scope = json.loads(row[5]) if row[5] else {}
454 except Exception:
455 scope = {}
456 return {
457 'id': row[0], 'agent_id': row[1], 'owner_id': row[2],
458 'parent_kind': row[3], 'parent_id': row[4],
459 'scope': scope,
460 'granted_at': str(row[6]) if row[6] else None,
461 'revoked_at': str(row[7]) if row[7] else None,
462 'source': row[8],
463 }
466def _is_parent_member(db, parent_kind: str, parent_id: str,
467 user_id: str) -> bool:
468 """Membership check via the polymorphic memberships table.
470 Fall back to the legacy community_memberships table for backward
471 compat — Plan P.2 dual-write contract during the v40→v48 transition.
472 """
473 row = db.execute(text(
474 "SELECT 1 FROM memberships "
475 "WHERE parent_kind = :pk AND parent_id = :pid "
476 "AND member_id = :uid LIMIT 1"),
477 {'pk': parent_kind, 'pid': parent_id, 'uid': user_id}
478 ).fetchone()
479 if row is not None:
480 return True
481 if parent_kind == 'community':
482 row = db.execute(text(
483 "SELECT 1 FROM community_memberships "
484 "WHERE community_id = :pid AND user_id = :uid LIMIT 1"),
485 {'pid': parent_id, 'uid': user_id}
486 ).fetchone()
487 return row is not None
488 return False
491def _is_parent_admin(db, parent_kind: str, parent_id: str,
492 user_id: str) -> bool:
493 row = db.execute(text(
494 "SELECT 1 FROM memberships "
495 "WHERE parent_kind = :pk AND parent_id = :pid "
496 "AND member_id = :uid AND role IN ('admin', 'owner', 'mod') "
497 "LIMIT 1"),
498 {'pk': parent_kind, 'pid': parent_id, 'uid': user_id}
499 ).fetchone()
500 return row is not None
503def _row_to_dict(row) -> Dict[str, Any]:
504 try:
505 settings = json.loads(row[10]) if row[10] else {}
506 except Exception:
507 settings = {}
508 return {
509 'id': row[0], 'parent_kind': row[1], 'parent_id': row[2],
510 'title': row[3], 'kind': row[4], 'started_by': row[5],
511 'started_at': str(row[6]) if row[6] else None,
512 'ended_at': str(row[7]) if row[7] else None,
513 'livekit_room_sid': row[8], 'max_participants': row[9],
514 'settings': settings,
515 }
518__all__ = ['CallService', 'CallError', 'ALLOWED_CALL_KINDS']