Coverage for integrations / social / livekit_service.py: 38.8%
49 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial — LiveKit token + room helper (deploy-mode aware stub).
4Phase 7d. Plan reference: sunny-gliding-eich.md, Part E.7 + Part R.6.
6LiveKit is the FALLBACK media transport when:
7 - Call has > 4 participants (mesh efficiency drops).
8 - Network topology prevents direct WebRTC P2P (NAT, mobile carrier).
9 - One participant is an AgentVoiceBridge (the bridge needs a stable
10 rendezvous URL, which LiveKit provides).
12For 1:1 + small group calls (≤ 4) on the same network, the clients
13run a WebRTC P2P mesh signaled over PeerLink DISPATCH channel. This
14service issues a LiveKit token only when called explicitly — clients
15that succeed at mesh never request one.
17Deploy-mode adaptation (Plan E.7 + Part Q):
18 - central → managed LiveKit Cloud or self-hosted SFU.
19 LIVEKIT_URL + LIVEKIT_API_KEY + LIVEKIT_API_SECRET
20 env vars come from per-tenant config.
21 - regional → ship `livekit-server` binary alongside HARTOS,
22 one per LAN node.
23 - flat / Nunba → LIVEKIT_URL empty → token issuance returns
24 {'mode': 'p2p_mesh'} so clients fall back to
25 WebRTC P2P + PeerLink signaling.
Token signing is best-effort (Phase 7d.B): when the `livekit-api` SDK is
importable, `issue_token` signs a real JWT. When the SDK is missing or
signing fails, it returns a `livekit_pending` shape with an empty token
instead. The contract is the JSON shape the clients expect; we ship the
contract first so tests + RN client work land green ahead of infra.
32"""
34from __future__ import annotations
36import json
37import logging
38import os
39import time
40from datetime import timedelta
41from typing import Any, Dict, Optional
43logger = logging.getLogger('hevolve_social')
46# Phase 7d.B — best-effort import of the LiveKit server SDK.
47# When installed (`pip install livekit-api`), this module signs
48# real JWTs. When not installed (flat / regional / Nunba bundled
49# without LiveKit), we fall back to the stub shape and clients
50# route to WebRTC P2P mesh via PeerLink.
51try:
52 from livekit import api as livekit_api # type: ignore
53 _HAS_LIVEKIT_SDK = True
54except Exception:
55 livekit_api = None
56 _HAS_LIVEKIT_SDK = False
def _resolved_config():
    """Resolve the LiveKit connection triple ``(url, api_key, api_secret)``.

    Resolution order:

    1. When ``LIVEKIT_URL``, ``LIVEKIT_API_KEY`` and ``LIVEKIT_API_SECRET``
       are all set to non-empty values in the environment, that triple is
       returned unchanged.
    2. Otherwise the sibling ``livekit_supervisor`` module is consulted.
       If ``supervisor_should_run()`` is truthy, the key/secret come from
       ``ensure_dev_keys()`` and the URL is ``LIVEKIT_URL`` when set, else
       ``get_livekit_url()``.

    ``(None, None, None)`` is returned when the supervisor module cannot
    be imported or when it reports that it should not run. Callers treat
    any falsy member of the triple as "no LiveKit configured" and fall
    back to the ``{mode: 'p2p_mesh'}`` response.

    NOTE(review): the supervisor is expected to decline to run when
    LIVEKIT_DISABLE=1 is set — that logic lives in livekit_supervisor and
    is not visible from this function.
    """
    environ = os.environ
    from_env = (
        environ.get('LIVEKIT_URL'),
        environ.get('LIVEKIT_API_KEY'),
        environ.get('LIVEKIT_API_SECRET'),
    )
    # A complete operator-supplied triple always wins.
    if all(from_env):
        return from_env

    unconfigured = (None, None, None)

    # Imported lazily to avoid a circular dependency during module init.
    try:
        from .livekit_supervisor import (
            ensure_dev_keys,
            get_livekit_url,
            supervisor_should_run,
        )
    except Exception:  # pragma: no cover — defensive
        return unconfigured

    if supervisor_should_run():
        dev_keys = ensure_dev_keys()
        # A lone LIVEKIT_URL still overrides the supervisor's URL.
        resolved_url = from_env[0] or get_livekit_url()
        return (
            resolved_url,
            dev_keys.get('api_key'),
            dev_keys.get('api_secret'),
        )
    return unconfigured
def _has_livekit_config() -> bool:
    """Report whether a usable LiveKit configuration is available.

    Returns True only when every member of the ``(url, api_key,
    api_secret)`` triple produced by ``_resolved_config()`` is truthy,
    regardless of whether it came from environment variables or from the
    supervisor's dev keys.
    """
    server_url, api_key, api_secret = _resolved_config()
    if not server_url or not api_key or not api_secret:
        return False
    return True
class LiveKitService:
    """Stateless helpers for LiveKit token issuance and room teardown.

    Both methods are static and consult ``_resolved_config()`` on every
    call, so they pick up configuration changes without any instance
    state. "LiveKit is configured" means the same thing in both methods:
    a complete, truthy ``(url, api_key, api_secret)`` triple.
    """

    @staticmethod
    def issue_token(call_id: str, user_id: str,
                    *, can_publish: bool = True,
                    can_publish_screen: bool = False,
                    is_agent: bool = False,
                    agent_bridge_node_id: Optional[str] = None,
                    ttl_seconds: int = 3600) -> Dict[str, Any]:
        """Return a token-issuance result for joining ``call_id``.

        The result always includes ``mode`` so the client knows how to
        connect:

        - ``mode='p2p_mesh'`` — no complete LiveKit config. The result is
          ``{mode, call_id, reason}`` and the client does its own WebRTC
          handshake via PeerLink.
        - ``mode='livekit'`` — a JWT was signed. The result contains
          ``url``, ``token`` (room=call_id, identity=user_id),
          ``metadata`` and ``expires_at`` (epoch seconds).
        - ``mode='livekit_pending'`` — LiveKit is configured but no token
          could be produced. Same keys as ``livekit`` with ``token=''``
          plus a ``reason`` that says whether the SDK is missing or the
          signing call failed.

        Args:
            call_id: Call identifier; used as the LiveKit room name.
            user_id: Participant identity embedded in the token.
            can_publish: Whether the participant may publish tracks.
            can_publish_screen: Whether screen-share sources are listed
                explicitly in the grant; also echoed in ``metadata``.
            is_agent: Marks the participant as ``'agent'`` rather than
                ``'human'`` in ``metadata``.
            agent_bridge_node_id: Optional bridge node id, passed through
                in ``metadata``.
            ttl_seconds: Token lifetime in seconds.

        Returns:
            A dict in one of the three shapes described above. Signing
            errors are logged and reported as ``livekit_pending``; they
            are never raised to the caller.
        """
        url, api_key, api_secret = _resolved_config()
        if not (url and api_key and api_secret):
            return {
                'mode': 'p2p_mesh',
                'call_id': call_id,
                'reason': 'no LIVEKIT config; central/embedded deploy or '
                          'LIVEKIT_DISABLE=1 set',
            }

        metadata = {
            'agent_kind': 'agent' if is_agent else 'human',
            'agent_bridge_node_id': agent_bridge_node_id,
            'can_publish_screen': can_publish_screen,
        }

        # Phase 7d.B — sign a real LiveKit JWT when the SDK is available.
        # Otherwise fall back to the pending shape so the REST contract
        # stays testable.
        if _HAS_LIVEKIT_SDK and livekit_api is not None:
            try:
                grants = livekit_api.VideoGrants(
                    room_join=True,
                    room=call_id,
                    can_publish=can_publish,
                    can_publish_data=True,
                    can_subscribe=True,
                )
                # NOTE(review): sources are only listed when screen share
                # is allowed. LiveKit documents an unset source list as
                # "no restriction", so can_publish_screen=False presumably
                # does NOT block screen share server-side — confirm
                # whether ['camera', 'microphone'] should be set here.
                if can_publish_screen and hasattr(grants, 'can_publish_sources'):
                    grants.can_publish_sources = ['camera', 'microphone',
                                                  'screen_share',
                                                  'screen_share_audio']
                token = (
                    livekit_api.AccessToken(api_key, api_secret)
                    .with_identity(user_id)
                    .with_grants(grants)
                    .with_metadata(json.dumps(metadata))
                    .with_ttl(timedelta(seconds=ttl_seconds))
                    .to_jwt()
                )
                return {
                    'mode': 'livekit',
                    'url': url,
                    'token': token,
                    'metadata': metadata,
                    'expires_at': int(time.time()) + ttl_seconds,
                }
            except Exception as e:
                # Deliberate best-effort: a signing failure degrades to
                # the pending shape rather than failing the request.
                logger.warning(
                    "LiveKitService.issue_token: SDK failed (%s); "
                    "falling back to livekit_pending shape", e)
            # The SDK is installed but signing raised. Say so, rather
            # than telling the operator to install a package they have.
            # Exception details stay in the server log only.
            pending_reason = ('livekit-api SDK failed to sign the token; '
                              'see server logs for details')
        else:
            pending_reason = ('livekit-api SDK not installed; pip install '
                              'livekit-api to enable real token signing')

        # SDK absent OR signing failed — return the pending shape so the
        # client knows infra is configured but not ready.
        # (Pass-4 P4-6: renamed from 'livekit_stub' for clarity.)
        return {
            'mode': 'livekit_pending',
            'url': url,
            'token': '',
            'metadata': metadata,
            'expires_at': int(time.time()) + ttl_seconds,
            'reason': pending_reason,
        }

    @staticmethod
    def end_room(call_id: str) -> Dict[str, Any]:
        """Signal the end of a call's room.

        No LiveKit API call is made here. The SDK's delete_room is an
        asyncio coroutine, so the real teardown is expected to run in a
        background task wired up by api_calls.end_call (not visible in
        this module). ``call_id`` is accepted for that contract but is
        not used.

        Returns:
            ``{'mode': 'p2p_mesh', 'ended': True}`` when LiveKit is not
            fully configured, else ``{'mode': 'livekit', 'ended': True}``.
        """
        # Same completeness check as issue_token, so both methods report
        # the same mode when the config is only partial (e.g. a URL
        # without keys).
        url, api_key, api_secret = _resolved_config()
        if not (url and api_key and api_secret):
            return {'mode': 'p2p_mesh', 'ended': True}
        return {'mode': 'livekit', 'ended': True}
206__all__ = ['LiveKitService']