Coverage for integrations / providers / neuro_discovery.py: 41.3%
75 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""Unified discovery — finds connected neuro / biometric devices across
2every transport in one call.
4scan() is the single entrypoint. Each transport block is wrapped in
5its own try/except so a missing optional dependency (bleak, pylsl,
6pyserial) degrades to "skip this transport", never crashes discovery.
8Each entry in the returned list is a (provider_id_or_none, info_dict)
9tuple:
10 - provider_id is the registry slug if the device matched a seeded
11 entry by name or BLE service UUID;
12 - None if discovery found an unknown device (still surfaced so a dev
13 can decide to add a NeuroProvider entry for it).
14"""
16from __future__ import annotations
18import logging
19from typing import Any, Dict, List, Optional, Tuple
21from integrations.providers.neuro_providers import (
22 NEURO_REGISTRY,
23 NeuroProvider,
24 Transport,
25)
27logger = logging.getLogger(__name__)
30# ─── Matching helpers ─────────────────────────────────────────────────
32def _match_ble_service(service_uuid: str) -> Optional[NeuroProvider]:
33 if not service_uuid:
34 return None
35 target = service_uuid.lower()
36 for p in NEURO_REGISTRY.values():
37 if p.ble_service_uuid and p.ble_service_uuid.lower() == target:
38 return p
39 return None
42def _match_name(name: str) -> Optional[NeuroProvider]:
43 if not name:
44 return None
45 low = name.lower()
46 for p in NEURO_REGISTRY.values():
47 if p.id in low:
48 return p
49 first_word = p.name.lower().split()[0] if p.name else ''
50 if first_word and first_word in low:
51 return p
52 return None
55# ─── Per-transport scans (all fail-soft on missing deps) ──────────────
def _scan_ble(timeout_s: float = 4.0) -> List[Tuple[Optional[str], Dict[str, Any]]]:
    """Scan for nearby BLE devices and match each one against the registry.

    A device is matched by its advertised name first; if that fails, each
    advertised service UUID is tried via ``_match_ble_service``. Devices
    that match nothing are still returned, with ``None`` as provider id.

    Args:
        timeout_s: How long the BLE scanner listens, in seconds.

    Returns:
        A list of ``(provider_id_or_None, info)`` tuples, where ``info``
        has the keys ``'transport'``, ``'address'`` and ``'name'``.
        Returns ``[]`` when bleak is not installed or the scan fails.
    """
    try:
        import asyncio
        from bleak import BleakScanner  # type: ignore
    except ImportError:
        logger.debug('bleak not installed — skipping BLE scan')
        return []

    async def _go():
        try:
            # return_adv=True also yields the advertisement data, which is
            # where the advertised service UUIDs live.
            return await BleakScanner.discover(timeout=timeout_s, return_adv=True)
        except TypeError:
            # Older bleak releases may reject the return_adv keyword;
            # fall back to a plain, name-only scan.
            return await BleakScanner.discover(timeout=timeout_s)

    try:
        result = asyncio.run(_go())
    except Exception as e:
        logger.debug('BLE scan failed: %s', e)
        return []

    # With return_adv=True bleak returns {address: (device, adv_data)};
    # without it, a plain list of devices. Normalise to (device, adv) pairs.
    if isinstance(result, dict):
        pairs = list(result.values())
    else:
        pairs = [(device, None) for device in result]

    out: List[Tuple[Optional[str], Dict[str, Any]]] = []
    for d, adv in pairs:
        info = {
            'transport': Transport.BLE.value,
            'address': getattr(d, 'address', None),
            'name': getattr(d, 'name', None),
        }
        matched = _match_name(info['name'] or '')
        if matched is None:
            # Name gave nothing — try the advertised service UUIDs.
            for service_uuid in getattr(adv, 'service_uuids', None) or ():
                matched = _match_ble_service(service_uuid)
                if matched is not None:
                    break
        out.append((matched.id if matched else None, info))
    return out
def _scan_lsl(timeout_s: float = 2.0) -> List[Tuple[Optional[str], Dict[str, Any]]]:
    """Resolve visible LSL streams and match each one by stream name.

    Args:
        timeout_s: Passed to ``resolve_streams`` as ``wait_time``.

    Returns:
        A list of ``(provider_id_or_None, info)`` tuples. Returns ``[]``
        when pylsl is not installed or resolving fails. A stream whose
        metadata accessors raise is skipped.
    """
    try:
        from pylsl import resolve_streams  # type: ignore
    except ImportError:
        logger.debug('pylsl not installed — skipping LSL scan')
        return []

    try:
        resolved = resolve_streams(wait_time=timeout_s)
    except Exception as exc:
        logger.debug(f'LSL resolve failed: {exc}')
        return []

    results: List[Tuple[Optional[str], Dict[str, Any]]] = []
    for stream in resolved:
        try:
            details = {
                'transport': Transport.LSL.value,
                'name': stream.name(),
                'type': stream.type(),
                'channel_count': stream.channel_count(),
                # A falsy nominal rate is reported as 0.
                'sample_rate_hz': int(stream.nominal_srate() or 0),
            }
        except Exception:
            # Best-effort: one unreadable stream must not abort the scan.
            continue
        else:
            provider = _match_name(details['name'] or '')
            provider_id = provider.id if provider else None
            results.append((provider_id, details))
    return results
def _scan_usb_serial() -> List[Tuple[Optional[str], Dict[str, Any]]]:
    """List USB-serial ports and match each one by its port description.

    Returns:
        A list of ``(provider_id_or_None, info)`` tuples, where ``info``
        has the keys ``'transport'``, ``'device'``, ``'description'``,
        ``'vid'`` and ``'pid'``. Returns ``[]`` when pyserial is not
        installed or port enumeration fails.
    """
    try:
        from serial.tools import list_ports  # type: ignore
    except ImportError:
        logger.debug('pyserial not installed — skipping USB-serial scan')
        return []

    try:
        ports = list_ports.comports()
    except Exception as exc:
        logger.debug(f'USB-serial scan failed: {exc}')
        return []

    def _entry(port: Any) -> Tuple[Optional[str], Dict[str, Any]]:
        # Build the info dict for one port and pair it with its provider id.
        details = {
            'transport': Transport.USB_SERIAL.value,
            'device': port.device,
            'description': port.description,
            'vid': getattr(port, 'vid', None),
            'pid': getattr(port, 'pid', None),
        }
        provider = _match_name(details['description'] or '')
        return (provider.id if provider else None, details)

    return [_entry(port) for port in ports]
140# ─── Public entrypoint ────────────────────────────────────────────────
def scan(
    include_ble: bool = True,
    include_lsl: bool = True,
    include_usb: bool = True,
    ble_timeout_s: float = 4.0,
    lsl_timeout_s: float = 2.0,
) -> List[Tuple[Optional[str], Dict[str, Any]]]:
    """Aggregate discovery across every enabled transport.

    Fail-soft: missing optional deps (bleak, pylsl, pyserial) log-debug
    and return [] for that transport rather than crashing the scan.

    Args:
        include_ble: Run the BLE scan.
        include_lsl: Run the LSL stream resolve.
        include_usb: Run the USB-serial port listing.
        ble_timeout_s: Timeout forwarded to the BLE scan, in seconds.
        lsl_timeout_s: Timeout forwarded to the LSL resolve, in seconds.

    Returns:
        The per-transport results concatenated in BLE, LSL, USB order,
        each item a ``(provider_id_or_None, info)`` tuple.
    """
    found: List[Tuple[Optional[str], Dict[str, Any]]] = []
    if include_ble:
        found.extend(_scan_ble(timeout_s=ble_timeout_s))
    if include_lsl:
        found.extend(_scan_lsl(timeout_s=lsl_timeout_s))
    if include_usb:
        found.extend(_scan_usb_serial())
    return found
def known_providers_summary() -> List[Dict[str, Any]]:
    """Describe every registry entry as a plain dict — no hardware touched.

    Intended to be shown next to scan() results, so "what is supported"
    and "what is connected right now" can be compared side by side.
    Enum-valued fields are flattened to their ``.value``; entries appear
    in registry iteration order.
    """
    return [
        {
            'id': provider.id,
            'name': provider.name,
            'form_factor': provider.form_factor.value,
            'status': provider.status.value,
            'signals': [signal.value for signal in provider.signals],
            'transports': [link.value for link in provider.transport],
            'sample_rate_hz': provider.sample_rate_hz,
            'channels': provider.channels,
            'docs_url': provider.docs_url,
            'website': provider.website,
            'contact_email': provider.contact_email,
            'notes': provider.notes,
        }
        for provider in NEURO_REGISTRY.values()
    ]