Coverage for integrations / providers / neuro_adapter.py: 95.4%

65 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1"""Adapter protocol for neuro / biometric providers. 

2 

3One abstract base (`NeuroAdapter`), one factory (`choose_adapter`) that 

4routes by the provider's primary transport, concrete adapter stubs per 

5transport that raise a clear "install <sdk> and implement X" error when 

6the vendor SDK isn't present. 

7 

8Why stubs instead of full implementations: 

9 

10 - Each vendor's wire protocol is different; shipping concrete clients 

11 for all of them would pull in ~10 optional dependency chains. 

12 - The stub error message includes the exact pip package name and docs 

13 URL so a developer can go from "I want to talk to this device" to 

14 working code in one sitting. 

15 - Contact-gated providers (Yneuro) get a separate ContactGatedAdapter 

16 that raises PermissionError with the vendor's contact email — clear 

17 that the block is at the business layer, not the wire. 

18 

19Cross-network sensor reads reuse PeerLink's existing `sensor` channel 

20(core/peer_link/channels.py) — E2E-encrypted. Zero new wire protocol. 

21""" 

22 

23from __future__ import annotations 

24 

25import logging 

26from abc import ABC, abstractmethod 

27from dataclasses import dataclass 

28from typing import Any, Dict, Optional 

29 

30from integrations.providers.neuro_providers import ( 

31 NeuroProvider, 

32 ProviderStatus, 

33 SignalType, 

34 Transport, 

35) 

36 

# Module-level logger, keyed by this module's import name.
logger = logging.getLogger(__name__)

38 

39 

40# ─── Reading shape ──────────────────────────────────────────────────── 

41 

@dataclass
class Reading:
    """One chunk of sensor output from a connected provider.

    ``metadata`` defaults to ``None`` rather than an empty dict, so
    consumers must check for ``None`` before indexing into it.
    """

    provider_id: str
    signal: SignalType
    data: Any  # numpy array / dict / list — transport-dependent
    sample_rate_hz: int
    timestamp_start: float
    duration_s: float
    # Annotated Optional because the default really is None; when present
    # it carries channel names, units, etc.
    metadata: Optional[Dict[str, Any]] = None

52 

53 

54# ─── Abstract base ──────────────────────────────────────────────────── 

55 

class NeuroAdapter(ABC):
    """Abstract interface shared by every provider adapter.

    Subclasses must supply ``connect``, ``read_signal`` and
    ``disconnect``. ``is_connected`` is implemented here and simply
    reports the ``_connected`` flag, which starts out ``False``.
    """

    def __init__(self, provider: NeuroProvider):
        self._connected = False
        self.provider = provider

    @abstractmethod
    def connect(self, **kwargs) -> bool:
        """Open the underlying transport; return True on success.

        May raise NotImplementedError (SDK missing) or PermissionError
        (contact-gated provider).
        """

    @abstractmethod
    def read_signal(
        self,
        signal: SignalType,
        duration_s: float = 1.0,
    ) -> Reading:
        """Read ``duration_s`` seconds of the requested ``signal``."""

    @abstractmethod
    def disconnect(self) -> None:
        """Release the transport cleanly."""

    def is_connected(self) -> bool:
        """Return the current value of the ``_connected`` flag."""
        return self._connected

81 

82 

83# ─── Concrete / stub adapters ───────────────────────────────────────── 

84 

class _SDKUnavailable(NeuroAdapter):
    """Generic stub adapter for transports without a real implementation.

    ``connect`` always raises NotImplementedError; the message combines
    the provider name, the class-level ``_install_hint`` (overridden by
    each transport-specific subclass) and the provider's docs URL, falling
    back to its website.
    """

    _install_hint = 'No adapter registered for this provider/transport.'

    def connect(self, **kwargs) -> bool:
        """Always raise NotImplementedError with install/docs guidance."""
        headline = f'{self.provider.name}: {self._install_hint} '
        # Prefer the dedicated docs link; use the website when it is falsy.
        docs_link = self.provider.docs_url or self.provider.website
        message = (
            f'{headline}'
            f'Docs: {docs_link}. '
            'Implement a NeuroAdapter subclass in '
            'integrations/providers/neuro_adapter.py and register it '
            'in _ADAPTER_BY_TRANSPORT.'
        )
        raise NotImplementedError(message)

    def read_signal(self, signal, duration_s=1.0):
        """Always raise NotImplementedError; the stub cannot read data."""
        raise NotImplementedError('connect() first')

    def disconnect(self) -> None:
        """Clear the connected flag; there is no transport to release."""
        self._connected = False

106 

107 

class ContactGatedAdapter(NeuroAdapter):
    """Adapter for providers with no public SDK/API (e.g. Yneuro).

    ``connect`` and ``read_signal`` both raise PermissionError. The
    ``connect`` message names the vendor's contact email, or the website
    when no email is set, so the developer knows whom to ask for access.
    """

    def connect(self, **kwargs) -> bool:
        """Always raise PermissionError pointing at the vendor contact."""
        reach = self.provider.contact_email
        if not reach:
            # No email on record: fall back to the vendor website.
            reach = self.provider.website
        message = (
            f'{self.provider.name} does not publish a public SDK '
            f'or API. Contact {reach} for developer access. '
            'Once you receive credentials, swap this adapter for a '
            'concrete one — registry entry is already wired.'
        )
        raise PermissionError(message)

    def read_signal(self, signal, duration_s=1.0):
        """Always raise PermissionError; no data can be read."""
        raise PermissionError('provider contact-gated — connect() first')

    def disconnect(self) -> None:
        """Clear the connected flag; there is no transport to release."""
        self._connected = False

127 

128 

class BLEAdapter(_SDKUnavailable):
    """BLE stub: only swaps in the BLE-specific install hint."""

    _install_hint = (
        'BLE transport requires `bleak` (pip install bleak) '
        'plus the vendor SDK — `muselsl` for Muse, '
        '`brainflow` for OpenBCI.'
    )

134 

135 

class RESTAdapter(_SDKUnavailable):
    """REST stub: only swaps in the REST-specific install hint."""

    _install_hint = (
        'REST transport needs the vendor OAuth flow. '
        'Implement a NeuroAdapter subclass that calls requests.get() '
        'against the documented base URL and caches the access token.'
    )

142 

143 

class WebSocketAdapter(_SDKUnavailable):
    """WebSocket stub: only swaps in the WebSocket-specific install hint."""

    _install_hint = (
        'WebSocket transport requires `websockets` '
        '(pip install websockets) plus the vendor protocol — '
        'Emotiv Cortex, Neurosity subscription topics, etc.'
    )

150 

151 

class LSLAdapter(_SDKUnavailable):
    """LSL stub: only swaps in the Lab Streaming Layer install hint."""

    _install_hint = (
        'LSL (Lab Streaming Layer) is the de-facto neuro transport standard. '
        'Install `pylsl` (pip install pylsl) and resolve a stream by type '
        '("EEG", "PPG", ...) — works for every headset that exposes LSL '
        '(Muse via muselsl, OpenBCI, Emotiv via LSL-Apps).'
    )

160 

161 

class USBSerialAdapter(_SDKUnavailable):
    """USB-serial stub: only swaps in the serial-specific install hint."""

    _install_hint = (
        'USB-serial transport requires `pyserial` '
        '(pip install pyserial) plus the vendor framing — '
        'OpenBCI Cyton uses a 33-byte packet format. '
        '`brainflow` wraps this if you want to skip the framing work.'
    )

169 

170 

class USBHIDAdapter(_SDKUnavailable):
    """USB-HID stub: only swaps in the HID-specific install hint."""

    _install_hint = (
        'USB-HID transport requires `hidapi` (pip install hidapi). '
        'Match by vendor_id/product_id; '
        'each vendor has a different report descriptor.'
    )

177 

178 

179# ─── Factory ────────────────────────────────────────────────────────── 

180 

# Transport -> adapter class lookup consulted by `choose_adapter`.
# `register_adapter` overwrites entries in place, so this dict is mutable
# module state. Every default entry is a stub whose connect() raises
# NotImplementedError.
_ADAPTER_BY_TRANSPORT = {
    # Wireless / network transports.
    Transport.BLE: BLEAdapter,
    Transport.REST: RESTAdapter,
    Transport.WEBSOCKET: WebSocketAdapter,
    Transport.LSL: LSLAdapter,
    # Wired transports.
    Transport.USB_SERIAL: USBSerialAdapter,
    Transport.USB_HID: USBHIDAdapter,
    # Catch-all: the generic stub with the default install hint.
    Transport.UNKNOWN: _SDKUnavailable,
}

190 

191 

def choose_adapter(provider: NeuroProvider) -> NeuroAdapter:
    """Build the adapter instance that matches ``provider``.

    A provider whose status is ``CONTACT_REQUIRED`` always gets a
    ``ContactGatedAdapter``, whatever its transport — the block is at the
    business layer, not the wire. Any other provider is routed by the
    first entry of ``provider.transport``; an empty transport list is
    treated as ``Transport.UNKNOWN``, and a transport with no registry
    entry falls back to ``_SDKUnavailable``.
    """
    # Business-layer gate takes priority over transport routing.
    if provider.status == ProviderStatus.CONTACT_REQUIRED:
        return ContactGatedAdapter(provider)

    if provider.transport:
        first_transport = provider.transport[0]
    else:
        first_transport = Transport.UNKNOWN

    factory = _ADAPTER_BY_TRANSPORT.get(first_transport, _SDKUnavailable)
    return factory(provider)

205 

206 

def register_adapter(transport: Transport, adapter_cls) -> None:
    """Override the adapter class used for ``transport``.

    Call this from a concrete implementation module once you've written
    a real adapter (e.g. in integrations/providers/muse_adapter.py) so
    `choose_adapter` picks it up automatically. The registry entry for
    ``transport`` is replaced in place.

    Raises:
        TypeError: if ``adapter_cls`` is not a class, or is a class that
            does not subclass ``NeuroAdapter``.
    """
    # issubclass() raises its own, unrelated TypeError when handed a
    # non-class (e.g. an adapter *instance*). Check for a class first so
    # every invalid argument produces the same actionable message.
    is_adapter_class = (
        isinstance(adapter_cls, type)
        and issubclass(adapter_cls, NeuroAdapter)
    )
    if not is_adapter_class:
        raise TypeError('adapter_cls must subclass NeuroAdapter')
    _ADAPTER_BY_TRANSPORT[transport] = adapter_cls