Coverage for integrations / social / device_routing_service.py: 97.4%

76 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Device Routing Service — Cross-Device Agent Communication 

3 

4Routes agent actions (TTS, consent requests) to the best device for a user. 

5Uses existing FleetCommandService for delivery and NotificationService for alerts. 

6 

7Device selection priority for TTS: 

8 1. Phone (has speaker + local TTS capability) 

9 2. Desktop/tablet (STANDARD+ tier, full TTS) 

10 3. Cloud fallback (no local device available) 

11 

12When target is a watch: find phone as relay → push tts_stream with relay_to_device_id. 

13""" 

14import json 

15import logging 

16from typing import Dict, List, Optional 

17 

18from .fleet_command import FleetCommandService 

19from .models import DeviceBinding 

20from .services import NotificationService 

21 

22logger = logging.getLogger('hevolve_social') 

23 

24# Form factor priority for TTS (lower index = preferred) 

25# 'robot' added — robots with speakers can receive TTS (lowest priority) 

26_TTS_PRIORITY = ['phone', 'desktop', 'tablet', 'tv', 'embedded', 'robot'] 

27 

28 

29class DeviceRoutingService: 

30 """Static service for routing agent actions to the right user device.""" 

31 

32 @staticmethod 

33 def get_user_device_map(db, user_id: str) -> List[Dict]: 

34 """List all active devices for a user with parsed capabilities. 

35 

36 Args: 

37 db: SQLAlchemy session. 

38 user_id: Target user. 

39 

40 Returns: 

41 List of DeviceBinding.to_dict() with parsed capabilities. 

42 """ 

43 devices = db.query(DeviceBinding).filter_by( 

44 user_id=user_id, is_active=True, 

45 ).all() 

46 return [d.to_dict() for d in devices] 

47 

48 @staticmethod 

49 def pick_device(db, user_id: str, required_capability: str = 'tts') -> Optional[Dict]: 

50 """Find the best device for a capability. 

51 

52 Args: 

53 db: SQLAlchemy session. 

54 user_id: Target user. 

55 required_capability: Capability key to look for (e.g. 'tts', 'mic', 'speaker'). 

56 

57 Returns: 

58 DeviceBinding.to_dict() of the best device, or None. 

59 """ 

60 devices = db.query(DeviceBinding).filter_by( 

61 user_id=user_id, is_active=True, 

62 ).all() 

63 

64 candidates = [] 

65 for d in devices: 

66 caps = d.capabilities 

67 if caps.get(required_capability): 

68 try: 

69 priority = _TTS_PRIORITY.index(d.form_factor) 

70 except ValueError: 

71 priority = len(_TTS_PRIORITY) 

72 candidates.append((priority, d)) 

73 

74 if not candidates: 

75 return None 

76 

77 candidates.sort(key=lambda x: x[0]) 

78 return candidates[0][1].to_dict() 

79 

80 @staticmethod 

81 def route_tts( 

82 db, user_id: str, text: str, 

83 agent_id: str = '', voice: str = 'default', lang: str = 'en', 

84 ) -> Dict: 

85 """Route TTS to the best device for this user. 

86 

87 If user has a watch as their only device, find a phone to relay through. 

88 Falls back to cloud notification if no TTS device available. 

89 

90 Args: 

91 db: SQLAlchemy session. 

92 user_id: Target user. 

93 text: Text to speak. 

94 agent_id: Agent requesting TTS. 

95 voice: TTS voice ID. 

96 lang: Language code. 

97 

98 Returns: 

99 {success, device_id, method} dict. 

100 """ 

101 devices = db.query(DeviceBinding).filter_by( 

102 user_id=user_id, is_active=True, 

103 ).all() 

104 

105 if not devices: 

106 return {'success': False, 'error': 'No devices linked for user'} 

107 

108 # Find TTS-capable device 

109 tts_device = None 

110 relay_target = None 

111 best_priority = len(_TTS_PRIORITY) + 1 

112 

113 for d in devices: 

114 caps = d.capabilities 

115 if caps.get('tts'): 

116 try: 

117 prio = _TTS_PRIORITY.index(d.form_factor) 

118 except ValueError: 

119 prio = len(_TTS_PRIORITY) 

120 if prio < best_priority: 

121 best_priority = prio 

122 tts_device = d 

123 

124 # Check if there's a watch that needs relay 

125 watch_devices = [d for d in devices if d.form_factor == 'watch'] 

126 

127 params = { 

128 'text': text, 

129 'voice': voice, 

130 'lang': lang, 

131 'agent_id': agent_id, 

132 } 

133 

134 if tts_device: 

135 # If there's a watch, relay through the TTS device 

136 if watch_devices: 

137 params['relay_to_device_id'] = watch_devices[0].device_id 

138 

139 FleetCommandService.push_command( 

140 db, tts_device.device_id, 'tts_stream', params, 

141 ) 

142 db.flush() 

143 return { 

144 'success': True, 

145 'device_id': tts_device.device_id, 

146 'method': 'fleet_command', 

147 'relay_to': params.get('relay_to_device_id', ''), 

148 } 

149 

150 # No TTS device — fall back to notification 

151 try: 

152 NotificationService.create( 

153 db, user_id, 'agent_tts_fallback', 

154 source_user_id=agent_id, 

155 message=text, 

156 ) 

157 db.flush() 

158 return {'success': True, 'device_id': '', 'method': 'notification_fallback'} 

159 except Exception as e: 

160 logger.debug(f"DeviceRouting: TTS fallback failed: {e}") 

161 return {'success': False, 'error': str(e)} 

162 

163 @staticmethod 

164 def request_consent( 

165 db, user_id: str, action: str, agent_id: str, 

166 description: str = '', timeout_s: int = 60, 

167 ) -> Dict: 

168 """Push consent request to user's primary device. 

169 

170 Creates both a Notification (persistent, cross-device visible) and 

171 a FleetCommand (real-time push to best device). 

172 

173 Args: 

174 db: SQLAlchemy session. 

175 user_id: Target user. 

176 action: What the agent wants to do. 

177 agent_id: Which agent is requesting. 

178 description: Human-readable explanation. 

179 timeout_s: How long to wait for response. 

180 

181 Returns: 

182 {success, command_id, device_id} dict. 

183 """ 

184 # Create persistent notification (visible on all devices) 

185 NotificationService.create( 

186 db, user_id, 'agent_consent_request', 

187 source_user_id=agent_id, 

188 message=f"[{action}] {description}", 

189 ) 

190 

191 # Find primary device (phone > desktop > tablet > any) 

192 devices = db.query(DeviceBinding).filter_by( 

193 user_id=user_id, is_active=True, 

194 ).order_by(DeviceBinding.last_sync_at.desc()).all() 

195 

196 if not devices: 

197 db.flush() 

198 return {'success': True, 'command_id': None, 'device_id': '', 

199 'method': 'notification_only'} 

200 

201 # Pick best device — prefer phone, then by recency 

202 target = devices[0] 

203 for d in devices: 

204 if d.form_factor == 'phone': 

205 target = d 

206 break 

207 

208 params = { 

209 'action': action, 

210 'agent_id': agent_id, 

211 'description': description, 

212 'timeout_s': timeout_s, 

213 } 

214 cmd = FleetCommandService.push_command( 

215 db, target.device_id, 'agent_consent', params, 

216 ) 

217 db.flush() 

218 

219 cmd_id = cmd.get('id') if cmd else None 

220 return { 

221 'success': True, 

222 'command_id': cmd_id, 

223 'device_id': target.device_id, 

224 'method': 'fleet_command', 

225 }