Coverage for integrations / channels / bridge / wamp_bridge.py: 0.0%
268 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2WAMP Bridge for Channel Chaining
4Integrates HevolveBot channel adapters with Crossbar WAMP for:
5- Cross-channel message forwarding
6- Unified inbox across channels
7- Channel-to-channel relay rules
8- Real-time message routing via pub/sub
10Uses existing Crossbar infrastructure:
11- com.hertzai.hevolve.channel.* topics for channel events
12- com.hertzai.hevolve.bridge.* topics for bridge control
13"""
15from __future__ import annotations
17import asyncio
18import json
19import logging
20import os
21from dataclasses import dataclass, field
22from datetime import datetime
23from enum import Enum
24from typing import Any, Callable, Dict, List, Optional, Set
25from functools import partial
27try:
28 from autobahn.asyncio.component import Component
29 from autobahn.wamp.exception import ApplicationError
30 HAS_AUTOBAHN = True
31except ImportError:
32 HAS_AUTOBAHN = False
34from ..base import Message, SendResult
37def _get_crossbar_port() -> int:
38 try:
39 from core.port_registry import get_port
40 return get_port('crossbar')
41 except ImportError:
42 return 8088
43from ..registry import ChannelRegistry
45logger = logging.getLogger(__name__)
48class RouteType(Enum):
49 """Types of message routing."""
50 FORWARD = "forward" # Forward message to another channel
51 MIRROR = "mirror" # Mirror to multiple channels
52 BROADCAST = "broadcast" # Broadcast to all channels
53 FILTER = "filter" # Forward only matching messages
54 TRANSFORM = "transform" # Transform before forwarding
57@dataclass
58class BridgeRule:
59 """Rule for routing messages between channels."""
60 id: str
61 name: str
62 source_channel: str # Source channel type (telegram, discord, etc.)
63 source_chat_id: Optional[str] = None # Specific chat or None for all
64 target_channel: str = "" # Target channel type
65 target_chat_id: Optional[str] = None # Target chat ID
66 route_type: RouteType = RouteType.FORWARD
67 enabled: bool = True
69 # Filtering options
70 filter_pattern: Optional[str] = None # Regex pattern to match
71 filter_sender: Optional[str] = None # Specific sender ID
72 filter_keywords: List[str] = field(default_factory=list)
74 # Transform options
75 prefix: str = "" # Prefix to add to forwarded messages
76 suffix: str = "" # Suffix to add
77 include_source_info: bool = True # Include [From: channel/user] header
78 strip_mentions: bool = False # Remove @mentions before forwarding
80 # Rate limiting
81 rate_limit: int = 0 # Max messages per minute (0 = unlimited)
82 cooldown_seconds: int = 0 # Cooldown between forwards
84 # Metadata
85 created_at: datetime = field(default_factory=datetime.now)
86 last_triggered: Optional[datetime] = None
87 trigger_count: int = 0
90@dataclass
91class BridgeConfig:
92 """Configuration for channel bridge."""
93 # WAMP connection
94 crossbar_url: str = ""
95 realm: str = "realm1"
97 # Topic prefixes
98 channel_topic_prefix: str = "com.hertzai.hevolve.channel"
99 bridge_topic_prefix: str = "com.hertzai.hevolve.bridge"
101 # Behavior
102 enable_broadcast: bool = True
103 enable_unified_inbox: bool = True
104 max_forward_chain: int = 3 # Prevent infinite loops
105 forward_timeout: float = 10.0 # Timeout for forward operations
107 # Persistence
108 rules_file: str = "/app/data/bridge_rules.json"
110 @classmethod
111 def from_env(cls) -> "BridgeConfig":
112 """Create config from environment variables."""
113 return cls(
114 crossbar_url=os.getenv("CBURL", f"ws://localhost:{_get_crossbar_port()}/ws"),
115 realm=os.getenv("CBREALM", "realm1"),
116 rules_file=os.getenv("BRIDGE_RULES_FILE", "/app/data/bridge_rules.json"),
117 )
120class ChannelBridge:
121 """
122 Bridge for routing messages between channels via WAMP Crossbar.
124 Enables:
125 - Forward messages from one channel to another
126 - Mirror messages to multiple channels
127 - Broadcast to all channels
128 - Filter-based routing
129 - Transform messages during forwarding
131 Example:
132 bridge = ChannelBridge(config, registry)
133 await bridge.connect()
135 # Add a rule to forward Telegram messages to Discord
136 bridge.add_rule(BridgeRule(
137 id="tg-to-discord",
138 name="Telegram to Discord",
139 source_channel="telegram",
140 target_channel="discord",
141 target_chat_id="123456789",
142 include_source_info=True,
143 ))
144 """
146 def __init__(
147 self,
148 config: BridgeConfig,
149 registry: ChannelRegistry,
150 ):
151 self.config = config
152 self.registry = registry
153 self._rules: Dict[str, BridgeRule] = {}
154 self._component: Optional[Component] = None
155 self._session = None
156 self._connected = False
157 self._forward_chain: Dict[str, int] = {} # Track forward depth
158 self._rate_limiters: Dict[str, List[datetime]] = {}
159 self._handlers: Dict[str, Callable] = {}
161 # Load persisted rules
162 self._load_rules()
164 def _load_rules(self) -> None:
165 """Load rules from persistence file."""
166 try:
167 if os.path.exists(self.config.rules_file):
168 with open(self.config.rules_file, "r") as f:
169 data = json.load(f)
170 for rule_data in data.get("rules", []):
171 rule = BridgeRule(
172 id=rule_data["id"],
173 name=rule_data["name"],
174 source_channel=rule_data["source_channel"],
175 source_chat_id=rule_data.get("source_chat_id"),
176 target_channel=rule_data["target_channel"],
177 target_chat_id=rule_data.get("target_chat_id"),
178 route_type=RouteType(rule_data.get("route_type", "forward")),
179 enabled=rule_data.get("enabled", True),
180 prefix=rule_data.get("prefix", ""),
181 suffix=rule_data.get("suffix", ""),
182 include_source_info=rule_data.get("include_source_info", True),
183 )
184 self._rules[rule.id] = rule
185 logger.info(f"Loaded {len(self._rules)} bridge rules")
186 except Exception as e:
187 logger.warning(f"Could not load bridge rules: {e}")
189 def _save_rules(self) -> None:
190 """Save rules to persistence file."""
191 try:
192 os.makedirs(os.path.dirname(self.config.rules_file), exist_ok=True)
193 data = {
194 "rules": [
195 {
196 "id": r.id,
197 "name": r.name,
198 "source_channel": r.source_channel,
199 "source_chat_id": r.source_chat_id,
200 "target_channel": r.target_channel,
201 "target_chat_id": r.target_chat_id,
202 "route_type": r.route_type.value,
203 "enabled": r.enabled,
204 "prefix": r.prefix,
205 "suffix": r.suffix,
206 "include_source_info": r.include_source_info,
207 }
208 for r in self._rules.values()
209 ]
210 }
211 with open(self.config.rules_file, "w") as f:
212 json.dump(data, f, indent=2)
213 except Exception as e:
214 logger.error(f"Could not save bridge rules: {e}")
216 async def connect(self) -> bool:
217 """Connect to Crossbar WAMP router."""
218 if not HAS_AUTOBAHN:
219 logger.error("autobahn not installed, cannot connect to Crossbar")
220 return False
222 try:
223 self._component = Component(
224 transports=self.config.crossbar_url,
225 realm=self.config.realm,
226 )
228 @self._component.on_join
229 async def on_join(session, details):
230 self._session = session
231 self._connected = True
232 logger.info("Channel bridge connected to Crossbar")
234 # Subscribe to channel events
235 await self._setup_subscriptions()
237 # Register RPC methods
238 await self._register_rpcs()
240 @self._component.on_leave
241 async def on_leave(session, details):
242 self._connected = False
243 logger.info("Channel bridge disconnected from Crossbar")
245 # Start component in background
246 asyncio.create_task(self._run_component())
248 # Wait for connection
249 for _ in range(50): # 5 seconds timeout
250 if self._connected:
251 return True
252 await asyncio.sleep(0.1)
254 return False
256 except Exception as e:
257 logger.error(f"Failed to connect to Crossbar: {e}")
258 return False
260 async def _run_component(self) -> None:
261 """Run the WAMP component."""
262 try:
263 from autobahn.asyncio.component import run
264 await self._component.start()
265 except Exception as e:
266 logger.error(f"Component error: {e}")
268 async def _setup_subscriptions(self) -> None:
269 """Subscribe to channel message topics."""
270 # Subscribe to all channel message events
271 topic = f"{self.config.channel_topic_prefix}.message"
272 await self._session.subscribe(self._on_channel_message, topic)
273 logger.info(f"Subscribed to {topic}")
275 # Subscribe to bridge control topic
276 control_topic = f"{self.config.bridge_topic_prefix}.control"
277 await self._session.subscribe(self._on_bridge_control, control_topic)
278 logger.info(f"Subscribed to {control_topic}")
280 async def _register_rpcs(self) -> None:
281 """Register RPC methods for bridge control."""
282 prefix = self.config.bridge_topic_prefix
284 await self._session.register(
285 self.add_rule_rpc,
286 f"{prefix}.add_rule"
287 )
288 await self._session.register(
289 self.remove_rule_rpc,
290 f"{prefix}.remove_rule"
291 )
292 await self._session.register(
293 self.list_rules_rpc,
294 f"{prefix}.list_rules"
295 )
296 await self._session.register(
297 self.forward_message_rpc,
298 f"{prefix}.forward"
299 )
301 logger.info("Registered bridge RPC methods")
303 async def _on_channel_message(self, message_data: Dict[str, Any]) -> None:
304 """Handle incoming channel message event."""
305 try:
306 # Parse message
307 channel = message_data.get("channel", "")
308 chat_id = message_data.get("chat_id", "")
309 message_id = message_data.get("message_id", "")
311 # Check forward chain depth
312 chain_key = f"{channel}:{chat_id}:{message_id}"
313 depth = self._forward_chain.get(chain_key, 0)
315 if depth >= self.config.max_forward_chain:
316 logger.warning(f"Max forward chain reached for {chain_key}")
317 return
319 # Find matching rules
320 for rule in self._rules.values():
321 if not rule.enabled:
322 continue
324 if rule.source_channel != channel:
325 continue
327 if rule.source_chat_id and rule.source_chat_id != chat_id:
328 continue
330 # Check rate limit
331 if not self._check_rate_limit(rule):
332 continue
334 # Execute forward
335 self._forward_chain[chain_key] = depth + 1
336 try:
337 await self._execute_forward(rule, message_data)
338 finally:
339 del self._forward_chain[chain_key]
341 except Exception as e:
342 logger.error(f"Error handling channel message: {e}")
344 async def _on_bridge_control(self, control_data: Dict[str, Any]) -> None:
345 """Handle bridge control messages."""
346 action = control_data.get("action", "")
348 if action == "reload_rules":
349 self._load_rules()
350 elif action == "clear_rules":
351 self._rules.clear()
352 self._save_rules()
354 def _check_rate_limit(self, rule: BridgeRule) -> bool:
355 """Check if rule is within rate limit."""
356 if rule.rate_limit <= 0:
357 return True
359 now = datetime.now()
360 key = rule.id
362 if key not in self._rate_limiters:
363 self._rate_limiters[key] = []
365 # Clean old entries
366 window_start = now.timestamp() - 60
367 self._rate_limiters[key] = [
368 t for t in self._rate_limiters[key]
369 if t.timestamp() > window_start
370 ]
372 # Check limit
373 if len(self._rate_limiters[key]) >= rule.rate_limit:
374 return False
376 self._rate_limiters[key].append(now)
377 return True
379 async def _execute_forward(
380 self,
381 rule: BridgeRule,
382 message_data: Dict[str, Any]
383 ) -> Optional[SendResult]:
384 """Execute a forward operation based on rule."""
385 try:
386 # Get target adapter
387 target_adapter = self.registry.get(rule.target_channel)
388 if not target_adapter:
389 logger.warning(f"Target channel {rule.target_channel} not found")
390 return None
392 # Build forwarded message
393 text = message_data.get("text", "")
395 # Apply transforms
396 if rule.include_source_info:
397 source_channel = message_data.get("channel", "unknown")
398 sender_name = message_data.get("sender_name", "unknown")
399 text = f"[From {source_channel}/{sender_name}]\n{text}"
401 if rule.prefix:
402 text = f"{rule.prefix}{text}"
403 if rule.suffix:
404 text = f"{text}{rule.suffix}"
406 # Determine target chat
407 target_chat = rule.target_chat_id
408 if not target_chat:
409 # Use same chat ID if not specified
410 target_chat = message_data.get("chat_id", "")
412 # Send to target channel
413 result = await target_adapter.send_message(
414 chat_id=target_chat,
415 text=text,
416 )
418 # Update rule stats
419 rule.last_triggered = datetime.now()
420 rule.trigger_count += 1
422 # Publish forward event
423 if self._session:
424 await self._session.publish(
425 f"{self.config.bridge_topic_prefix}.forwarded",
426 {
427 "rule_id": rule.id,
428 "source_channel": rule.source_channel,
429 "target_channel": rule.target_channel,
430 "success": result.success if result else False,
431 }
432 )
434 return result
436 except Exception as e:
437 logger.error(f"Forward failed for rule {rule.id}: {e}")
438 return None
440 # Rule Management
442 def add_rule(self, rule: BridgeRule) -> None:
443 """Add a bridge rule."""
444 self._rules[rule.id] = rule
445 self._save_rules()
446 logger.info(f"Added bridge rule: {rule.name}")
448 def remove_rule(self, rule_id: str) -> bool:
449 """Remove a bridge rule."""
450 if rule_id in self._rules:
451 del self._rules[rule_id]
452 self._save_rules()
453 logger.info(f"Removed bridge rule: {rule_id}")
454 return True
455 return False
457 def get_rule(self, rule_id: str) -> Optional[BridgeRule]:
458 """Get a rule by ID."""
459 return self._rules.get(rule_id)
461 def list_rules(self) -> List[BridgeRule]:
462 """List all rules."""
463 return list(self._rules.values())
465 def enable_rule(self, rule_id: str) -> bool:
466 """Enable a rule."""
467 if rule_id in self._rules:
468 self._rules[rule_id].enabled = True
469 self._save_rules()
470 return True
471 return False
473 def disable_rule(self, rule_id: str) -> bool:
474 """Disable a rule."""
475 if rule_id in self._rules:
476 self._rules[rule_id].enabled = False
477 self._save_rules()
478 return True
479 return False
481 # RPC Methods
483 async def add_rule_rpc(self, rule_data: Dict[str, Any]) -> Dict[str, Any]:
484 """RPC method to add a rule."""
485 try:
486 rule = BridgeRule(
487 id=rule_data.get("id", str(datetime.now().timestamp())),
488 name=rule_data["name"],
489 source_channel=rule_data["source_channel"],
490 source_chat_id=rule_data.get("source_chat_id"),
491 target_channel=rule_data["target_channel"],
492 target_chat_id=rule_data.get("target_chat_id"),
493 route_type=RouteType(rule_data.get("route_type", "forward")),
494 )
495 self.add_rule(rule)
496 return {"success": True, "rule_id": rule.id}
497 except Exception as e:
498 return {"success": False, "error": str(e)}
500 async def remove_rule_rpc(self, rule_id: str) -> Dict[str, Any]:
501 """RPC method to remove a rule."""
502 success = self.remove_rule(rule_id)
503 return {"success": success}
505 async def list_rules_rpc(self) -> List[Dict[str, Any]]:
506 """RPC method to list rules."""
507 return [
508 {
509 "id": r.id,
510 "name": r.name,
511 "source_channel": r.source_channel,
512 "target_channel": r.target_channel,
513 "enabled": r.enabled,
514 "trigger_count": r.trigger_count,
515 }
516 for r in self._rules.values()
517 ]
519 async def forward_message_rpc(
520 self,
521 source_channel: str,
522 source_chat_id: str,
523 target_channel: str,
524 target_chat_id: str,
525 text: str,
526 ) -> Dict[str, Any]:
527 """RPC method to forward a specific message."""
528 try:
529 target_adapter = self.registry.get(target_channel)
530 if not target_adapter:
531 return {"success": False, "error": f"Channel {target_channel} not found"}
533 result = await target_adapter.send_message(
534 chat_id=target_chat_id,
535 text=f"[Forwarded from {source_channel}]\n{text}",
536 )
538 return {"success": result.success if result else False}
539 except Exception as e:
540 return {"success": False, "error": str(e)}
542 # Convenience methods
544 async def forward_to_all(
545 self,
546 source_channel: str,
547 message: Message,
548 exclude_source: bool = True,
549 ) -> Dict[str, SendResult]:
550 """Forward a message to all connected channels."""
551 results = {}
553 for channel_name in self.registry.list_channels():
554 if exclude_source and channel_name == source_channel:
555 continue
557 adapter = self.registry.get(channel_name)
558 if not adapter:
559 continue
561 try:
562 # Would need target chat IDs from config
563 # This is a simplified version
564 text = f"[Broadcast from {source_channel}]\n{message.text}"
565 # result = await adapter.send_message(chat_id=???, text=text)
566 # results[channel_name] = result
567 except Exception as e:
568 logger.error(f"Broadcast to {channel_name} failed: {e}")
570 return results
572 async def publish_to_wamp(
573 self,
574 channel: str,
575 message: Message,
576 ) -> None:
577 """Publish a channel message to WAMP for other services."""
578 if not self._session:
579 return
581 topic = f"{self.config.channel_topic_prefix}.message"
582 await self._session.publish(topic, {
583 "channel": channel,
584 "chat_id": message.chat_id,
585 "message_id": message.id,
586 "sender_id": message.sender_id,
587 "sender_name": message.sender_name,
588 "text": message.text,
589 "timestamp": message.timestamp.isoformat(),
590 })
592 async def disconnect(self) -> None:
593 """Disconnect from Crossbar."""
594 self._connected = False
595 if self._component:
596 try:
597 await self._component.stop()
598 except Exception:
599 pass
600 self._session = None
601 logger.info("Channel bridge disconnected")
604def create_channel_bridge(
605 registry: ChannelRegistry,
606 crossbar_url: Optional[str] = None,
607 realm: Optional[str] = None,
608) -> ChannelBridge:
609 """Factory function to create a channel bridge.
611 Args:
612 registry: Channel registry with adapters
613 crossbar_url: Crossbar WebSocket URL
614 realm: WAMP realm
616 Returns:
617 Configured ChannelBridge instance
618 """
619 config = BridgeConfig(
620 crossbar_url=crossbar_url or os.getenv("CBURL", "ws://localhost:8088/ws"),
621 realm=realm or os.getenv("CBREALM", "realm1"),
622 )
623 return ChannelBridge(config, registry)