Coverage for integrations / channels / hive_signal_bridge.py: 76.3%

236 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Hive Signal Bridge -- Every channel message is a hive signal. 

3 

4Hooks into ALL channel adapters via on_message(). Classifies signals and 

5routes them to the appropriate hive agent: 

6 

7Signal Types: 

8 - COMPUTE_INTEREST: Someone mentions GPUs, idle hardware, mining, earning 

9 -> Route to compute_recruiter agent 

10 - MODEL_REQUEST: Someone asks about a model, wants inference 

11 -> Route to model_provisioner agent 

12 - BUG_REPORT: Someone reports a bug, error, issue 

13 -> Create HiveTask for connected Claude Code sessions 

14 - FEATURE_REQUEST: Someone wants a new feature 

15 -> Queue in instruction_queue for next idle agent 

16 - SUPPORT_NEEDED: Someone needs help 

17 -> Route to /chat pipeline for immediate response 

18 - SENTIMENT: General sentiment signal 

19 -> Feed to resonance tuner for community health tracking 

20 - RECRUITMENT_LEAD: Someone expresses interest in contributing 

21 -> Route to compute_recruiter with personalized onboarding 

22 - OPEN_SOURCE_SIGNAL: New model release, paper, benchmark mentioned 

23 -> Route to opensource_evangelist agent 

24 

25Every signal earns micro-Spark for the channel where it originated. 

26This incentivizes active communities. 

27""" 

28 

29import asyncio 

30import collections 

31import logging 

32import threading 

33import time 

34from typing import Any, Callable, Dict, List, Optional 

35 

36logger = logging.getLogger(__name__) 

37 

38 

# =====================================================================
# Signal Type Constants
# =====================================================================

# Canonical signal-type identifiers. Plain string constants (rather than
# an Enum) so they serialize directly into event payloads and JSON feeds.
COMPUTE_INTEREST = 'COMPUTE_INTEREST'
MODEL_REQUEST = 'MODEL_REQUEST'
BUG_REPORT = 'BUG_REPORT'
FEATURE_REQUEST = 'FEATURE_REQUEST'
SUPPORT_NEEDED = 'SUPPORT_NEEDED'
SENTIMENT = 'SENTIMENT'
RECRUITMENT_LEAD = 'RECRUITMENT_LEAD'
OPEN_SOURCE_SIGNAL = 'OPEN_SOURCE_SIGNAL'

# Complete tuple of known signal types; used to pre-seed the per-type
# stats counters in HiveSignalBridge.__init__.
ALL_SIGNAL_TYPES = (
    COMPUTE_INTEREST, MODEL_REQUEST, BUG_REPORT, FEATURE_REQUEST,
    SUPPORT_NEEDED, SENTIMENT, RECRUITMENT_LEAD, OPEN_SOURCE_SIGNAL,
)

56 

57 

# =====================================================================
# Keyword Sets for Heuristic Classification
# =====================================================================

# Keywords suggesting someone has hardware/compute to offer.
_COMPUTE_KEYWORDS = frozenset([
    'gpu', 'cuda', 'vram', 'rtx', 'nvidia', 'amd', 'idle', 'mining',
    'earn', 'compute', 'hardware', 'server', 'rack', 'hosting',
])

# Keywords suggesting a model or inference request.
_MODEL_KEYWORDS = frozenset([
    'model', 'inference', 'llm', 'qwen', 'llama', 'mistral', 'gemma',
    'phi', 'gpt', 'claude', 'api', 'endpoint', 'chat', 'completion',
])

# Keywords suggesting a bug or failure report.
_BUG_KEYWORDS = frozenset([
    'bug', 'error', 'crash', 'broken', 'fail', 'exception', 'traceback',
    'issue', '500', 'timeout', 'hang', 'freeze',
])

# Keywords suggesting a feature request. NOTE: multi-word entries here
# cannot match whole-word tokenization; they are re-listed in
# _MULTI_WORD_PHRASES for substring matching.
_FEATURE_KEYWORDS = frozenset([
    'feature', 'request', 'wish', 'could you', 'would be nice',
    'suggestion', 'idea', 'propose', 'enhancement',
])

# Keywords suggesting the sender needs help or support.
_SUPPORT_KEYWORDS = frozenset([
    'help', 'stuck', 'how do i', 'how to', 'cant', "can't", 'unable',
    'confused', 'documentation', 'tutorial',
])

# Keywords suggesting interest in contributing / joining the project.
_RECRUIT_KEYWORDS = frozenset([
    'contribute', 'join', 'volunteer', 'participate', 'help out',
    'open source', 'community', 'developer', 'contributor',
])

# Keywords suggesting a new open-source model/paper/benchmark mention.
_OPENSOURCE_KEYWORDS = frozenset([
    'huggingface', 'arxiv', 'paper', 'release', 'benchmark', 'gguf',
    'quantize', 'fine-tune', 'lora', 'unsloth', 'new model',
])

# Positive sentiment markers (community-health signal).
_POSITIVE_WORDS = frozenset([
    'thanks', 'great', 'love', 'perfect', 'excellent', 'amazing',
    'good', 'nice', 'helpful', 'wonderful', 'appreciate', 'awesome',
])

# Negative sentiment markers (community-health signal).
_NEGATIVE_WORDS = frozenset([
    'bad', 'wrong', 'terrible', 'hate', 'awful', 'worse', 'useless',
    'frustrated', 'confused', 'disappointed', 'annoying',
])

# Multi-word phrases extracted for substring matching (cannot be caught
# by single-word tokenization)
_MULTI_WORD_PHRASES: Dict[str, List[str]] = {
    FEATURE_REQUEST: ['could you', 'would be nice'],
    SUPPORT_NEEDED: ['how do i', 'how to'],
    RECRUITMENT_LEAD: ['help out', 'open source'],
    OPEN_SOURCE_SIGNAL: ['fine-tune', 'new model'],
}

# Single-word lookup: signal_type -> keywords (excludes multi-word phrases)
_SINGLE_WORD_MAP: Dict[str, frozenset] = {
    COMPUTE_INTEREST: _COMPUTE_KEYWORDS,
    MODEL_REQUEST: _MODEL_KEYWORDS,
    BUG_REPORT: _BUG_KEYWORDS,
    FEATURE_REQUEST: frozenset(k for k in _FEATURE_KEYWORDS if ' ' not in k),
    SUPPORT_NEEDED: frozenset(k for k in _SUPPORT_KEYWORDS if ' ' not in k),
    RECRUITMENT_LEAD: frozenset(k for k in _RECRUIT_KEYWORDS if ' ' not in k),
    OPEN_SOURCE_SIGNAL: frozenset(k for k in _OPENSOURCE_KEYWORDS if ' ' not in k),
}

126 

127 

128# ===================================================================== 

129# Signal Feed Entry 

130# ===================================================================== 

131 

132def _make_feed_entry(message, signals: List[str]) -> dict: 

133 """Create a lightweight feed entry from a message and its signals.""" 

134 content = '' 

135 try: 

136 content = message.content if hasattr(message, 'content') else '' 

137 if callable(content): 

138 content = content() 

139 except Exception: 

140 content = getattr(message, 'text', '') or '' 

141 

142 return { 

143 'message_id': getattr(message, 'id', ''), 

144 'channel': getattr(message, 'channel', ''), 

145 'sender_id': getattr(message, 'sender_id', ''), 

146 'sender_name': getattr(message, 'sender_name', ''), 

147 'text_preview': (content[:120] + '...') if len(content) > 120 else content, 

148 'signals': list(signals), 

149 'is_group': getattr(message, 'is_group', False), 

150 'timestamp': time.time(), 

151 } 

152 

153 

154# ===================================================================== 

155# HiveSignalBridge 

156# ===================================================================== 

157 

class HiveSignalBridge:
    """Captures signals from all channel adapters and feeds them into the hive.

    Every message across every channel is a signal -- for recruitment,
    support, demand detection, sentiment, and hive growth.

    The _on_message handler is designed to be fast (<5ms): no LLM calls,
    no database writes, no network I/O. All heavy work (goal dispatch,
    instruction queuing, etc.) is offloaded to background threads.
    """

    def __init__(self):
        # Guards the stats counters below.
        self._lock = threading.Lock()

        # Stats: signal counts by type and by channel.
        self._signal_counts: Dict[str, int] = {st: 0 for st in ALL_SIGNAL_TYPES}
        self._channel_counts: Dict[str, int] = {}
        # NOTE: only incremented for messages that yield >= 1 signal
        # (see _on_message), not for every message seen.
        self._total_messages: int = 0

        # Bounded signal feed for dashboard display (oldest evicted first).
        self._signal_feed: collections.deque = collections.deque(maxlen=1000)

        # Background executor for routing (keeps _on_message fast);
        # created lazily by _get_executor().
        self._executor: Optional[Any] = None
        self._executor_lock = threading.Lock()

        # Attached adapter names (for diagnostics).
        self._attached_adapters: List[str] = []

    # ── Lazy background executor ─────────────────────────────────── 

    def _get_executor(self):
        """Lazily create the thread pool executor for routing.

        Double-checked locking: the common already-created path avoids
        taking the lock.
        """
        if self._executor is None:
            with self._executor_lock:
                if self._executor is None:
                    from concurrent.futures import ThreadPoolExecutor
                    self._executor = ThreadPoolExecutor(
                        max_workers=2,
                        thread_name_prefix='hive_signal',
                    )
        return self._executor

    # ── Adapter Attachment ───────────────────────────────────────── 

    def attach_to_adapter(self, adapter) -> None:
        """Register self as a message handler on a channel adapter.

        Call this for every ChannelAdapter at boot time.

        Args:
            adapter: A ChannelAdapter instance (from integrations.channels.base).
        """
        try:
            adapter.on_message(self._on_message)
            name = getattr(adapter, 'name', str(type(adapter).__name__))
            self._attached_adapters.append(name)
            logger.info("HiveSignalBridge attached to channel: %s", name)
        except Exception as e:
            logger.warning("Failed to attach HiveSignalBridge to adapter: %s", e)

    def attach_to_all(self, adapters: dict) -> None:
        """Attach to all adapters in a dict (e.g., ChannelRegistry._adapters).

        Args:
            adapters: Dict mapping channel name -> ChannelAdapter instance.
        """
        # Only the adapter objects are needed -- the dict key was
        # previously unpacked and unused; names are re-derived in
        # attach_to_adapter.
        for adapter in adapters.values():
            self.attach_to_adapter(adapter)

    # ── Core Message Handler ─────────────────────────────────────── 

    def _on_message(self, message) -> None:
        """Core handler. Called for every message on every channel.

        MUST be fast (<5ms). No LLM calls, no DB writes, no network I/O.
        Classification is pure heuristic. Routing is offloaded to background.
        """
        try:
            # Extract text content (Message.content is a property in base.py)
            text = ''
            try:
                text = message.content if hasattr(message, 'content') else ''
                if callable(text):
                    text = text()
            except Exception:
                text = getattr(message, 'text', '') or ''

            if not text or len(text.strip()) < 2:
                return

            channel_type = getattr(message, 'channel', '')
            is_group = getattr(message, 'is_group', False)

            # Fast heuristic classification (no LLM)
            signals = self.classify_signal(text, channel_type, is_group)

            if not signals:
                return

            # Update stats (thread-safe)
            with self._lock:
                self._total_messages += 1
                for sig in signals:
                    self._signal_counts[sig] = self._signal_counts.get(sig, 0) + 1
                self._channel_counts[channel_type] = (
                    self._channel_counts.get(channel_type, 0) + 1
                )

            # Add to feed (deque.append is atomic; no lock needed)
            entry = _make_feed_entry(message, signals)
            self._signal_feed.append(entry)

            # Emit to EventBus (best-effort, non-blocking)
            self._emit_signal_event(message, signals, channel_type)

            # Emit micro-Spark reward event for the originating channel
            self._emit_spark_event(message, signals, channel_type)

            # Route to appropriate hive agents (background thread)
            self._get_executor().submit(
                self._route_signals, message, signals
            )

        except Exception as e:
            # Handler must never raise -- would break the channel adapter
            logger.debug("HiveSignalBridge._on_message error: %s", e)

    # ── Signal Classification ────────────────────────────────────── 

    def classify_signal(self, text: str, channel_type: str = '',
                        is_group: bool = False) -> List[str]:
        """Fast heuristic classifier. No LLM calls -- pure keyword matching.

        A single message can match multiple signal types. Tokens are
        stripped of surrounding punctuation so "gpu?" or "help!" still
        match their keywords.

        Args:
            text: Message text content.
            channel_type: Channel name (e.g., 'discord', 'telegram').
                Currently unused; reserved for channel-specific rules.
            is_group: Whether the message is from a group chat.
                Currently unused; reserved for group-specific rules.

        Returns:
            List of matched signal type strings.
        """
        signals: List[str] = []
        text_lower = text.lower()
        # Strip common edge punctuation from each token; bare split()
        # previously missed keywords followed by punctuation. The
        # apostrophe is deliberately NOT stripped so "can't" matches.
        words = {w.strip('.,!?;:"()[]{}') for w in text_lower.split()}

        # Single-word keyword matching
        for signal_type, keywords in _SINGLE_WORD_MAP.items():
            if words & keywords:
                signals.append(signal_type)

        # Multi-word phrase matching (substring search)
        for signal_type, phrases in _MULTI_WORD_PHRASES.items():
            if signal_type not in signals:
                for phrase in phrases:
                    if phrase in text_lower:
                        signals.append(signal_type)
                        break

        # URL-based signals
        if 'huggingface.co' in text_lower or 'arxiv.org' in text_lower:
            if OPEN_SOURCE_SIGNAL not in signals:
                signals.append(OPEN_SOURCE_SIGNAL)

        # Sentiment detection (always check -- provides community health data)
        pos_count = len(words & _POSITIVE_WORDS)
        neg_count = len(words & _NEGATIVE_WORDS)
        if pos_count > 0 or neg_count > 0:
            if SENTIMENT not in signals:
                signals.append(SENTIMENT)

        return signals

    # ── Event Emission ───────────────────────────────────────────── 

    def _emit_signal_event(self, message, signals: List[str],
                           channel_type: str) -> None:
        """Emit 'hive.signal.received' to EventBus."""
        try:
            from core.platform.events import emit_event
            emit_event('hive.signal.received', {
                'message_id': getattr(message, 'id', ''),
                'channel': channel_type,
                'sender_id': getattr(message, 'sender_id', ''),
                'signals': signals,
                'is_group': getattr(message, 'is_group', False),
                'timestamp': time.time(),
            })
        except Exception:
            pass  # EventBus emission is best-effort

    def _emit_spark_event(self, message, signals: List[str],
                          channel_type: str) -> None:
        """Emit 'hive.signal.spark' for micro-Spark channel reward.

        Actual Spark accounting happens elsewhere (revenue_aggregator).
        This event simply declares that a signal-worthy message occurred
        on a given channel, so the reward system can credit it.
        """
        try:
            from core.platform.events import emit_event
            emit_event('hive.signal.spark', {
                'channel': channel_type,
                'sender_id': getattr(message, 'sender_id', ''),
                'signal_count': len(signals),
                'signals': signals,
                'timestamp': time.time(),
            })
        except Exception:
            pass

    # ── Signal Routing ───────────────────────────────────────────── 

    def _route_signals(self, message, signals: List[str]) -> None:
        """Route classified signals to the appropriate hive subsystem.

        Runs in a background thread so the message handler stays fast.
        Each router method is best-effort -- failures are logged, never raised.
        """
        # Dispatch table: signal type -> best-effort router method.
        router_map = {
            COMPUTE_INTEREST: self._route_compute_interest,
            MODEL_REQUEST: self._route_model_request,
            BUG_REPORT: self._route_bug_report,
            FEATURE_REQUEST: self._route_feature_request,
            SUPPORT_NEEDED: self._route_support_needed,
            RECRUITMENT_LEAD: self._route_recruitment_lead,
            OPEN_SOURCE_SIGNAL: self._route_open_source_signal,
            SENTIMENT: self._route_sentiment,
        }
        for signal_type in signals:
            handler = router_map.get(signal_type)
            if handler:
                try:
                    handler(message, signals)
                except Exception as e:
                    logger.debug("Signal routing error (%s): %s", signal_type, e)

    def _route_compute_interest(self, message, signals: List[str]) -> None:
        """Queue recruitment task for compute_recruiter agent.

        Dispatches a goal that triggers the bootstrap_compute_recruiter
        seed (from goal_seeding.py) to reach out with personalized
        onboarding content.
        """
        try:
            from integrations.agent_engine.dispatch import dispatch_goal
            text = _extract_text(message)
            sender = getattr(message, 'sender_name', '') or getattr(message, 'sender_id', 'unknown')
            channel = getattr(message, 'channel', 'unknown')
            dispatch_goal(
                prompt=(
                    f"Compute recruitment lead detected on {channel} from {sender}: "
                    f"\"{text[:300]}\". "
                    "Evaluate interest level and prepare personalized onboarding message. "
                    "Explain how to contribute idle compute to the hive and earn Spark."
                ),
                user_id='hive_signal_bridge',
                goal_id=f"sig_compute_{getattr(message, 'id', '')}",
                goal_type='hive_growth',
            )
            logger.debug("Routed COMPUTE_INTEREST signal from %s", channel)
        except ImportError:
            logger.debug("dispatch module not available for compute routing")
        except Exception as e:
            logger.debug("COMPUTE_INTEREST routing failed: %s", e)

    def _route_model_request(self, message, signals: List[str]) -> None:
        """Check if requested model is available; if not, queue provisioning.

        Routes to model_provisioner goal to check hive model registry
        and potentially onboard the requested model.
        """
        try:
            from integrations.agent_engine.dispatch import dispatch_goal
            text = _extract_text(message)
            channel = getattr(message, 'channel', 'unknown')
            dispatch_goal(
                prompt=(
                    f"Model request detected on {channel}: \"{text[:300]}\". "
                    "Check if the requested model is available in the hive model registry. "
                    "If not, evaluate feasibility and queue for onboarding."
                ),
                user_id='hive_signal_bridge',
                goal_id=f"sig_model_{getattr(message, 'id', '')}",
                goal_type='hive_growth',
            )
            logger.debug("Routed MODEL_REQUEST signal from %s", channel)
        except ImportError:
            logger.debug("dispatch module not available for model routing")
        except Exception as e:
            logger.debug("MODEL_REQUEST routing failed: %s", e)

    def _route_bug_report(self, message, signals: List[str]) -> None:
        """Create HiveTask (BUG_FIX type) for connected Claude Code sessions.

        If the hive_task_protocol is available, create a task. Otherwise
        falls back to instruction_queue for the next idle agent.
        """
        text = _extract_text(message)
        channel = getattr(message, 'channel', 'unknown')
        sender = getattr(message, 'sender_name', '') or getattr(message, 'sender_id', 'unknown')

        # Try HiveTaskDispatcher first (for live Claude Code sessions)
        try:
            from integrations.coding_agent.hive_task_protocol import get_dispatcher
            get_dispatcher().create_task(
                task_type='bug_fix',
                title=f"Bug report from {channel}",
                description=f"Bug report from {sender} on {channel}: {text[:500]}",
                instructions=f"Investigate and fix: {text[:1000]}",
            )
            logger.debug("Routed BUG_REPORT to HiveTaskDispatcher from %s", channel)
            return
        except (ImportError, AttributeError):
            pass
        except Exception as e:
            logger.debug("HiveTask dispatch failed, falling back: %s", e)

        # Fallback: queue in instruction_queue
        try:
            from integrations.agent_engine.instruction_queue import enqueue_instruction
            enqueue_instruction(
                user_id='hive_signal_bridge',
                text=(
                    f"Bug report from {sender} on {channel}: {text[:500]}. "
                    "Investigate and fix if possible."
                ),
                priority=7,
                tags=['bug', 'signal_bridge', channel],
            )
            logger.debug("Routed BUG_REPORT to instruction_queue from %s", channel)
        except ImportError:
            logger.debug("instruction_queue not available for bug routing")
        except Exception as e:
            logger.debug("BUG_REPORT instruction queue failed: %s", e)

    def _route_feature_request(self, message, signals: List[str]) -> None:
        """Queue feature request in instruction_queue for next idle agent."""
        try:
            from integrations.agent_engine.instruction_queue import enqueue_instruction
            text = _extract_text(message)
            channel = getattr(message, 'channel', 'unknown')
            sender = getattr(message, 'sender_name', '') or getattr(message, 'sender_id', 'unknown')
            enqueue_instruction(
                user_id='hive_signal_bridge',
                text=(
                    f"Feature request from {sender} on {channel}: {text[:500]}. "
                    "Evaluate feasibility and create implementation plan if viable."
                ),
                # Lower priority than bug reports (7).
                priority=4,
                tags=['feature_request', 'signal_bridge', channel],
            )
            logger.debug("Routed FEATURE_REQUEST to instruction_queue from %s", channel)
        except ImportError:
            logger.debug("instruction_queue not available for feature routing")
        except Exception as e:
            logger.debug("FEATURE_REQUEST routing failed: %s", e)

    def _route_support_needed(self, message, signals: List[str]) -> None:
        """Dispatch to /chat for immediate response.

        Uses dispatch_goal with high priority so the user gets help fast.
        """
        try:
            from integrations.agent_engine.dispatch import dispatch_goal
            text = _extract_text(message)
            channel = getattr(message, 'channel', 'unknown')
            sender = getattr(message, 'sender_name', '') or getattr(message, 'sender_id', 'unknown')
            dispatch_goal(
                prompt=(
                    f"Support request from {sender} on {channel}: \"{text[:300]}\". "
                    "Provide helpful, clear guidance. Be patient and thorough."
                ),
                # Attribute the goal to the actual sender so the reply
                # lands in their conversation context.
                user_id=getattr(message, 'sender_id', 'hive_signal_bridge'),
                goal_id=f"sig_support_{getattr(message, 'id', '')}",
                goal_type='support',
            )
            logger.debug("Routed SUPPORT_NEEDED to /chat from %s", channel)
        except ImportError:
            logger.debug("dispatch module not available for support routing")
        except Exception as e:
            logger.debug("SUPPORT_NEEDED routing failed: %s", e)

    def _route_recruitment_lead(self, message, signals: List[str]) -> None:
        """High-priority: queue personalized onboarding message.

        Someone expressed interest in contributing -- this is the most
        valuable signal. Route to compute_recruiter with high priority.
        """
        try:
            from integrations.agent_engine.dispatch import dispatch_goal
            text = _extract_text(message)
            sender = getattr(message, 'sender_name', '') or getattr(message, 'sender_id', 'unknown')
            channel = getattr(message, 'channel', 'unknown')
            dispatch_goal(
                prompt=(
                    f"HIGH PRIORITY recruitment lead on {channel} from {sender}: "
                    f"\"{text[:300]}\". "
                    "This person wants to contribute to the hive. "
                    "Prepare a warm, personalized onboarding message. "
                    "Explain the mission, how to get started, and the Spark rewards. "
                    "Make them feel welcome and valued."
                ),
                user_id='hive_signal_bridge',
                goal_id=f"sig_recruit_{getattr(message, 'id', '')}",
                goal_type='hive_growth',
            )
            logger.debug("Routed RECRUITMENT_LEAD from %s", channel)
        except ImportError:
            logger.debug("dispatch module not available for recruitment routing")
        except Exception as e:
            logger.debug("RECRUITMENT_LEAD routing failed: %s", e)

    def _route_open_source_signal(self, message, signals: List[str]) -> None:
        """Queue model onboarding task for opensource_evangelist agent."""
        try:
            from integrations.agent_engine.dispatch import dispatch_goal
            text = _extract_text(message)
            channel = getattr(message, 'channel', 'unknown')
            dispatch_goal(
                prompt=(
                    f"Open source signal detected on {channel}: \"{text[:300]}\". "
                    "Evaluate if this is a new model, paper, or benchmark worth "
                    "onboarding to the hive. If so, create an integration plan."
                ),
                user_id='hive_signal_bridge',
                goal_id=f"sig_oss_{getattr(message, 'id', '')}",
                goal_type='hive_growth',
            )
            logger.debug("Routed OPEN_SOURCE_SIGNAL from %s", channel)
        except ImportError:
            logger.debug("dispatch module not available for OSS routing")
        except Exception as e:
            logger.debug("OPEN_SOURCE_SIGNAL routing failed: %s", e)

    def _route_sentiment(self, message, signals: List[str]) -> None:
        """Feed sentiment signal to resonance tuner for community health.

        Uses SignalExtractor to compute sentiment and feeds it as a
        lightweight community-level signal. No per-user profile update
        (that happens through the normal /chat path).
        """
        try:
            from core.resonance_tuner import SignalExtractor
            text = _extract_text(message)
            extracted = SignalExtractor.extract(text, '', 0.0)
            channel = getattr(message, 'channel', 'unknown')

            # Inner try: event emission stays best-effort even when
            # extraction succeeded.
            try:
                from core.platform.events import emit_event
                emit_event('hive.signal.sentiment', {
                    'channel': channel,
                    'sender_id': getattr(message, 'sender_id', ''),
                    'positive_sentiment': extracted.positive_sentiment,
                    'formality': extracted.formality_markers,
                    'is_group': getattr(message, 'is_group', False),
                    'timestamp': time.time(),
                })
            except Exception:
                pass
        except ImportError:
            pass
        except Exception as e:
            logger.debug("SENTIMENT routing failed: %s", e)

    # ── Stats & Feed ─────────────────────────────────────────────── 

    def get_stats(self) -> dict:
        """Signal counts by type, by channel, and total messages processed.

        Returns:
            Dict with 'by_type', 'by_channel', 'total_messages'
            (messages that produced at least one signal),
            and 'attached_adapters'. All values are copies.
        """
        with self._lock:
            return {
                'by_type': dict(self._signal_counts),
                'by_channel': dict(self._channel_counts),
                'total_messages': self._total_messages,
                'attached_adapters': list(self._attached_adapters),
            }

    def get_signal_feed(self, limit: int = 50) -> List[dict]:
        """Recent signals for dashboard display.

        Args:
            limit: Maximum number of entries to return (default 50),
                clamped to [1, 1000] (the deque's maxlen).

        Returns:
            List of signal feed entries, most recent first.
        """
        limit = max(1, min(limit, 1000))
        entries = list(self._signal_feed)
        # Return most recent first
        return list(reversed(entries[-limit:]))

655 

656 

657# ===================================================================== 

658# Helpers 

659# ===================================================================== 

660 

661def _extract_text(message) -> str: 

662 """Safely extract text content from a Message object.""" 

663 try: 

664 content = message.content if hasattr(message, 'content') else '' 

665 if callable(content): 

666 content = content() 

667 return content or getattr(message, 'text', '') or '' 

668 except Exception: 

669 return getattr(message, 'text', '') or '' 

670 

671 

# =====================================================================
# Singleton
# =====================================================================

# Module-level singleton instance, created lazily by get_signal_bridge().
# _bridge_lock guards creation so concurrent first calls build only one.
_bridge: Optional[HiveSignalBridge] = None
_bridge_lock = threading.Lock()

678 

679 

def get_signal_bridge() -> HiveSignalBridge:
    """Get or create the singleton HiveSignalBridge.

    Double-checked locking: once the singleton exists, the fast path
    returns it without acquiring the lock.
    """
    global _bridge
    if _bridge is not None:
        return _bridge
    with _bridge_lock:
        if _bridge is None:
            _bridge = HiveSignalBridge()
    return _bridge

688 

689 

690# ===================================================================== 

691# Flask Blueprint (lazy) 

692# ===================================================================== 

693 

def create_signal_blueprint():
    """Create a Flask Blueprint for signal bridge API endpoints.

    Endpoints:
        GET  /api/hive/signals/stats    - Signal counts by type and channel
        GET  /api/hive/signals/feed     - Recent signals (limit query param)
        POST /api/hive/signals/classify - Test classifier on arbitrary text

    Returns:
        Flask Blueprint instance, or None when Flask is not installed.
    """
    try:
        from flask import Blueprint, jsonify, request
    except ImportError:
        logger.debug("Flask not available -- signal blueprint not created")
        return None

    blueprint = Blueprint('hive_signals', __name__, url_prefix='/api/hive/signals')

    # View function names below are deliberately unchanged: Flask derives
    # endpoint names from them, and url_for() callers depend on those.

    @blueprint.route('/stats', methods=['GET'])
    def signal_stats():
        # Counters by type/channel plus adapter diagnostics.
        return jsonify(get_signal_bridge().get_stats())

    @blueprint.route('/feed', methods=['GET'])
    def signal_feed():
        requested = request.args.get('limit', 50, type=int)
        return jsonify(get_signal_bridge().get_signal_feed(limit=requested))

    @blueprint.route('/classify', methods=['POST'])
    def classify_text():
        payload = request.get_json(silent=True) or {}
        text = payload.get('text', '')
        channel_type = payload.get('channel_type', '')
        is_group = payload.get('is_group', False)
        if not text:
            return jsonify({'error': 'text is required'}), 400
        matched = get_signal_bridge().classify_signal(text, channel_type, is_group)
        return jsonify({
            'text': text,
            'signals': matched,
            'channel_type': channel_type,
            'is_group': is_group,
        })

    return blueprint