Coverage for integrations / providers / neuro_adapter.py: 95.4%
65 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""Adapter protocol for neuro / biometric providers.
3One abstract base (`NeuroAdapter`), one factory (`choose_adapter`) that
4routes by the provider's primary transport, concrete adapter stubs per
5transport that raise a clear "install <sdk> and implement X" error when
6the vendor SDK isn't present.
8Why stubs instead of full implementations:
10 - Each vendor's wire protocol is different; shipping concrete clients
11 for all of them would pull in ~10 optional dependency chains.
12 - The stub error message includes the exact pip package name and docs
13 URL so a developer can go from "I want to talk to this device" to
14 working code in one sitting.
15 - Contact-gated providers (Yneuro) get a separate ContactGatedAdapter
16 that raises PermissionError with the vendor's contact email — clear
17 that the block is at the business layer, not the wire.
19Cross-network sensor reads reuse PeerLink's existing `sensor` channel
20(core/peer_link/channels.py) — E2E-encrypted. Zero new wire protocol.
21"""
23from __future__ import annotations
25import logging
26from abc import ABC, abstractmethod
27from dataclasses import dataclass
28from typing import Any, Dict, Optional
30from integrations.providers.neuro_providers import (
31 NeuroProvider,
32 ProviderStatus,
33 SignalType,
34 Transport,
35)
37logger = logging.getLogger(__name__)
40# ─── Reading shape ────────────────────────────────────────────────────
@dataclass
class Reading:
    """One chunk of sensor output from a connected provider.

    Plain value container: the dataclass performs no validation or
    conversion of the values it is given.
    """
    provider_id: str
    signal: SignalType
    data: Any  # numpy array / dict / list — transport-dependent
    sample_rate_hz: int
    timestamp_start: float
    duration_s: float
    # Channel names, units, etc. Stays None when no extras are supplied,
    # so the annotation is Optional rather than a bare Dict.
    metadata: Optional[Dict[str, Any]] = None
54# ─── Abstract base ────────────────────────────────────────────────────
class NeuroAdapter(ABC):
    """Common interface shared by every provider adapter.

    Subclasses must supply ``connect``, ``read_signal`` and
    ``disconnect``; ``is_connected`` is inherited and simply reports the
    ``_connected`` flag.
    """

    def __init__(self, provider: NeuroProvider):
        # Adapters start out disconnected; this base class never changes
        # the flag afterwards.
        self._connected = False
        self.provider = provider

    def is_connected(self) -> bool:
        """Return the current value of the connection flag."""
        return self._connected

    @abstractmethod
    def connect(self, **kwargs) -> bool:
        """Open the underlying transport and return True on success.

        Implementations may raise NotImplementedError (SDK missing) or
        PermissionError (contact-gated provider).
        """

    @abstractmethod
    def read_signal(
        self,
        signal: SignalType,
        duration_s: float = 1.0,
    ) -> Reading:
        """Read `duration_s` seconds of the requested signal."""

    @abstractmethod
    def disconnect(self) -> None:
        """Release the transport cleanly."""
83# ─── Concrete / stub adapters ─────────────────────────────────────────
class _SDKUnavailable(NeuroAdapter):
    """Fallback stub used when no real adapter exists for a transport.

    Transport-specific stubs only override ``_install_hint``. Calling
    ``connect`` always raises NotImplementedError whose message carries
    that hint plus the provider's docs URL (or website).
    """

    _install_hint = 'No adapter registered for this provider/transport.'

    def connect(self, **kwargs) -> bool:
        """Always raise NotImplementedError with install/docs guidance."""
        provider = self.provider
        name = provider.name
        hint = self._install_hint
        # Fall back to the website when no docs URL is set.
        docs_location = provider.docs_url or provider.website
        message = (
            f'{name}: {hint} '
            f'Docs: {docs_location}. '
            'Implement a NeuroAdapter subclass in '
            'integrations/providers/neuro_adapter.py and register it '
            'in _ADAPTER_BY_TRANSPORT.'
        )
        raise NotImplementedError(message)

    def read_signal(self, signal, duration_s=1.0):
        """Always raise NotImplementedError; the stub cannot read."""
        raise NotImplementedError('connect() first')

    def disconnect(self) -> None:
        """Reset the connection flag; there is no transport to close."""
        self._connected = False
class ContactGatedAdapter(NeuroAdapter):
    """Adapter for providers whose SDK/API is not public (e.g. Yneuro).

    ``connect`` always raises PermissionError naming the vendor's contact
    email (or website when no email is set), so the developer reaches
    out instead of assuming an integration exists.
    """

    def connect(self, **kwargs) -> bool:
        """Always raise PermissionError pointing at the vendor contact."""
        provider = self.provider
        contact = provider.contact_email or provider.website
        message = (
            f'{provider.name} does not publish a public SDK '
            f'or API. Contact {contact} for developer access. '
            'Once you receive credentials, swap this adapter for a '
            'concrete one — registry entry is already wired.'
        )
        raise PermissionError(message)

    def read_signal(self, signal, duration_s=1.0):
        """Always raise PermissionError; no data can be read."""
        raise PermissionError('provider contact-gated — connect() first')

    def disconnect(self) -> None:
        """Reset the connection flag; there is no transport to close."""
        self._connected = False
class BLEAdapter(_SDKUnavailable):
    """Stub for BLE providers; supplies only the install hint."""

    _install_hint = (
        'BLE transport requires `bleak` (pip install bleak) '
        'plus the vendor SDK — `muselsl` for Muse, '
        '`brainflow` for OpenBCI.'
    )
class RESTAdapter(_SDKUnavailable):
    """Stub for REST providers; supplies only the install hint."""

    _install_hint = (
        'REST transport needs the vendor OAuth flow. '
        'Implement a NeuroAdapter subclass that calls requests.get() '
        'against the documented base URL and caches the access token.'
    )
class WebSocketAdapter(_SDKUnavailable):
    """Stub for WebSocket providers; supplies only the install hint."""

    _install_hint = (
        'WebSocket transport requires `websockets` '
        '(pip install websockets) plus the vendor protocol — '
        'Emotiv Cortex, Neurosity subscription topics, etc.'
    )
class LSLAdapter(_SDKUnavailable):
    """Stub for LSL providers; supplies only the install hint."""

    _install_hint = (
        'LSL (Lab Streaming Layer) is the de-facto neuro transport standard. '
        'Install `pylsl` (pip install pylsl) and resolve a stream by type '
        '("EEG", "PPG", ...) — works for every headset that exposes LSL '
        '(Muse via muselsl, OpenBCI, Emotiv via LSL-Apps).'
    )
class USBSerialAdapter(_SDKUnavailable):
    """Stub for USB-serial providers; supplies only the install hint."""

    _install_hint = (
        'USB-serial transport requires `pyserial` (pip install pyserial) '
        'plus the vendor framing — OpenBCI Cyton uses a 33-byte packet '
        'format. `brainflow` wraps this if you want to skip the framing '
        'work.'
    )
class USBHIDAdapter(_SDKUnavailable):
    """Stub for USB-HID providers; supplies only the install hint."""

    _install_hint = (
        'USB-HID transport requires `hidapi` (pip install hidapi). Match '
        'by vendor_id/product_id; each vendor has a different report '
        'descriptor.'
    )
179# ─── Factory ──────────────────────────────────────────────────────────
# Transport -> adapter class lookup consulted by `choose_adapter` and
# mutated by `register_adapter`. Every entry starts out as a stub.
_ADAPTER_BY_TRANSPORT: Dict[Transport, type] = {
    Transport.BLE: BLEAdapter,
    Transport.REST: RESTAdapter,
    Transport.WEBSOCKET: WebSocketAdapter,
    Transport.LSL: LSLAdapter,
    Transport.USB_SERIAL: USBSerialAdapter,
    Transport.USB_HID: USBHIDAdapter,
    # Generic stub for transports with no dedicated adapter.
    Transport.UNKNOWN: _SDKUnavailable,
}
def choose_adapter(provider: NeuroProvider) -> NeuroAdapter:
    """Build the adapter instance that matches `provider`.

    A provider whose status is CONTACT_REQUIRED always receives a
    ContactGatedAdapter, whatever its transport — the block is at the
    business layer, not the wire. Any other provider is routed by its
    primary (first) transport entry; an empty transport list is treated
    as Transport.UNKNOWN, and unmapped transports fall back to the
    generic _SDKUnavailable stub.
    """
    if provider.status == ProviderStatus.CONTACT_REQUIRED:
        return ContactGatedAdapter(provider)

    transports = provider.transport
    if transports:
        first_transport = transports[0]
    else:
        first_transport = Transport.UNKNOWN

    selected_cls = _ADAPTER_BY_TRANSPORT.get(first_transport, _SDKUnavailable)
    return selected_cls(provider)
def register_adapter(transport: Transport, adapter_cls) -> None:
    """Override the adapter class used for `transport`.

    Call this from a concrete implementation module once you've written
    a real adapter (e.g. in integrations/providers/muse_adapter.py) so
    `choose_adapter` picks it up automatically.

    Args:
        transport: Key in the transport registry to (re)bind.
        adapter_cls: A class (not an instance) deriving from NeuroAdapter.

    Raises:
        TypeError: If `adapter_cls` is not a class or does not subclass
            NeuroAdapter.
    """
    # issubclass() raises its own opaque TypeError for non-class
    # arguments (e.g. an adapter instance), so check for a class first
    # and always report the same clear message.
    is_adapter_class = isinstance(adapter_cls, type) and issubclass(
        adapter_cls, NeuroAdapter
    )
    if not is_adapter_class:
        raise TypeError('adapter_cls must subclass NeuroAdapter')
    _ADAPTER_BY_TRANSPORT[transport] = adapter_cls