Coverage for integrations / social / livekit_service.py: 38.8%

49 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial — LiveKit token + room helper (deploy-mode aware stub). 

3 

4Phase 7d. Plan reference: sunny-gliding-eich.md, Part E.7 + Part R.6. 

5 

6LiveKit is the FALLBACK media transport when: 

7 - Call has > 4 participants (mesh efficiency drops). 

8 - Network topology prevents direct WebRTC P2P (NAT, mobile carrier). 

9 - One participant is an AgentVoiceBridge (the bridge needs a stable 

10 rendezvous URL, which LiveKit provides). 

11 

12For 1:1 + small group calls (≤ 4) on the same network, the clients 

13run a WebRTC P2P mesh signaled over PeerLink DISPATCH channel. This 

14service issues a LiveKit token only when called explicitly — clients 

15that succeed at mesh never request one. 

16 

17Deploy-mode adaptation (Plan E.7 + Part Q): 

18 - central → managed LiveKit Cloud or self-hosted SFU. 

19 LIVEKIT_URL + LIVEKIT_API_KEY + LIVEKIT_API_SECRET 

20 env vars come from per-tenant config. 

21 - regional → ship `livekit-server` binary alongside HARTOS, 

22 one per LAN node. 

23 - flat / Nunba → LIVEKIT_URL empty → token issuance returns 

24 {'mode': 'p2p_mesh'} so clients fall back to 

25 WebRTC P2P + PeerLink signaling. 

26 

27This module started as a contract-first stub and still degrades to one: 

28when the optional `livekit-api` SDK is importable (Phase 7d.B) it signs 

29real LiveKit JWTs; when it is not, `issue_token` returns the 

30`livekit_pending` shape with an empty token. The contract is the JSON 

31shape the clients expect, so tests + RN client work stay green either way. 

32""" 

33 

34from __future__ import annotations 

35 

36import json 

37import logging 

38import os 

39import time 

40from datetime import timedelta 

41from typing import Any, Dict, Optional 

42 

43logger = logging.getLogger('hevolve_social') 

44 

45 

# Phase 7d.B — optional LiveKit server SDK.
# With `pip install livekit-api` present, this module can sign real
# JWTs. Without it (flat / regional / Nunba bundled without LiveKit)
# `livekit_api` stays None, token issuance falls back to the stub
# shape, and clients route to WebRTC P2P mesh via PeerLink.
try:
    from livekit import api as livekit_api  # type: ignore
except Exception:
    # Any import-time failure is treated the same as "SDK absent".
    livekit_api = None

# Single source of truth: the flag simply mirrors whether the import bound.
_HAS_LIVEKIT_SDK = livekit_api is not None

57 

58 

def _resolved_config():
    """Resolve the LiveKit connection triple ``(url, api_key, api_secret)``.

    Resolution order:

    1. If LIVEKIT_URL, LIVEKIT_API_KEY and LIVEKIT_API_SECRET are all
       set (non-empty) in the environment, they are returned as-is.
    2. Otherwise the sibling ``livekit_supervisor`` module is consulted:
       when ``supervisor_should_run()`` is truthy, the key pair comes
       from ``ensure_dev_keys()`` and the URL is LIVEKIT_URL if set,
       else ``get_livekit_url()``.
    3. If the supervisor cannot be imported or reports it should not
       run, ``(None, None, None)`` is returned — callers treat that as
       "no LiveKit" and answer with the {mode: 'p2p_mesh'} shape.
    """
    url, api_key, api_secret = (
        os.environ.get(var_name)
        for var_name in ('LIVEKIT_URL', 'LIVEKIT_API_KEY',
                         'LIVEKIT_API_SECRET')
    )
    if all((url, api_key, api_secret)):
        return url, api_key, api_secret

    unavailable = (None, None, None)

    # Imported lazily so module init does not trip a circular dependency.
    try:
        from .livekit_supervisor import (
            ensure_dev_keys,
            get_livekit_url,
            supervisor_should_run,
        )
    except Exception:  # pragma: no cover — defensive
        return unavailable

    if not supervisor_should_run():
        return unavailable

    # Keys are fetched before the URL, mirroring the supervisor call order.
    dev_keys = ensure_dev_keys()
    # A lone LIVEKIT_URL (without key/secret) still wins over the
    # supervisor-provided URL.
    resolved_url = url if url else get_livekit_url()
    return resolved_url, dev_keys.get('api_key'), dev_keys.get('api_secret')

92 

93 

def _has_livekit_config() -> bool:
    """Report whether a usable LiveKit configuration is available.

    Returns True only when every element of the triple produced by
    ``_resolved_config()`` — url, api_key, api_secret — is truthy,
    regardless of whether it came from env vars or the supervisor.
    """
    return all(_resolved_config())

101 

102 

class LiveKitService:
    """Stateless helpers for LiveKit token issuance and room teardown.

    Both methods resolve configuration through ``_resolved_config()``
    and apply the same rule: LiveKit is considered configured only when
    url, api_key and api_secret are all present. Every result dict
    carries a ``mode`` key telling the client which transport to use.
    """

    @staticmethod
    def issue_token(call_id: str, user_id: str,
                    *, can_publish: bool = True,
                    can_publish_screen: bool = False,
                    is_agent: bool = False,
                    agent_bridge_node_id: Optional[str] = None,
                    ttl_seconds: int = 3600) -> Dict[str, Any]:
        """Return a token-issuance result for one participant of a call.

        Args:
            call_id: Call identifier; used as the LiveKit room name.
            user_id: Participant identity embedded in the token.
            can_publish: Whether the participant may publish tracks.
            can_publish_screen: When true (and the SDK grant object
                exposes ``can_publish_sources``), the allowed sources
                are set to camera, microphone, screen share and
                screen-share audio.
            is_agent: Recorded in metadata as ``agent_kind`` =
                'agent' (else 'human').
            agent_bridge_node_id: Recorded verbatim in metadata.
            ttl_seconds: Token lifetime; also used for ``expires_at``.

        Returns:
            A dict whose ``mode`` is one of:

            - ``'p2p_mesh'`` — no complete LiveKit config. Shape is
              {mode, call_id, reason}; the client does its own WebRTC
              handshake via PeerLink.
            - ``'livekit'`` — signed token. Shape is
              {mode, url, token, metadata, expires_at}, where token is
              a JWT for room=call_id, identity=user_id.
            - ``'livekit_pending'`` — config is present but no token
              could be signed (SDK missing, or signing raised). Shape
              is {mode, url, token='', metadata, expires_at, reason}.

        Signing errors are logged and reported via the
        ``'livekit_pending'`` shape rather than raised.
        """
        url, api_key, api_secret = _resolved_config()
        if not (url and api_key and api_secret):
            return {
                'mode': 'p2p_mesh',
                'call_id': call_id,
                'reason': 'no LIVEKIT config; central/embedded deploy or '
                          'LIVEKIT_DISABLE=1 set',
            }

        metadata = {
            'agent_kind': 'agent' if is_agent else 'human',
            'agent_bridge_node_id': agent_bridge_node_id,
            'can_publish_screen': can_publish_screen,
        }
        # Computed once so the success and pending shapes agree and the
        # value is not skewed by the time spent signing.
        expires_at = int(time.time()) + ttl_seconds

        # Default explanation for the pending shape; replaced below if
        # the SDK is present but signing fails, so the client-visible
        # reason matches what actually happened.
        pending_reason = ('livekit-api SDK not installed; pip install '
                          'livekit-api to enable real token signing')

        # Phase 7d.B — sign a real LiveKit JWT when the SDK is
        # available. Falls back to the pending shape otherwise so the
        # REST contract stays testable.
        if _HAS_LIVEKIT_SDK and livekit_api is not None:
            try:
                grants = livekit_api.VideoGrants(
                    room_join=True,
                    room=call_id,
                    can_publish=can_publish,
                    can_publish_data=True,
                    can_subscribe=True,
                )
                # `can_publish_screen` controls screen-share track
                # publishing — gated by the AgentJoinGrant.scope on
                # the caller side. The hasattr guard tolerates SDK
                # versions whose grants lack `can_publish_sources`.
                if can_publish_screen and hasattr(grants, 'can_publish_sources'):
                    grants.can_publish_sources = ['camera', 'microphone',
                                                  'screen_share',
                                                  'screen_share_audio']
                token = (
                    livekit_api.AccessToken(api_key, api_secret)
                    .with_identity(user_id)
                    .with_grants(grants)
                    .with_metadata(json.dumps(metadata))
                    .with_ttl(timedelta(seconds=ttl_seconds))
                    .to_jwt()
                )
                return {
                    'mode': 'livekit',
                    'url': url,
                    'token': token,
                    'metadata': metadata,
                    'expires_at': expires_at,
                }
            except Exception as e:
                # Deliberate best-effort: a signing failure must not
                # break the REST contract, so log and fall through.
                logger.warning(
                    "LiveKitService.issue_token: SDK failed (%s); "
                    "falling back to livekit_pending shape", e)
                # Generic on purpose — exception text stays in the
                # server log rather than in the client response.
                pending_reason = ('livekit-api SDK failed to sign the '
                                  'token; see server logs')

        # SDK absent OR signing failed — return the pending shape
        # so the client knows infra is configured but not ready.
        # (Pass-4 P4-6: renamed from 'livekit_stub' for clarity.)
        return {
            'mode': 'livekit_pending',
            'url': url,
            'token': '',
            'metadata': metadata,
            'expires_at': expires_at,
            'reason': pending_reason,
        }

    @staticmethod
    def end_room(call_id: str) -> Dict[str, Any]:
        """Signal the end of a call's room.

        Args:
            call_id: Call identifier. Currently unused here; kept for
                the caller-facing signature.

        Returns:
            ``{'mode': 'p2p_mesh', 'ended': True}`` when LiveKit is not
            fully configured, else ``{'mode': 'livekit', 'ended': True}``.
            No LiveKit API call is made from this method.
        """
        url, api_key, api_secret = _resolved_config()
        # Same completeness rule as issue_token, so a partial config
        # cannot yield 'p2p_mesh' on join but 'livekit' on teardown.
        if not (url and api_key and api_secret):
            return {'mode': 'p2p_mesh', 'ended': True}
        # SDK delete_room is async (asyncio coroutine) — for now we
        # just signal end-of-call; the actual API call lands in a
        # background task wired up by api_calls.end_call.
        return {'mode': 'livekit', 'ended': True}

204 

205 

206__all__ = ['LiveKitService']