Coverage for integrations / providers / neuro_discovery.py: 41.3%

75 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1"""Unified discovery — finds connected neuro / biometric devices across 

2every transport in one call. 

3 

4scan() is the single entrypoint. Each transport block is wrapped in 

5its own try/except so a missing optional dependency (bleak, pylsl, 

6pyserial) degrades to "skip this transport", never crashes discovery. 

7 

8Each entry in the returned list is a (provider_id_or_none, info_dict) 

9tuple: 

10 - provider_id is the registry slug if the device matched a seeded 

11 entry by name or BLE service UUID; 

12 - None if discovery found an unknown device (still surfaced so a dev 

13 can decide to add a NeuroProvider entry for it). 

14""" 

15 

16from __future__ import annotations 

17 

18import logging 

19from typing import Any, Dict, List, Optional, Tuple 

20 

21from integrations.providers.neuro_providers import ( 

22 NEURO_REGISTRY, 

23 NeuroProvider, 

24 Transport, 

25) 

26 

27logger = logging.getLogger(__name__) 

28 

29 

30# ─── Matching helpers ───────────────────────────────────────────────── 

31 

32def _match_ble_service(service_uuid: str) -> Optional[NeuroProvider]: 

33 if not service_uuid: 

34 return None 

35 target = service_uuid.lower() 

36 for p in NEURO_REGISTRY.values(): 

37 if p.ble_service_uuid and p.ble_service_uuid.lower() == target: 

38 return p 

39 return None 

40 

41 

42def _match_name(name: str) -> Optional[NeuroProvider]: 

43 if not name: 

44 return None 

45 low = name.lower() 

46 for p in NEURO_REGISTRY.values(): 

47 if p.id in low: 

48 return p 

49 first_word = p.name.lower().split()[0] if p.name else '' 

50 if first_word and first_word in low: 

51 return p 

52 return None 

53 

54 

55# ─── Per-transport scans (all fail-soft on missing deps) ────────────── 

56 

57def _scan_ble(timeout_s: float = 4.0) -> List[Tuple[Optional[str], Dict[str, Any]]]: 

58 try: 

59 import asyncio 

60 from bleak import BleakScanner # type: ignore 

61 except ImportError: 

62 logger.debug('bleak not installed — skipping BLE scan') 

63 return [] 

64 

65 async def _go(): 

66 return await BleakScanner.discover(timeout=timeout_s) 

67 

68 try: 

69 devices = asyncio.run(_go()) 

70 except Exception as e: 

71 logger.debug(f'BLE scan failed: {e}') 

72 return [] 

73 

74 out: List[Tuple[Optional[str], Dict[str, Any]]] = [] 

75 for d in devices: 

76 info = { 

77 'transport': Transport.BLE.value, 

78 'address': getattr(d, 'address', None), 

79 'name': getattr(d, 'name', None), 

80 } 

81 matched = _match_name(info['name'] or '') 

82 out.append((matched.id if matched else None, info)) 

83 return out 

84 

85 

86def _scan_lsl(timeout_s: float = 2.0) -> List[Tuple[Optional[str], Dict[str, Any]]]: 

87 try: 

88 from pylsl import resolve_streams # type: ignore 

89 except ImportError: 

90 logger.debug('pylsl not installed — skipping LSL scan') 

91 return [] 

92 try: 

93 streams = resolve_streams(wait_time=timeout_s) 

94 except Exception as e: 

95 logger.debug(f'LSL resolve failed: {e}') 

96 return [] 

97 

98 out: List[Tuple[Optional[str], Dict[str, Any]]] = [] 

99 for s in streams: 

100 try: 

101 info = { 

102 'transport': Transport.LSL.value, 

103 'name': s.name(), 

104 'type': s.type(), 

105 'channel_count': s.channel_count(), 

106 'sample_rate_hz': int(s.nominal_srate() or 0), 

107 } 

108 except Exception: 

109 continue 

110 matched = _match_name(info.get('name') or '') 

111 out.append((matched.id if matched else None, info)) 

112 return out 

113 

114 

115def _scan_usb_serial() -> List[Tuple[Optional[str], Dict[str, Any]]]: 

116 try: 

117 from serial.tools import list_ports # type: ignore 

118 except ImportError: 

119 logger.debug('pyserial not installed — skipping USB-serial scan') 

120 return [] 

121 out: List[Tuple[Optional[str], Dict[str, Any]]] = [] 

122 try: 

123 ports = list_ports.comports() 

124 except Exception as e: 

125 logger.debug(f'USB-serial scan failed: {e}') 

126 return [] 

127 for port in ports: 

128 info = { 

129 'transport': Transport.USB_SERIAL.value, 

130 'device': port.device, 

131 'description': port.description, 

132 'vid': getattr(port, 'vid', None), 

133 'pid': getattr(port, 'pid', None), 

134 } 

135 matched = _match_name(info.get('description') or '') 

136 out.append((matched.id if matched else None, info)) 

137 return out 

138 

139 

140# ─── Public entrypoint ──────────────────────────────────────────────── 

141 

142def scan( 

143 include_ble: bool = True, 

144 include_lsl: bool = True, 

145 include_usb: bool = True, 

146 ble_timeout_s: float = 4.0, 

147) -> List[Tuple[Optional[str], Dict[str, Any]]]: 

148 """Aggregate discovery across every enabled transport. Fail-soft: 

149 missing optional deps (bleak, pylsl, pyserial) log-debug and 

150 return [] for that transport rather than crashing the scan. 

151 """ 

152 found: List[Tuple[Optional[str], Dict[str, Any]]] = [] 

153 if include_ble: 

154 found.extend(_scan_ble(timeout_s=ble_timeout_s)) 

155 if include_lsl: 

156 found.extend(_scan_lsl()) 

157 if include_usb: 

158 found.extend(_scan_usb_serial()) 

159 return found 

160 

161 

162def known_providers_summary() -> List[Dict[str, Any]]: 

163 """Static overview of the full registry — no hardware touched. 

164 

165 Admin UI renders this alongside scan() results so users see 

166 "what's theoretically supported" and "what's connected right now" 

167 side by side. 

168 """ 

169 out: List[Dict[str, Any]] = [] 

170 for p in NEURO_REGISTRY.values(): 

171 out.append({ 

172 'id': p.id, 

173 'name': p.name, 

174 'form_factor': p.form_factor.value, 

175 'status': p.status.value, 

176 'signals': [s.value for s in p.signals], 

177 'transports': [t.value for t in p.transport], 

178 'sample_rate_hz': p.sample_rate_hz, 

179 'channels': p.channels, 

180 'docs_url': p.docs_url, 

181 'website': p.website, 

182 'contact_email': p.contact_email, 

183 'notes': p.notes, 

184 }) 

185 return out