Coverage for integrations / social / device_routing_service.py: 97.4%
76 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Device Routing Service — Cross-Device Agent Communication
4Routes agent actions (TTS, consent requests) to the best device for a user.
5Uses existing FleetCommandService for delivery and NotificationService for alerts.
7Device selection priority for TTS:
8 1. Phone (has speaker + local TTS capability)
9 2. Desktop/tablet (STANDARD+ tier, full TTS)
10 3. Cloud fallback (no local device available)
12When target is a watch: find phone as relay → push tts_stream with relay_to_device_id.
13"""
14import json
15import logging
16from typing import Dict, List, Optional
18from .fleet_command import FleetCommandService
19from .models import DeviceBinding
20from .services import NotificationService
# Logger registered under the 'hevolve_social' name.
logger = logging.getLogger('hevolve_social')

# Form-factor preference used when choosing a device, most preferred first.
# A lower index wins; form factors missing from this list rank after all of
# the listed ones.
_TTS_PRIORITY = [
    'phone',     # has speaker + local TTS capability
    'desktop',
    'tablet',
    'tv',
    'embedded',
    'robot',     # robots with speakers can receive TTS (lowest priority)
]
class DeviceRoutingService:
    """Static service for routing agent actions to the right user device.

    Every method is a ``@staticmethod`` that receives the SQLAlchemy session
    as its first argument. Methods flush the session after writing but never
    commit; committing is left to the caller.
    """

    @staticmethod
    def _active_devices_query(db, user_id: str):
        """Return the query selecting the user's active DeviceBinding rows."""
        return db.query(DeviceBinding).filter_by(
            user_id=user_id, is_active=True,
        )

    @staticmethod
    def _form_factor_rank(form_factor) -> int:
        """Return the index of ``form_factor`` in ``_TTS_PRIORITY``.

        Lower is more preferred. Form factors that are not listed rank
        after every listed one.
        """
        try:
            return _TTS_PRIORITY.index(form_factor)
        except ValueError:
            return len(_TTS_PRIORITY)

    @staticmethod
    def _best_capable_device(devices, capability: str):
        """Return the most preferred device advertising ``capability``.

        Args:
            devices: Iterable of DeviceBinding rows.
            capability: Capability key that must be truthy on the device.

        Returns:
            The chosen DeviceBinding, or None when no device qualifies.
        """
        capable = [
            d for d in devices
            # A binding with no stored capabilities is treated as having none
            # instead of raising AttributeError on None.
            if (d.capabilities or {}).get(capability)
        ]
        if not capable:
            return None
        # min() returns the first of equally ranked devices, so ties keep
        # the order in which the devices were supplied.
        return min(
            capable,
            key=lambda d: DeviceRoutingService._form_factor_rank(d.form_factor),
        )

    @staticmethod
    def get_user_device_map(db, user_id: str) -> List[Dict]:
        """List all active devices for a user.

        Args:
            db: SQLAlchemy session.
            user_id: Target user.

        Returns:
            One ``DeviceBinding.to_dict()`` result per active device.
        """
        devices = DeviceRoutingService._active_devices_query(db, user_id).all()
        return [d.to_dict() for d in devices]

    @staticmethod
    def pick_device(db, user_id: str, required_capability: str = 'tts') -> Optional[Dict]:
        """Find the best device for a capability.

        Candidates are the user's active devices whose capabilities contain
        a truthy ``required_capability``. They are ranked by form factor
        using ``_TTS_PRIORITY`` regardless of which capability is requested.

        Args:
            db: SQLAlchemy session.
            user_id: Target user.
            required_capability: Capability key to look for
                (e.g. 'tts', 'mic', 'speaker').

        Returns:
            ``DeviceBinding.to_dict()`` of the best device, or None.
        """
        devices = DeviceRoutingService._active_devices_query(db, user_id).all()
        best = DeviceRoutingService._best_capable_device(
            devices, required_capability,
        )
        return best.to_dict() if best is not None else None

    @staticmethod
    def route_tts(
        db, user_id: str, text: str,
        agent_id: str = '', voice: str = 'default', lang: str = 'en',
    ) -> Dict:
        """Route TTS to the best device for this user.

        The best TTS-capable device receives a ``tts_stream`` fleet command.
        If the user also has a watch (other than the chosen device), its id
        is passed as ``relay_to_device_id``. When the user has devices but
        none is TTS-capable, a notification is created instead.

        Args:
            db: SQLAlchemy session.
            user_id: Target user.
            text: Text to speak.
            agent_id: Agent requesting TTS.
            voice: TTS voice ID.
            lang: Language code.

        Returns:
            On fleet delivery: ``{success, device_id, method, relay_to}``.
            On notification fallback: ``{success, device_id, method}``.
            On failure (no devices linked, or the fallback raised):
            ``{success: False, error}``.
        """
        devices = DeviceRoutingService._active_devices_query(db, user_id).all()

        if not devices:
            return {'success': False, 'error': 'No devices linked for user'}

        tts_device = DeviceRoutingService._best_capable_device(devices, 'tts')

        params = {
            'text': text,
            'voice': voice,
            'lang': lang,
            'agent_id': agent_id,
        }

        if tts_device is not None:
            # Relay to the first watch that is not the TTS device itself; a
            # TTS-capable watch can be selected above and must not be asked
            # to relay to itself.
            relay_watch = next(
                (
                    d for d in devices
                    if d.form_factor == 'watch'
                    and d.device_id != tts_device.device_id
                ),
                None,
            )
            if relay_watch is not None:
                params['relay_to_device_id'] = relay_watch.device_id

            FleetCommandService.push_command(
                db, tts_device.device_id, 'tts_stream', params,
            )
            db.flush()
            return {
                'success': True,
                'device_id': tts_device.device_id,
                'method': 'fleet_command',
                'relay_to': params.get('relay_to_device_id', ''),
            }

        # No TTS device — fall back to notification. This is best-effort:
        # any failure is reported in the result instead of being raised.
        try:
            NotificationService.create(
                db, user_id, 'agent_tts_fallback',
                source_user_id=agent_id,
                message=text,
            )
            db.flush()
            return {'success': True, 'device_id': '', 'method': 'notification_fallback'}
        except Exception as e:
            logger.debug("DeviceRouting: TTS fallback failed: %s", e)
            return {'success': False, 'error': str(e)}

    @staticmethod
    def request_consent(
        db, user_id: str, action: str, agent_id: str,
        description: str = '', timeout_s: int = 60,
    ) -> Dict:
        """Push consent request to user's primary device.

        Always creates a Notification first. If the user has at least one
        active device, an ``agent_consent`` fleet command is also pushed to
        the primary device: the most recently synced phone, or, when there
        is no phone, the most recently synced device of any form factor.

        Args:
            db: SQLAlchemy session.
            user_id: Target user.
            action: What the agent wants to do.
            agent_id: Which agent is requesting.
            description: Human-readable explanation.
            timeout_s: Forwarded to the device in the command parameters.

        Returns:
            ``{success, command_id, device_id, method}`` dict. ``method`` is
            'notification_only' when no active device exists, otherwise
            'fleet_command'.
        """
        # Create the persistent notification before looking at devices, so
        # it exists even when there is nothing to push to.
        NotificationService.create(
            db, user_id, 'agent_consent_request',
            source_user_id=agent_id,
            message=f"[{action}] {description}",
        )

        # Most recently synced first, so devices[0] is the recency fallback.
        devices = (
            DeviceRoutingService._active_devices_query(db, user_id)
            .order_by(DeviceBinding.last_sync_at.desc())
            .all()
        )

        if not devices:
            db.flush()
            return {'success': True, 'command_id': None, 'device_id': '',
                    'method': 'notification_only'}

        # Prefer the first (most recent) phone; otherwise the most recent
        # device of any kind.
        target = next(
            (d for d in devices if d.form_factor == 'phone'),
            devices[0],
        )

        params = {
            'action': action,
            'agent_id': agent_id,
            'description': description,
            'timeout_s': timeout_s,
        }
        cmd = FleetCommandService.push_command(
            db, target.device_id, 'agent_consent', params,
        )
        db.flush()

        # push_command may return a falsy value; report no command id then.
        cmd_id = cmd.get('id') if cmd else None
        return {
            'success': True,
            'command_id': cmd_id,
            'device_id': target.device_id,
            'method': 'fleet_command',
        }