Coverage for integrations / channels / hive_signal_bridge.py: 76.3%
236 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Hive Signal Bridge -- Every channel message is a hive signal.
4Hooks into ALL channel adapters via on_message(). Classifies signals and
5routes them to the appropriate hive agent:
7Signal Types:
8 - COMPUTE_INTEREST: Someone mentions GPUs, idle hardware, mining, earning
9 -> Route to compute_recruiter agent
10 - MODEL_REQUEST: Someone asks about a model, wants inference
11 -> Route to model_provisioner agent
12 - BUG_REPORT: Someone reports a bug, error, issue
13 -> Create HiveTask for connected Claude Code sessions
14 - FEATURE_REQUEST: Someone wants a new feature
15 -> Queue in instruction_queue for next idle agent
16 - SUPPORT_NEEDED: Someone needs help
17 -> Route to /chat pipeline for immediate response
18 - SENTIMENT: General sentiment signal
19 -> Feed to resonance tuner for community health tracking
20 - RECRUITMENT_LEAD: Someone expresses interest in contributing
21 -> Route to compute_recruiter with personalized onboarding
22 - OPEN_SOURCE_SIGNAL: New model release, paper, benchmark mentioned
23 -> Route to opensource_evangelist agent
25Every signal earns micro-Spark for the channel where it originated.
26This incentivizes active communities.
27"""
29import asyncio
30import collections
31import logging
32import threading
33import time
34from typing import Any, Callable, Dict, List, Optional
36logger = logging.getLogger(__name__)
39# =====================================================================
40# Signal Type Constants
41# =====================================================================
43COMPUTE_INTEREST = 'COMPUTE_INTEREST'
44MODEL_REQUEST = 'MODEL_REQUEST'
45BUG_REPORT = 'BUG_REPORT'
46FEATURE_REQUEST = 'FEATURE_REQUEST'
47SUPPORT_NEEDED = 'SUPPORT_NEEDED'
48SENTIMENT = 'SENTIMENT'
49RECRUITMENT_LEAD = 'RECRUITMENT_LEAD'
50OPEN_SOURCE_SIGNAL = 'OPEN_SOURCE_SIGNAL'
52ALL_SIGNAL_TYPES = (
53 COMPUTE_INTEREST, MODEL_REQUEST, BUG_REPORT, FEATURE_REQUEST,
54 SUPPORT_NEEDED, SENTIMENT, RECRUITMENT_LEAD, OPEN_SOURCE_SIGNAL,
55)
58# =====================================================================
59# Keyword Sets for Heuristic Classification
60# =====================================================================
62_COMPUTE_KEYWORDS = frozenset([
63 'gpu', 'cuda', 'vram', 'rtx', 'nvidia', 'amd', 'idle', 'mining',
64 'earn', 'compute', 'hardware', 'server', 'rack', 'hosting',
65])
67_MODEL_KEYWORDS = frozenset([
68 'model', 'inference', 'llm', 'qwen', 'llama', 'mistral', 'gemma',
69 'phi', 'gpt', 'claude', 'api', 'endpoint', 'chat', 'completion',
70])
72_BUG_KEYWORDS = frozenset([
73 'bug', 'error', 'crash', 'broken', 'fail', 'exception', 'traceback',
74 'issue', '500', 'timeout', 'hang', 'freeze',
75])
77_FEATURE_KEYWORDS = frozenset([
78 'feature', 'request', 'wish', 'could you', 'would be nice',
79 'suggestion', 'idea', 'propose', 'enhancement',
80])
82_SUPPORT_KEYWORDS = frozenset([
83 'help', 'stuck', 'how do i', 'how to', 'cant', "can't", 'unable',
84 'confused', 'documentation', 'tutorial',
85])
87_RECRUIT_KEYWORDS = frozenset([
88 'contribute', 'join', 'volunteer', 'participate', 'help out',
89 'open source', 'community', 'developer', 'contributor',
90])
92_OPENSOURCE_KEYWORDS = frozenset([
93 'huggingface', 'arxiv', 'paper', 'release', 'benchmark', 'gguf',
94 'quantize', 'fine-tune', 'lora', 'unsloth', 'new model',
95])
97_POSITIVE_WORDS = frozenset([
98 'thanks', 'great', 'love', 'perfect', 'excellent', 'amazing',
99 'good', 'nice', 'helpful', 'wonderful', 'appreciate', 'awesome',
100])
102_NEGATIVE_WORDS = frozenset([
103 'bad', 'wrong', 'terrible', 'hate', 'awful', 'worse', 'useless',
104 'frustrated', 'confused', 'disappointed', 'annoying',
105])
107# Multi-word phrases extracted for substring matching (cannot be caught
108# by single-word tokenization)
109_MULTI_WORD_PHRASES: Dict[str, List[str]] = {
110 FEATURE_REQUEST: ['could you', 'would be nice'],
111 SUPPORT_NEEDED: ['how do i', 'how to'],
112 RECRUITMENT_LEAD: ['help out', 'open source'],
113 OPEN_SOURCE_SIGNAL: ['fine-tune', 'new model'],
114}
116# Single-word lookup: signal_type -> keywords (excludes multi-word phrases)
117_SINGLE_WORD_MAP: Dict[str, frozenset] = {
118 COMPUTE_INTEREST: _COMPUTE_KEYWORDS,
119 MODEL_REQUEST: _MODEL_KEYWORDS,
120 BUG_REPORT: _BUG_KEYWORDS,
121 FEATURE_REQUEST: frozenset(k for k in _FEATURE_KEYWORDS if ' ' not in k),
122 SUPPORT_NEEDED: frozenset(k for k in _SUPPORT_KEYWORDS if ' ' not in k),
123 RECRUITMENT_LEAD: frozenset(k for k in _RECRUIT_KEYWORDS if ' ' not in k),
124 OPEN_SOURCE_SIGNAL: frozenset(k for k in _OPENSOURCE_KEYWORDS if ' ' not in k),
125}
128# =====================================================================
129# Signal Feed Entry
130# =====================================================================
132def _make_feed_entry(message, signals: List[str]) -> dict:
133 """Create a lightweight feed entry from a message and its signals."""
134 content = ''
135 try:
136 content = message.content if hasattr(message, 'content') else ''
137 if callable(content):
138 content = content()
139 except Exception:
140 content = getattr(message, 'text', '') or ''
142 return {
143 'message_id': getattr(message, 'id', ''),
144 'channel': getattr(message, 'channel', ''),
145 'sender_id': getattr(message, 'sender_id', ''),
146 'sender_name': getattr(message, 'sender_name', ''),
147 'text_preview': (content[:120] + '...') if len(content) > 120 else content,
148 'signals': list(signals),
149 'is_group': getattr(message, 'is_group', False),
150 'timestamp': time.time(),
151 }
154# =====================================================================
155# HiveSignalBridge
156# =====================================================================
158class HiveSignalBridge:
159 """Captures signals from all channel adapters and feeds them into the hive.
161 Every message across every channel is a signal -- for recruitment,
162 support, demand detection, sentiment, and hive growth.
164 The _on_message handler is designed to be fast (<5ms): no LLM calls,
165 no database writes, no network I/O. All heavy work (goal dispatch,
166 instruction queuing, etc.) is offloaded to background threads.
167 """
169 def __init__(self):
170 self._lock = threading.Lock()
172 # Stats: signal counts by type and by channel
173 self._signal_counts: Dict[str, int] = {st: 0 for st in ALL_SIGNAL_TYPES}
174 self._channel_counts: Dict[str, int] = {}
175 self._total_messages: int = 0
177 # Bounded signal feed for dashboard display
178 self._signal_feed: collections.deque = collections.deque(maxlen=1000)
180 # Background executor for routing (keeps _on_message fast)
181 self._executor: Optional[Any] = None
182 self._executor_lock = threading.Lock()
184 # Attached adapter names (for diagnostics)
185 self._attached_adapters: List[str] = []
187 # ── Lazy background executor ───────────────────────────────────
189 def _get_executor(self):
190 """Lazily create the thread pool executor for routing."""
191 if self._executor is None:
192 with self._executor_lock:
193 if self._executor is None:
194 from concurrent.futures import ThreadPoolExecutor
195 self._executor = ThreadPoolExecutor(
196 max_workers=2,
197 thread_name_prefix='hive_signal',
198 )
199 return self._executor
201 # ── Adapter Attachment ─────────────────────────────────────────
203 def attach_to_adapter(self, adapter) -> None:
204 """Register self as a message handler on a channel adapter.
206 Call this for every ChannelAdapter at boot time.
208 Args:
209 adapter: A ChannelAdapter instance (from integrations.channels.base).
210 """
211 try:
212 adapter.on_message(self._on_message)
213 name = getattr(adapter, 'name', str(type(adapter).__name__))
214 self._attached_adapters.append(name)
215 logger.info("HiveSignalBridge attached to channel: %s", name)
216 except Exception as e:
217 logger.warning("Failed to attach HiveSignalBridge to adapter: %s", e)
219 def attach_to_all(self, adapters: dict) -> None:
220 """Attach to all adapters in a dict (e.g., ChannelRegistry._adapters).
222 Args:
223 adapters: Dict mapping channel name -> ChannelAdapter instance.
224 """
225 for name, adapter in adapters.items():
226 self.attach_to_adapter(adapter)
228 # ── Core Message Handler ───────────────────────────────────────
230 def _on_message(self, message) -> None:
231 """Core handler. Called for every message on every channel.
233 MUST be fast (<5ms). No LLM calls, no DB writes, no network I/O.
234 Classification is pure heuristic. Routing is offloaded to background.
235 """
236 try:
237 # Extract text content (Message.content is a property in base.py)
238 text = ''
239 try:
240 text = message.content if hasattr(message, 'content') else ''
241 if callable(text):
242 text = text()
243 except Exception:
244 text = getattr(message, 'text', '') or ''
246 if not text or len(text.strip()) < 2:
247 return
249 channel_type = getattr(message, 'channel', '')
250 is_group = getattr(message, 'is_group', False)
252 # Fast heuristic classification (no LLM)
253 signals = self.classify_signal(text, channel_type, is_group)
255 if not signals:
256 return
258 # Update stats (thread-safe)
259 with self._lock:
260 self._total_messages += 1
261 for sig in signals:
262 self._signal_counts[sig] = self._signal_counts.get(sig, 0) + 1
263 self._channel_counts[channel_type] = (
264 self._channel_counts.get(channel_type, 0) + 1
265 )
267 # Add to feed
268 entry = _make_feed_entry(message, signals)
269 self._signal_feed.append(entry)
271 # Emit to EventBus (best-effort, non-blocking)
272 self._emit_signal_event(message, signals, channel_type)
274 # Emit micro-Spark reward event for the originating channel
275 self._emit_spark_event(message, signals, channel_type)
277 # Route to appropriate hive agents (background thread)
278 self._get_executor().submit(
279 self._route_signals, message, signals
280 )
282 except Exception as e:
283 # Handler must never raise -- would break the channel adapter
284 logger.debug("HiveSignalBridge._on_message error: %s", e)
286 # ── Signal Classification ──────────────────────────────────────
288 def classify_signal(self, text: str, channel_type: str = '',
289 is_group: bool = False) -> List[str]:
290 """Fast heuristic classifier. No LLM calls -- pure keyword matching.
292 A single message can match multiple signal types.
294 Args:
295 text: Message text content.
296 channel_type: Channel name (e.g., 'discord', 'telegram').
297 is_group: Whether the message is from a group chat.
299 Returns:
300 List of matched signal type strings.
301 """
302 signals: List[str] = []
303 text_lower = text.lower()
304 words = set(text_lower.split())
306 # Single-word keyword matching
307 for signal_type, keywords in _SINGLE_WORD_MAP.items():
308 if words & keywords:
309 signals.append(signal_type)
311 # Multi-word phrase matching (substring search)
312 for signal_type, phrases in _MULTI_WORD_PHRASES.items():
313 if signal_type not in signals:
314 for phrase in phrases:
315 if phrase in text_lower:
316 signals.append(signal_type)
317 break
319 # URL-based signals
320 if 'huggingface.co' in text_lower or 'arxiv.org' in text_lower:
321 if OPEN_SOURCE_SIGNAL not in signals:
322 signals.append(OPEN_SOURCE_SIGNAL)
324 # Sentiment detection (always check -- provides community health data)
325 pos_count = len(words & _POSITIVE_WORDS)
326 neg_count = len(words & _NEGATIVE_WORDS)
327 if pos_count > 0 or neg_count > 0:
328 if SENTIMENT not in signals:
329 signals.append(SENTIMENT)
331 return signals
333 # ── Event Emission ─────────────────────────────────────────────
335 def _emit_signal_event(self, message, signals: List[str],
336 channel_type: str) -> None:
337 """Emit 'hive.signal.received' to EventBus."""
338 try:
339 from core.platform.events import emit_event
340 emit_event('hive.signal.received', {
341 'message_id': getattr(message, 'id', ''),
342 'channel': channel_type,
343 'sender_id': getattr(message, 'sender_id', ''),
344 'signals': signals,
345 'is_group': getattr(message, 'is_group', False),
346 'timestamp': time.time(),
347 })
348 except Exception:
349 pass # EventBus emission is best-effort
351 def _emit_spark_event(self, message, signals: List[str],
352 channel_type: str) -> None:
353 """Emit 'hive.signal.spark' for micro-Spark channel reward.
355 Actual Spark accounting happens elsewhere (revenue_aggregator).
356 This event simply declares that a signal-worthy message occurred
357 on a given channel, so the reward system can credit it.
358 """
359 try:
360 from core.platform.events import emit_event
361 emit_event('hive.signal.spark', {
362 'channel': channel_type,
363 'sender_id': getattr(message, 'sender_id', ''),
364 'signal_count': len(signals),
365 'signals': signals,
366 'timestamp': time.time(),
367 })
368 except Exception:
369 pass
371 # ── Signal Routing ─────────────────────────────────────────────
373 def _route_signals(self, message, signals: List[str]) -> None:
374 """Route classified signals to the appropriate hive subsystem.
376 Runs in a background thread so the message handler stays fast.
377 Each router method is best-effort -- failures are logged, never raised.
378 """
379 router_map = {
380 COMPUTE_INTEREST: self._route_compute_interest,
381 MODEL_REQUEST: self._route_model_request,
382 BUG_REPORT: self._route_bug_report,
383 FEATURE_REQUEST: self._route_feature_request,
384 SUPPORT_NEEDED: self._route_support_needed,
385 RECRUITMENT_LEAD: self._route_recruitment_lead,
386 OPEN_SOURCE_SIGNAL: self._route_open_source_signal,
387 SENTIMENT: self._route_sentiment,
388 }
389 for signal_type in signals:
390 handler = router_map.get(signal_type)
391 if handler:
392 try:
393 handler(message, signals)
394 except Exception as e:
395 logger.debug("Signal routing error (%s): %s", signal_type, e)
397 def _route_compute_interest(self, message, signals: List[str]) -> None:
398 """Queue recruitment task for compute_recruiter agent.
400 Dispatches a goal that triggers the bootstrap_compute_recruiter
401 seed (from goal_seeding.py) to reach out with personalized
402 onboarding content.
403 """
404 try:
405 from integrations.agent_engine.dispatch import dispatch_goal
406 text = _extract_text(message)
407 sender = getattr(message, 'sender_name', '') or getattr(message, 'sender_id', 'unknown')
408 channel = getattr(message, 'channel', 'unknown')
409 dispatch_goal(
410 prompt=(
411 f"Compute recruitment lead detected on {channel} from {sender}: "
412 f"\"{text[:300]}\". "
413 "Evaluate interest level and prepare personalized onboarding message. "
414 "Explain how to contribute idle compute to the hive and earn Spark."
415 ),
416 user_id='hive_signal_bridge',
417 goal_id=f"sig_compute_{getattr(message, 'id', '')}",
418 goal_type='hive_growth',
419 )
420 logger.debug("Routed COMPUTE_INTEREST signal from %s", channel)
421 except ImportError:
422 logger.debug("dispatch module not available for compute routing")
423 except Exception as e:
424 logger.debug("COMPUTE_INTEREST routing failed: %s", e)
426 def _route_model_request(self, message, signals: List[str]) -> None:
427 """Check if requested model is available; if not, queue provisioning.
429 Routes to model_provisioner goal to check hive model registry
430 and potentially onboard the requested model.
431 """
432 try:
433 from integrations.agent_engine.dispatch import dispatch_goal
434 text = _extract_text(message)
435 channel = getattr(message, 'channel', 'unknown')
436 dispatch_goal(
437 prompt=(
438 f"Model request detected on {channel}: \"{text[:300]}\". "
439 "Check if the requested model is available in the hive model registry. "
440 "If not, evaluate feasibility and queue for onboarding."
441 ),
442 user_id='hive_signal_bridge',
443 goal_id=f"sig_model_{getattr(message, 'id', '')}",
444 goal_type='hive_growth',
445 )
446 logger.debug("Routed MODEL_REQUEST signal from %s", channel)
447 except ImportError:
448 logger.debug("dispatch module not available for model routing")
449 except Exception as e:
450 logger.debug("MODEL_REQUEST routing failed: %s", e)
452 def _route_bug_report(self, message, signals: List[str]) -> None:
453 """Create HiveTask (BUG_FIX type) for connected Claude Code sessions.
455 If the hive_task_protocol is available, create a task. Otherwise
456 falls back to instruction_queue for the next idle agent.
457 """
458 text = _extract_text(message)
459 channel = getattr(message, 'channel', 'unknown')
460 sender = getattr(message, 'sender_name', '') or getattr(message, 'sender_id', 'unknown')
462 # Try HiveTaskDispatcher first (for live Claude Code sessions)
463 try:
464 from integrations.coding_agent.hive_task_protocol import get_dispatcher
465 get_dispatcher().create_task(
466 task_type='bug_fix',
467 title=f"Bug report from {channel}",
468 description=f"Bug report from {sender} on {channel}: {text[:500]}",
469 instructions=f"Investigate and fix: {text[:1000]}",
470 )
471 logger.debug("Routed BUG_REPORT to HiveTaskDispatcher from %s", channel)
472 return
473 except (ImportError, AttributeError):
474 pass
475 except Exception as e:
476 logger.debug("HiveTask dispatch failed, falling back: %s", e)
478 # Fallback: queue in instruction_queue
479 try:
480 from integrations.agent_engine.instruction_queue import enqueue_instruction
481 enqueue_instruction(
482 user_id='hive_signal_bridge',
483 text=(
484 f"Bug report from {sender} on {channel}: {text[:500]}. "
485 "Investigate and fix if possible."
486 ),
487 priority=7,
488 tags=['bug', 'signal_bridge', channel],
489 )
490 logger.debug("Routed BUG_REPORT to instruction_queue from %s", channel)
491 except ImportError:
492 logger.debug("instruction_queue not available for bug routing")
493 except Exception as e:
494 logger.debug("BUG_REPORT instruction queue failed: %s", e)
496 def _route_feature_request(self, message, signals: List[str]) -> None:
497 """Queue feature request in instruction_queue for next idle agent."""
498 try:
499 from integrations.agent_engine.instruction_queue import enqueue_instruction
500 text = _extract_text(message)
501 channel = getattr(message, 'channel', 'unknown')
502 sender = getattr(message, 'sender_name', '') or getattr(message, 'sender_id', 'unknown')
503 enqueue_instruction(
504 user_id='hive_signal_bridge',
505 text=(
506 f"Feature request from {sender} on {channel}: {text[:500]}. "
507 "Evaluate feasibility and create implementation plan if viable."
508 ),
509 priority=4,
510 tags=['feature_request', 'signal_bridge', channel],
511 )
512 logger.debug("Routed FEATURE_REQUEST to instruction_queue from %s", channel)
513 except ImportError:
514 logger.debug("instruction_queue not available for feature routing")
515 except Exception as e:
516 logger.debug("FEATURE_REQUEST routing failed: %s", e)
518 def _route_support_needed(self, message, signals: List[str]) -> None:
519 """Dispatch to /chat for immediate response.
521 Uses dispatch_goal with high priority so the user gets help fast.
522 """
523 try:
524 from integrations.agent_engine.dispatch import dispatch_goal
525 text = _extract_text(message)
526 channel = getattr(message, 'channel', 'unknown')
527 sender = getattr(message, 'sender_name', '') or getattr(message, 'sender_id', 'unknown')
528 dispatch_goal(
529 prompt=(
530 f"Support request from {sender} on {channel}: \"{text[:300]}\". "
531 "Provide helpful, clear guidance. Be patient and thorough."
532 ),
533 user_id=getattr(message, 'sender_id', 'hive_signal_bridge'),
534 goal_id=f"sig_support_{getattr(message, 'id', '')}",
535 goal_type='support',
536 )
537 logger.debug("Routed SUPPORT_NEEDED to /chat from %s", channel)
538 except ImportError:
539 logger.debug("dispatch module not available for support routing")
540 except Exception as e:
541 logger.debug("SUPPORT_NEEDED routing failed: %s", e)
543 def _route_recruitment_lead(self, message, signals: List[str]) -> None:
544 """High-priority: queue personalized onboarding message.
546 Someone expressed interest in contributing -- this is the most
547 valuable signal. Route to compute_recruiter with high priority.
548 """
549 try:
550 from integrations.agent_engine.dispatch import dispatch_goal
551 text = _extract_text(message)
552 sender = getattr(message, 'sender_name', '') or getattr(message, 'sender_id', 'unknown')
553 channel = getattr(message, 'channel', 'unknown')
554 dispatch_goal(
555 prompt=(
556 f"HIGH PRIORITY recruitment lead on {channel} from {sender}: "
557 f"\"{text[:300]}\". "
558 "This person wants to contribute to the hive. "
559 "Prepare a warm, personalized onboarding message. "
560 "Explain the mission, how to get started, and the Spark rewards. "
561 "Make them feel welcome and valued."
562 ),
563 user_id='hive_signal_bridge',
564 goal_id=f"sig_recruit_{getattr(message, 'id', '')}",
565 goal_type='hive_growth',
566 )
567 logger.debug("Routed RECRUITMENT_LEAD from %s", channel)
568 except ImportError:
569 logger.debug("dispatch module not available for recruitment routing")
570 except Exception as e:
571 logger.debug("RECRUITMENT_LEAD routing failed: %s", e)
573 def _route_open_source_signal(self, message, signals: List[str]) -> None:
574 """Queue model onboarding task for opensource_evangelist agent."""
575 try:
576 from integrations.agent_engine.dispatch import dispatch_goal
577 text = _extract_text(message)
578 channel = getattr(message, 'channel', 'unknown')
579 dispatch_goal(
580 prompt=(
581 f"Open source signal detected on {channel}: \"{text[:300]}\". "
582 "Evaluate if this is a new model, paper, or benchmark worth "
583 "onboarding to the hive. If so, create an integration plan."
584 ),
585 user_id='hive_signal_bridge',
586 goal_id=f"sig_oss_{getattr(message, 'id', '')}",
587 goal_type='hive_growth',
588 )
589 logger.debug("Routed OPEN_SOURCE_SIGNAL from %s", channel)
590 except ImportError:
591 logger.debug("dispatch module not available for OSS routing")
592 except Exception as e:
593 logger.debug("OPEN_SOURCE_SIGNAL routing failed: %s", e)
595 def _route_sentiment(self, message, signals: List[str]) -> None:
596 """Feed sentiment signal to resonance tuner for community health.
598 Uses SignalExtractor to compute sentiment and feeds it as a
599 lightweight community-level signal. No per-user profile update
600 (that happens through the normal /chat path).
601 """
602 try:
603 from core.resonance_tuner import SignalExtractor
604 text = _extract_text(message)
605 extracted = SignalExtractor.extract(text, '', 0.0)
606 channel = getattr(message, 'channel', 'unknown')
608 try:
609 from core.platform.events import emit_event
610 emit_event('hive.signal.sentiment', {
611 'channel': channel,
612 'sender_id': getattr(message, 'sender_id', ''),
613 'positive_sentiment': extracted.positive_sentiment,
614 'formality': extracted.formality_markers,
615 'is_group': getattr(message, 'is_group', False),
616 'timestamp': time.time(),
617 })
618 except Exception:
619 pass
620 except ImportError:
621 pass
622 except Exception as e:
623 logger.debug("SENTIMENT routing failed: %s", e)
625 # ── Stats & Feed ───────────────────────────────────────────────
627 def get_stats(self) -> dict:
628 """Signal counts by type, by channel, and total messages processed.
630 Returns:
631 Dict with 'by_type', 'by_channel', 'total_messages',
632 and 'attached_adapters'.
633 """
634 with self._lock:
635 return {
636 'by_type': dict(self._signal_counts),
637 'by_channel': dict(self._channel_counts),
638 'total_messages': self._total_messages,
639 'attached_adapters': list(self._attached_adapters),
640 }
642 def get_signal_feed(self, limit: int = 50) -> List[dict]:
643 """Recent signals for dashboard display.
645 Args:
646 limit: Maximum number of entries to return (default 50).
648 Returns:
649 List of signal feed entries, most recent first.
650 """
651 limit = max(1, min(limit, 1000))
652 entries = list(self._signal_feed)
653 # Return most recent first
654 return list(reversed(entries[-limit:]))
657# =====================================================================
658# Helpers
659# =====================================================================
661def _extract_text(message) -> str:
662 """Safely extract text content from a Message object."""
663 try:
664 content = message.content if hasattr(message, 'content') else ''
665 if callable(content):
666 content = content()
667 return content or getattr(message, 'text', '') or ''
668 except Exception:
669 return getattr(message, 'text', '') or ''
672# =====================================================================
673# Singleton
674# =====================================================================
676_bridge: Optional[HiveSignalBridge] = None
677_bridge_lock = threading.Lock()
680def get_signal_bridge() -> HiveSignalBridge:
681 """Get or create the singleton HiveSignalBridge."""
682 global _bridge
683 if _bridge is None:
684 with _bridge_lock:
685 if _bridge is None:
686 _bridge = HiveSignalBridge()
687 return _bridge
690# =====================================================================
691# Flask Blueprint (lazy)
692# =====================================================================
694def create_signal_blueprint():
695 """Create a Flask Blueprint for signal bridge API endpoints.
697 Endpoints:
698 GET /api/hive/signals/stats - Signal counts by type and channel
699 GET /api/hive/signals/feed - Recent signals (limit query param)
700 POST /api/hive/signals/classify - Test classifier on arbitrary text
702 Returns:
703 Flask Blueprint instance.
704 """
705 try:
706 from flask import Blueprint, jsonify, request
707 except ImportError:
708 logger.debug("Flask not available -- signal blueprint not created")
709 return None
711 bp = Blueprint('hive_signals', __name__, url_prefix='/api/hive/signals')
713 @bp.route('/stats', methods=['GET'])
714 def signal_stats():
715 bridge = get_signal_bridge()
716 return jsonify(bridge.get_stats())
718 @bp.route('/feed', methods=['GET'])
719 def signal_feed():
720 bridge = get_signal_bridge()
721 limit = request.args.get('limit', 50, type=int)
722 return jsonify(bridge.get_signal_feed(limit=limit))
724 @bp.route('/classify', methods=['POST'])
725 def classify_text():
726 bridge = get_signal_bridge()
727 data = request.get_json(silent=True) or {}
728 text = data.get('text', '')
729 channel_type = data.get('channel_type', '')
730 is_group = data.get('is_group', False)
731 if not text:
732 return jsonify({'error': 'text is required'}), 400
733 signals = bridge.classify_signal(text, channel_type, is_group)
734 return jsonify({
735 'text': text,
736 'signals': signals,
737 'channel_type': channel_type,
738 'is_group': is_group,
739 })
741 return bp